[Python] Surveillance des dossiers avec watchdog

introduction

Bonsoir, c'est mon premier post depuis longtemps. J'ai un peu de temps Avec un service commémoratif pour l'ouverture de l'application qui a coulé il y a environ un an J'essaierai de le refaire.

Cette fois, nous allons créer une application résidente qui surveille les dossiers. Si le fichier est simplement placé quelque part dans un dossier spécifié C'est aussi simple que de copier dans un dossier spécifié. Cela a pris du temps. .. ..

Environnement de développement

Windows 10 python-3.8.2 Bibliothèque utilisée: watchdog

Surveillance

Pour le moment, créez la classe WatchFileHandler suivante. Il s'agit de la fin de la surveillance. Tu l'as fait! <détails> <résumé> Premier </ résumé>

folder_watch.py


#Surveillance de la classe d'acquisition d'événements
class WacthFileHandler(FileSystemEventHandler):
    def __init__(self, watch_path, copy_to_path, backup_path):
        super(WacthFileHandler, self).__init__()
        self.watch_path = watch_path
        self.copy_to_path = copy_to_path
        self.backup_path = backup_path

    def on_moved(self, event):
        """
Détection de mouvement de fichier
        :param event:
        :return:
        """
        file_path = event.src_path
        file_name = os.path.basename(file_path)

    def on_created(self, event):
        """
Détection de création de fichier
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)

    def on_modified(self, event):
        """
Détection de changement de fichier
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)

    def on_deleted(self, event):
        """
Détection de suppression de fichier
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)


def watch_start(watch_path, copy_to_path, backup_path):
    """
Le processus de surveillance des dossiers a commencé
    :param from_watch_path  :Suivi du chemin du dossier
    :param to_copy_path     :Chemin du dossier de destination
    :param backup_path      :Enregistrer le chemin du dossier de destination
    :return
    """
    event_handler = WacthFileHandler(watch_path, copy_to_path, backup_path)
    observer = Observer()
    observer.schedule(event_handler, watch_path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    except Exception as e:
        observer.stop()
        raise e
    finally:
        # finaly =Dernier traitement indépendamment de l'occurrence d'une exception
        observer.join()

Ensuite, nous surveillerons et examinerons le comportement lorsque le fichier arrivera. Cette fois après avoir déplacé (copié) le fichier Assurez-vous que le fichier d'origine correspond et supprimez le fichier d'origine. Si vous ne pouvez pas le copier correctement, isolez-le. C'est devenu une parade de méthode statique. .. .. <détails>

Surveillance du comportement </ summary>

folder_watch.py


    def on_created(self, event):
        """
Détection de création de fichier
        :param event:
        :return:
        """
        #Obtenir le nom du fichier
        src_name = os.path.basename(event.src_path)
        #Générer le chemin du dossier de la source de surveillance
        src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
        #copie(Bouge toi)Générer le chemin du dossier de destination
        copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
        #Générer un chemin de dossier de destination de sauvegarde
        backup_link = pathlib.Path(self.backup_path)

        try:
            #Exécuter le traitement
            self._run(src_path, copy_path, backup_link)
        except TimeoutError as e:
            #C'est trop grand!
            pass
        except Exception as e:
            pass

    def _run(self, src: Path, copy: Path, bk: Path):
        """
Copier, vérifier, supprimer lorsque le fichier est détecté/Bouge toi
        :param src:
        :param copy:
        :return:
        """
        #Attendez la fin du placement(Pour une certaine période de temps[600s]Attendre)
        if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
            raise TimeoutError

        #Copiez le fichier placé
        if not self._copy_to_file(src, copy):
            return

        #Obtenez le hachage de deux fichiers
        src_hash = self._get_md5_hash(src)
        copy_hash = self._get_md5_hash(copy)

        if self._check_hash(src_hash, copy_hash):
            #Matchs de hachage
            #Supprimer le fichier d'origine
            self._del_original_file(src)
        else:
            #Non-concordance de hachage
            #Déplacer pour enregistrer la destination
            self._move_original_file(bk)

    def _copy_to_file(self, src, copy):
        """
Copiez le fichier placé dans le dossier spécifié
        :param src:
        :param copy_to:
        :return:
        """
        #S'il n'y a pas de fichier placé, aucun autre traitement ne sera effectué.
        if not src.exists():
            return False

        #Métadonnées du fichier(Heure de création, heure de changement, etc.)Copie y compris
        copy_link = shutil.copy2(src, copy, follow_symlinks=True)

        #Vérifiez que le chemin que vous tentiez de copier correspond au chemin que vous avez copié
        if copy != copy_link:
            return False

        if not copy.exists():
            return False

        return True

    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Non confirmé pour fonctionner sous Linux
Méthode de jugement de fin de création du fichier placé
URL de référence:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            if size_now == size_past and os.access(file_path, os.R_OK):
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
Création du fichier placé(copie)Méthode de jugement d'achèvement
URL de référence:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _get_md5_hash(file_path):
        """
Valeur de hachage du fichier md5(Format hexagonal)Avoir
        :param file_path:
        :return:
        """
        with open(file_path, 'rb') as file:
            binary_data = file.read()
            #Obtenir la valeur de hachage au format hexadécimal
            md5 = hashlib.md5(binary_data).hexdigest()
            return md5

    @staticmethod
    def _check_hash(src_hash, target_hash):
        """
Comparez deux hachages
        :param src_hash:
        :param target_hash:
        :return:
        """
        return src_hash == target_hash

    @staticmethod
    def _del_original_file( src):
        """
Supprimer le fichier source de la copie
        :param src:
        :return:
        """
        os.remove(src)

    @staticmethod
    def _move_original_file(src_path, move_path):
        """
Déplacer le fichier source de la copie(Évacuation)
        :return:
        """
        shutil.move(src_path, move_path)
Permet de l'exécuter avec des arguments d'exécution. J'ajouterai également cet argument de jugement. ↓ le traitement est ajouté.

<détails>

Traitement des arguments d'exécution </ summary>

folder_watch.py


def interpret_args():
    """
Méthode d'interprétation des arguments au moment de l'exécution
    :return:Arguments d'exécution
    """
    #Création d'objets
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)

    #Réglage de l'argument
    #Suivi du chemin du dossier(Obligatoire)
    parser.add_argument("-w", "--watch_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is watch folder path'''), type=str)

    #Copier le chemin du dossier de destination(Obligatoire)
    parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is copy to folder path'''), type=str)

    #Enregistrer le chemin du dossier de destination(Obligatoire)
    parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is backup to folder path'''), type=str)

    #Renvoyer le résultat
    return parser.parse_args()


def check_args(args):
    """
Méthode de jugement de l'argument d'exécution
    :param args:
    :return: True or False
    """
    #Erreur si le chemin du dossier surveillé n'est pas spécifié
    if not hasattr(args, 'watch_path') and args.watch_path is None:
        raise argparse.ArgumentError('Je ne spécifie pas de dossier de surveillance!')

    #Erreur si le chemin du dossier de destination n'est pas spécifié
    if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
        raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination!')

    #Erreur si le chemin du dossier de destination d'enregistrement n'est pas spécifié
    if not hasattr(args, 'backup_path') and args.backup_path is None:
        raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination de sauvegarde!')

    #Génération d'objets pour chaque chemin
    watch_path = pathlib.Path(args.watch_path)
    copy_to_path = pathlib.Path(args.copy_to_path)
    backup_path = pathlib.Path(args.backup_path)

    #Vérifiez si le chemin du dossier de surveillance existe
    if not watch_path.exists():
        raise FileNotFoundError('Il n'y a pas de dossier de surveillance!')

    #Vérifiez si le dossier de surveillance est un répertoire
    if not watch_path.is_dir():
        raise TypeError('Le dossier de surveillance spécifié n'est pas un dossier!')

    #Vérifiez si le chemin du dossier de destination existe
    if not copy_to_path.exists():
        raise FileNotFoundError('Il n'y a pas de dossier de destination!')

    #Vérifiez si le dossier de destination est un répertoire
    if not copy_to_path.is_dir():
        raise TypeError('Le dossier de destination spécifié n'est pas un dossier!')

    #Vérifiez si le chemin du dossier de destination existe
    if not backup_path.exists():
        raise FileNotFoundError('Il n'y a pas de dossier de destination de sauvegarde!')

    #Vérifiez si le dossier de destination est un répertoire
    if not backup_path.is_dir():
        raise TypeError('Le dossier de destination d'enregistrement spécifié n'est pas un dossier!')


#Traitement d'exécution
if __name__ == '__main__':
    try:
        #Interprétation des arguments,Jugement
        args = interpret_args()
        #Vérification des arguments
        check_args(args)
        #Suivi de l'exécution
        watch_start(args.watch_path, args.copy_to_path)
    except argparse.ArgumentError as e:
        pass
    except FileNotFoundError as e:
        pass
    except TypeError as e:
        pass
    except Exception as e:
        pass

À ce rythme, je ne sais pas ce que j'ai déménagé ou ce qui est arrivé, alors je vais préparer un journal. Pour le moment, la source est presque complète. <détails>

Source texte presque intégral </ summary>

folder_watch.py


# -*- coding: utf-8 -*-
import os
import time
import sys
import logging
import hashlib
import argparse
import textwrap
import pathlib
from pathlib import Path
import shutil
from datetime import datetime
from watchdog.observers import Observer
from logging.handlers import TimedRotatingFileHandler
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

try:
    import codecs
except ImportError:
    codecs = None


class MyTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    """
Gestionnaire de fichiers pour la classe de sortie du journal de date
    """
    def __init__(self, dir_log):
        self.dir_log = dir_log
        filename = self.dir_log + time.strftime("%Y%m%d") + ".log"  # dir_log here MUST be with os.sep on the end
        logging.handlers.TimedRotatingFileHandler.__init__(self, filename, when='midnight', interval=1, backupCount=0,
                                                           encoding=None)

    def doRollover(self):
        """
        TimedRotatingFileHandler remix - rotates logs on daily basis, and filename of current logfile is time.strftime("%m%d%Y")+".txt" always
        """
        self.stream.close()
        # get the time that this sequence started at and make it a TimeTuple
        t = self.rolloverAt - self.interval
        timeTuple = time.localtime(t)
        self.baseFilename = self.dir_log + time.strftime("%Y%m%d") + ".log"
        if self.encoding:
            self.stream = codecs.open(self.baseFilename, 'w', self.encoding)
        else:
            self.stream = open(self.baseFilename, 'w')
        self.rolloverAt = self.rolloverAt + self.interval


#Surveillance de la classe d'acquisition d'événements
class WacthFileHandler(FileSystemEventHandler):
    def __init__(self, watch_path, copy_to_path, backup_path):
        super(WacthFileHandler, self).__init__()
        self.watch_path = watch_path
        self.copy_to_path = copy_to_path
        self.backup_path = backup_path

    def on_moved(self, event):
        """
Détection de mouvement de fichier
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name}A bougé')

    def on_created(self, event):
        """
Détection de création de fichier
        :param event:
        :return:
        """
        #Obtenir le nom du fichier
        src_name = os.path.basename(event.src_path)
        logger.info(f'{src_name}A été fait')
        #Générer le chemin du dossier de la source de surveillance
        src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
        #copie(Bouge toi)Générer le chemin du dossier de destination
        copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
        #Générer un chemin de dossier de destination de sauvegarde
        backup_link = pathlib.Path(self.backup_path)

        try:
            #Exécuter le traitement
            self._run(src_path, copy_path, backup_link)
        except TimeoutError as e:
            #C'est trop grand!
            logger.error('C'est trop grand!')
            logger.error(e)
        except Exception as e:
            logger.error(e)

    def on_modified(self, event):
        """
Détection de changement de fichier
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name}Modifié')

    def on_deleted(self, event):
        """
Détection de suppression de fichier
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name}Supprimé s')

    def _run(self, src: Path, copy: Path, bk: Path):
        """
Copier, vérifier, supprimer lorsque le fichier est détecté/Bouge toi
        :param src:
        :param copy:
        :return:
        """
        #Attendez la fin du placement(Pour une certaine période de temps[600s]Attendre)
        if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
            raise TimeoutError

        #Copiez le fichier placé
        if not self._copy_to_file(src, copy):
            return

        #Obtenez le hachage de deux fichiers
        src_hash = self._get_md5_hash(src)
        copy_hash = self._get_md5_hash(copy)

        if self._check_hash(src_hash, copy_hash):
            #Matchs de hachage
            #Supprimer le fichier d'origine
            self._del_original_file(src)
        else:
            #Non-concordance de hachage
            #Déplacer pour enregistrer la destination
            self._move_original_file(bk)

    def _copy_to_file(self, src, copy):
        """
Copiez le fichier placé dans le dossier spécifié
        :param src:
        :param copy_to:
        :return:
        """
        #S'il n'y a pas de fichier placé, aucun autre traitement ne sera effectué.
        if not src.exists():
            return False

        #Métadonnées du fichier(Heure de création, heure de changement, etc.)Copie y compris
        copy_link = shutil.copy2(src, copy, follow_symlinks=True)

        #Vérifiez que le chemin que vous tentiez de copier correspond au chemin que vous avez copié
        if copy != copy_link:
            return False

        if not copy.exists():
            return False

        return True

    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Non confirmé pour fonctionner sous Linux
Méthode de jugement de fin de création du fichier placé
URL de référence:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            logger.info(f"size_now: {size_now}")
            logger.info(f"size_past: {size_past}")
            if size_now == size_past and os.access(file_path, os.R_OK):
                logger.info("file has copied completely now size: %s", size_now)
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    logger.info('time out error')
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
Création du fichier placé(copie)Méthode de jugement d'achèvement
URL de référence:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                logger.info('file copy...')
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    logger.info('time out error')
                    return False

    @staticmethod
    def _get_md5_hash(file_path):
        """
Valeur de hachage du fichier md5(Format hexagonal)Avoir
        :param file_path:
        :return:
        """
        with open(file_path, 'rb') as file:
            binary_data = file.read()
            #Obtenir la valeur de hachage au format hexadécimal
            md5 = hashlib.md5(binary_data).hexdigest()
            logger.info(f'Fichier:{file_path} -Valeur de hachage- {md5}')
            return md5

    @staticmethod
    def _check_hash(src_hash, target_hash):
        """
Comparez deux hachages
        :param src_hash:
        :param target_hash:
        :return:
        """
        return src_hash == target_hash

    @staticmethod
    def _del_original_file( src):
        """
Supprimer le fichier source de la copie
        :param src:
        :return:
        """
        os.remove(src)

    @staticmethod
    def _move_original_file(src_path, move_path):
        """
Déplacer le fichier source de la copie(Évacuation)
        :return:
        """
        shutil.move(src_path, move_path)


def watch_start(from_watch_path, to_copy_path, backup_path):
    """
Le processus de surveillance des dossiers a commencé
    :param from_watch_path  :Suivi du chemin du dossier
    :param to_copy_path     :Chemin du dossier de destination
    :param backup_path      :Enregistrer le chemin du dossier de destination
    :return:
    """
    event_handler = WacthFileHandler(from_watch_path, to_copy_path, backup_path)
    observer = Observer()
    observer.schedule(event_handler, from_watch_path, recursive=True)
    logger.info(f'Démarrage de la surveillance des dossiers')
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    except Exception as e:
        observer.stop()
        raise e
    finally:
        # finaly =Dernier traitement indépendamment de l'occurrence d'une exception
        logger.info(f'Surveillance des dossiers terminée')
        observer.join()


def make_log_folder():
    """
Créer s'il n'y a pas de dossier de journaux au démarrage
    :return:
    """
    p = pathlib.Path(sys.argv[0])
    p2 = pathlib.Path(p.parent) / pathlib.Path('logs')
    if not p2.exists():
        os.makedirs(str(p2))


def interpret_args():
    """
Méthode d'interprétation des arguments au moment de l'exécution
    :return:Arguments d'exécution
    """
    #Création d'objets
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)

    #Réglage de l'argument
    #Suivi du chemin du dossier(Obligatoire)
    parser.add_argument("-wp", "--watch_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is watch folder path'''), type=str)

    #Copier le chemin du dossier de destination(Obligatoire)
    parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is copy to folder path'''), type=str)

    #Enregistrer le chemin du dossier de destination(Obligatoire)
    parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is backup folder path'''), type=str)

    #Renvoyer le résultat
    return parser.parse_args()


def check_args(args):
    """
Méthode de jugement de l'argument d'exécution
    :param args:
    :return: True or False
    """
    #Erreur si le chemin du dossier surveillé n'est pas spécifié
    if not hasattr(args, 'watch_path') and args.watch_path is None:
        raise argparse.ArgumentError('Je ne spécifie pas de dossier de surveillance!')

    #Erreur si le chemin du dossier de destination n'est pas spécifié
    if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
        raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination!')

    #Erreur si le chemin du dossier de destination d'enregistrement n'est pas spécifié
    if not hasattr(args, 'backup_path') and args.backup_path is None:
        raise argparse.ArgumentError('Je ne spécifie pas le dossier de destination de sauvegarde!')

    #Génération d'objets pour chaque chemin
    watch_path = pathlib.Path(args.watch_path)
    copy_to_path = pathlib.Path(args.copy_to_path)
    backup_path = pathlib.Path(args.backup_path)

    #Vérifiez si le chemin du dossier de surveillance existe
    if not watch_path.exists():
        raise FileNotFoundError('Il n'y a pas de dossier de surveillance!')

    #Vérifiez si le dossier de surveillance est un répertoire
    if not watch_path.is_dir():
        raise TypeError('Le dossier de surveillance spécifié n'est pas un dossier!')

    #Vérifiez si le chemin du dossier de destination existe
    if not copy_to_path.exists():
        raise FileNotFoundError('Il n'y a pas de dossier de destination!')

    #Vérifiez si le dossier de destination est un répertoire
    if not copy_to_path.is_dir():
        raise TypeError('Le dossier de destination spécifié n'est pas un dossier!')

    #Vérifiez si le chemin du dossier de destination existe
    if not backup_path.exists():
        raise FileNotFoundError('Il n'y a pas de dossier de destination de sauvegarde!')

    #Vérifiez si le dossier de destination est un répertoire
    if not backup_path.is_dir():
        raise TypeError('Le dossier de destination d'enregistrement spécifié n'est pas un dossier!')


#Traitement d'exécution
if __name__ == '__main__':

    #Créer s'il n'y a pas de dossier de journal(* La journalisation ne crée pas de dossiers.)
    make_log_folder()

    #Paramètres de journalisation
    # get the root logger
    root_logger = logging.getLogger()
    # set overall level to debug, default is warning for root logger
    root_logger.setLevel(logging.DEBUG)

    # setup logging to file, rotating at midnight
    file_log = MyTimedRotatingFileHandler(f'./logs/log_')
    file_log.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)ligne d] : %(message)s',
                                       datefmt='%Y-%m-%d %H:%M:%S')
    file_log.setFormatter(file_formatter)
    root_logger.addHandler(file_log)

    # setup logging to console
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)ligne d] : %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    console.setFormatter(formatter)
    root_logger.addHandler(console)

    # get a logger for my script
    logger = logging.getLogger(__name__)

    try:
        #Interprétation des arguments,Jugement
        args = interpret_args()
        #Vérification des arguments
        check_args(args)
        #Suivi de l'exécution
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        logger.error(e)
    except FileNotFoundError as e:
        logger.error(e)
    except TypeError as e:
        logger.error(e)
    except Exception as e:
        logger.error(e)

C'est difficile à exécuter tel quel Créez une chauve-souris comme ↓ comme d'habitude.

bat

execute.bat


@echo off
setlocal
Faire de l'emplacement où se trouve le script rem le répertoire actuel
cd /d %~dp0

SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
dossier de surveillance rem
SET WATCH_PATH=.\test\from
rem copier le dossier de destination
SET COPY_PATH=.\test\to
dossier de destination de sauvegarde rem
SET BK_PATH=.\test\bk

exécution de la surveillance du dossier rem
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -wp %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%

Je me demande si c'est la fin si je le mets correctement dans le planificateur de tâches Je me posais la question, mais cela semble être le cas. Cette application est résidente et j'aimerais que vous la lanciez immédiatement même si elle tombe. De plus, si vous utilisez le planificateur de tâches, de nombreux processus seront lancés. .. .. J'ai donc fait la chauve-souris et la source comme suit.

bat

execute.bat


@echo off
setlocal
Faire de l'emplacement où se trouve le script rem le répertoire actuel
cd /d %~dp0

SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
dossier de surveillance rem
SET WATCH_PATH=.\test\from
rem copier le dossier de destination
SET COPY_PATH=.\test\to
dossier de destination de sauvegarde rem
SET BK_PATH=.\test\bk
Rem Dossier d'écriture PID
SET PID_FILD=pid

Exécuter immédiatement s'il n'y a pas de fichier pid rem
IF EXIST %PID_FILD% (GOTO FILE_TRUE) ELSE GOTO FILE_FALSE

Le fichier pid rem existe déjà
:FILE_TRUE

Obtenir le numéro pid du fichier rem
SET /p PID_VAL=<pid
indicateur de présence rem pid(true=1, false=0)
SET IS_EXIST=0
nom de l'image rem:"python.exe"Rechercher le pid avec(Seule la partie numérique)
for /F "usebackq tokens=2" %%a in (
`tasklist /fi "IMAGENAME eq python.exe" ^| findstr "[0-9]"`) do (
rem ECHO %%a
if %%a==%PID_VAL% SET IS_EXIST=1
)
rem ECHO %PID_VAL%
rem ECHO %IS_EXIST%

rem il y a un match=Ne rien faire car il fonctionne déjà
rem Il n'y a pas de correspondance=Exécution du script car il n'est pas démarré
IF %IS_EXIST%==1 (GOTO EOF) ELSE (GOTO APPT_START) 

Si le fichier pid rem n'existe pas
:FILE_FALSE
GOTO APPT_START

exécution de la surveillance du dossier rem
:APPT_START
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -w %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
GOTO EOF


rem fin
:EOF
rem pause
folder_watch

folder_watch.py


    try:
        #Ajoutez ici!
        with open('pid', mode='w') as f:
            logger.info(f'pid = [{str(os.getpid())}]')
            f.write(str(os.getpid()))
        
        #Interprétation des arguments,Jugement
        args = interpret_args()
        #Vérification des arguments
        check_args(args)
        #Suivi de l'exécution
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        logger.error(e)
    except FileNotFoundError as e:
        logger.error(e)
    except TypeError as e:
        logger.error(e)
    except Exception as e:
        logger.error(e)

Enregistrez le pid dans un fichier au démarrage côté python C'est un processus qui ne démarre pas si le contenu du fichier et le pid actuel correspondent du côté du lot. Si vous démarrez le planificateur de tâches à des intervalles d'une minute, vous devriez pouvoir y faire quelque chose! C'est donc la fin de ce temps. -Il ne prend pas en charge la copie de dossiers ・ Le traitement parallèle est meilleur -Il y a eu une Comparaison de fichiers en python sans utiliser MD5. Il y a plusieurs trous, mais j'y repenserai si j'en ai envie. Le journal est également assez approprié. .. .. Nous n'avons pas tellement confirmé l'opération, veuillez donc l'utiliser comme référence uniquement. Github peut être trouvé à ici

Où je suis resté coincé

Le point dans lequel je suis resté coincé était lorsque le chien de garde a détecté la création (déplacement / copie) d'un fichier dans un dossier. L'événement allait se dérouler lorsqu'il n'était pas encore terminé. Lorsqu'un gros fichier arrive, la copie n'est pas encore terminée J'obtiens une erreur en essayant de le déplacer. Dans un premier temps, le processus de jugement a été fait en se référant à cet article. Apparemment, dans le cas des fenêtres, cela ne fonctionne pas comme prévu et il semble que la taille du fichier soit acquise même pendant la copie. (Est-ce que ce sera le mouvement que vous attendiez pour Linux?)

Quand j'ai eu des problèmes, j'ai trouvé le [prochain article](https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size- while-copying). La réponse au processus de jugement de copie dans Windows a été écrite. Merci stackoverflow! C'est un processus qui renomme si la copie (déplacement) est en cours et détermine qu'il est toujours copié (déplacé) si une erreur se produit. Je me demande s'il y avait un tel moyen. Cependant, le goulot d'étranglement est que le coût est élevé car c'est parce qu'une exception est générée. Si quelqu'un connaît une autre méthode, apprenez-moi s'il vous plaît. <détails>

Déplacer (copier) le jugement d'achèvement du fichier de détection </ summary>

folder_watch.py


@staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Non confirmé pour fonctionner sous Linux
Méthode de jugement de fin de création du fichier placé
URL de référence:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            if size_now == size_past and os.access(file_path, os.R_OK):
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
Création du fichier placé(copie)Méthode de jugement d'achèvement
URL de référence:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    return False

URL / source de référence

https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying Merci pour l'URL ci-dessus.

Recommended Posts