[Python] Ordnerüberwachung mit Watchdog

Einführung

Guten Abend, dies ist mein erster Beitrag seit langer Zeit. Ich habe ein bisschen Zeit Mit einem Gedenkgottesdienst zum Öffnen der App, der vor etwa einem Jahr lief Ich werde versuchen, es noch einmal zu machen.

Dieses Mal erstellen wir eine residente Anwendung, die Ordner überwacht. Wenn die Datei einfach irgendwo in einem bestimmten Ordner abgelegt wird Es ist so einfach wie das Kopieren in einen bestimmten Ordner. Es hat einige Zeit gedauert. .. ..

Entwicklungsumgebung

Windows 10 python-3.8.2 Verwendete Bibliothek: Watchdog

Überwachung

Erstellen Sie vorerst die folgende WatchFileHandler-Klasse. Dies ist ungefähr das Ende der Überwachung. Du hast es geschafft!

Erste

folder_watch.py


#Ereigniserfassungsklasse überwachen
class WacthFileHandler(FileSystemEventHandler):
    def __init__(self, watch_path, copy_to_path, backup_path):
        super(WacthFileHandler, self).__init__()
        self.watch_path = watch_path
        self.copy_to_path = copy_to_path
        self.backup_path = backup_path

    def on_moved(self, event):
        """
Erkennung von Dateibewegungen
        :param event:
        :return:
        """
        file_path = event.src_path
        file_name = os.path.basename(file_path)

    def on_created(self, event):
        """
Erkennung der Dateierstellung
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)

    def on_modified(self, event):
        """
Erkennung von Dateiänderungen
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)

    def on_deleted(self, event):
        """
Erkennung des Löschens von Dateien
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)


def watch_start(watch_path, copy_to_path, backup_path):
    """
Der Ordnerüberwachungsprozess wurde gestartet
    :param from_watch_path  :Ordnerpfad überwachen
    :param to_copy_path     :Pfad des Zielordners
    :param backup_path      :Speichern Sie den Pfad des Zielordners
    :return
    """
    event_handler = WacthFileHandler(watch_path, copy_to_path, backup_path)
    observer = Observer()
    observer.schedule(event_handler, watch_path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    except Exception as e:
        observer.stop()
        raise e
    finally:
        # finaly =Letzte Verarbeitung unabhängig vom Auftreten einer Ausnahme
        observer.join()

Als nächstes werden wir das Verhalten beim Eintreffen der Datei überwachen und berücksichtigen. Diesmal nach dem Verschieben (Kopieren) der Datei Stellen Sie sicher, dass die Originaldatei übereinstimmt, und löschen Sie die Originaldatei. Wenn Sie es nicht richtig kopieren können, isolieren Sie es. Es ist zu einer Parade der statischen Methode geworden. .. ..

Überwachungsverhalten

folder_watch.py


    def on_created(self, event):
        """
Erkennung der Dateierstellung
        :param event:
        :return:
        """
        #Dateinamen abrufen
        src_name = os.path.basename(event.src_path)
        #Generieren Sie den Ordnerpfad der Überwachungsquelle
        src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
        #Kopieren(Bewegung)Generieren Sie den Pfad des Zielordners
        copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
        #Generieren Sie einen Sicherungszielordnerpfad
        backup_link = pathlib.Path(self.backup_path)

        try:
            #Verarbeitung ausführen
            self._run(src_path, copy_path, backup_link)
        except TimeoutError as e:
            #Es ist zu groß!
            pass
        except Exception as e:
            pass

    def _run(self, src: Path, copy: Path, bk: Path):
        """
Kopieren, überprüfen, löschen, wenn eine Datei erkannt wird/Bewegung
        :param src:
        :param copy:
        :return:
        """
        #Warten Sie, bis die Platzierung abgeschlossen ist(Für einen bestimmten Zeitraum[600s]Warten)
        if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
            raise TimeoutError

        #Kopieren Sie die platzierte Datei
        if not self._copy_to_file(src, copy):
            return

        #Holen Sie sich den Hash von zwei Dateien
        src_hash = self._get_md5_hash(src)
        copy_hash = self._get_md5_hash(copy)

        if self._check_hash(src_hash, copy_hash):
            #Hash-Übereinstimmungen
            #Löschen Sie die Originaldatei
            self._del_original_file(src)
        else:
            #Hash-Nichtübereinstimmung
            #Zum Speichern des Ziels verschieben
            self._move_original_file(bk)

    def _copy_to_file(self, src, copy):
        """
Kopieren Sie die platzierte Datei in den angegebenen Ordner
        :param src:
        :param copy_to:
        :return:
        """
        #Wenn keine platzierte Datei vorhanden ist, wird keine weitere Verarbeitung durchgeführt.
        if not src.exists():
            return False

        #Dateimetadaten(Erstellungszeit, Änderungszeit usw.)Kopie inklusive
        copy_link = shutil.copy2(src, copy, follow_symlinks=True)

        #Überprüfen Sie, ob der Pfad, den Sie kopieren wollten, mit dem Pfad übereinstimmt, den Sie kopiert haben
        if copy != copy_link:
            return False

        if not copy.exists():
            return False

        return True

    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Nicht bestätigt, um unter Linux zu funktionieren
Bewertungsmethode für den Abschluss der Erstellung der platzierten Datei
Referenz-URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            if size_now == size_past and os.access(file_path, os.R_OK):
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
Erstellung der platzierten Datei(Kopieren)Methode zur Beurteilung der Fertigstellung
Referenz-URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _get_md5_hash(file_path):
        """
Datei md5 Hashwert(Sechseckiges Format)Erhalten
        :param file_path:
        :return:
        """
        with open(file_path, 'rb') as file:
            binary_data = file.read()
            #Hash-Wert im Hexadezimalformat abrufen
            md5 = hashlib.md5(binary_data).hexdigest()
            return md5

    @staticmethod
    def _check_hash(src_hash, target_hash):
        """
Vergleichen Sie zwei Hashes
        :param src_hash:
        :param target_hash:
        :return:
        """
        return src_hash == target_hash

    @staticmethod
    def _del_original_file( src):
        """
Löschen Sie die Kopierquelldatei
        :param src:
        :return:
        """
        os.remove(src)

    @staticmethod
    def _move_original_file(src_path, move_path):
        """
Verschieben Sie die Quelldatei(Evakuierung)
        :return:
        """
        shutil.move(src_path, move_path)
Ermöglicht die Ausführung mit Laufzeitargumenten. Ich werde auch dieses Argument Urteil hinzufügen. ↓ Verarbeitung wird hinzugefügt.
Verarbeitung von Laufzeitargumenten

folder_watch.py


def interpret_args():
    """
Argumentinterpretationsmethode zur Laufzeit
    :return:Laufzeitargumente
    """
    #Objekterstellung
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)

    #Argumenteinstellung
    #Ordnerpfad überwachen(Verpflichtend)
    parser.add_argument("-w", "--watch_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is watch folder path'''), type=str)

    #Pfad des Zielordners kopieren(Verpflichtend)
    parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is copy to folder path'''), type=str)

    #Speichern Sie den Pfad des Zielordners(Verpflichtend)
    parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is backup to folder path'''), type=str)

    #Geben Sie das Ergebnis zurück
    return parser.parse_args()


def check_args(args):
    """
Bewertungsmethode für Ausführungsargumente
    :param args:
    :return: True or False
    """
    #Fehler, wenn der Pfad des überwachten Ordners nicht angegeben ist
    if not hasattr(args, 'watch_path') and args.watch_path is None:
        raise argparse.ArgumentError('Ich gebe keinen Überwachungsordner an!')

    #Fehler, wenn der Pfad des Zielordners nicht angegeben ist
    if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
        raise argparse.ArgumentError('Ich gebe den Zielordner nicht an!')

    #Fehler, wenn der Pfad zum Speichern des Zielordners nicht angegeben ist
    if not hasattr(args, 'backup_path') and args.backup_path is None:
        raise argparse.ArgumentError('Ich speichere den Speicherzielordner nicht!')

    #Objekterzeugung für jeden Pfad
    watch_path = pathlib.Path(args.watch_path)
    copy_to_path = pathlib.Path(args.copy_to_path)
    backup_path = pathlib.Path(args.backup_path)

    #Überprüfen Sie, ob der Pfad des Überwachungsordners vorhanden ist
    if not watch_path.exists():
        raise FileNotFoundError('Es gibt keinen Überwachungsordner!')

    #Überprüfen Sie, ob der Überwachungsordner ein Verzeichnis ist
    if not watch_path.is_dir():
        raise TypeError('Der angegebene Überwachungsordner ist kein Ordner!')

    #Überprüfen Sie, ob der Pfad des Zielordners vorhanden ist
    if not copy_to_path.exists():
        raise FileNotFoundError('Es gibt keinen Zielordner!')

    #Überprüfen Sie, ob der Zielordner ein Verzeichnis ist
    if not copy_to_path.is_dir():
        raise TypeError('Der angegebene Zielordner ist kein Ordner!')

    #Überprüfen Sie, ob der Pfad des Zielordners vorhanden ist
    if not backup_path.exists():
        raise FileNotFoundError('Es gibt keinen Speicherzielordner!')

    #Überprüfen Sie, ob der Zielordner ein Verzeichnis ist
    if not backup_path.is_dir():
        raise TypeError('Der angegebene Speicherzielordner ist kein Ordner!')


#Ausführungsverarbeitung
if __name__ == '__main__':
    try:
        #Argumentinterpretation,Beurteilung
        args = interpret_args()
        #Argumentprüfung
        check_args(args)
        #Überwachung der Ausführung
        watch_start(args.watch_path, args.copy_to_path)
    except argparse.ArgumentError as e:
        pass
    except FileNotFoundError as e:
        pass
    except TypeError as e:
        pass
    except Exception as e:
        pass

Bei dieser Geschwindigkeit weiß ich nicht, was ich bewegt habe oder was gekommen ist, also werde ich ein Protokoll erstellen. Die Quelle ist vorerst fast vollständig.

Quelle fast Volltext

folder_watch.py


# -*- coding: utf-8 -*-
import os
import time
import sys
import logging
import hashlib
import argparse
import textwrap
import pathlib
from pathlib import Path
import shutil
from datetime import datetime
from watchdog.observers import Observer
from logging.handlers import TimedRotatingFileHandler
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

try:
    import codecs
except ImportError:
    codecs = None


class MyTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    """
Dateihandler für die Ausgabeklasse des Datumsprotokolls
    """
    def __init__(self, dir_log):
        self.dir_log = dir_log
        filename = self.dir_log + time.strftime("%Y%m%d") + ".log"  # dir_log here MUST be with os.sep on the end
        logging.handlers.TimedRotatingFileHandler.__init__(self, filename, when='midnight', interval=1, backupCount=0,
                                                           encoding=None)

    def doRollover(self):
        """
        TimedRotatingFileHandler remix - rotates logs on daily basis, and filename of current logfile is time.strftime("%m%d%Y")+".txt" always
        """
        self.stream.close()
        # get the time that this sequence started at and make it a TimeTuple
        t = self.rolloverAt - self.interval
        timeTuple = time.localtime(t)
        self.baseFilename = self.dir_log + time.strftime("%Y%m%d") + ".log"
        if self.encoding:
            self.stream = codecs.open(self.baseFilename, 'w', self.encoding)
        else:
            self.stream = open(self.baseFilename, 'w')
        self.rolloverAt = self.rolloverAt + self.interval


#Ereigniserfassungsklasse überwachen
class WacthFileHandler(FileSystemEventHandler):
    def __init__(self, watch_path, copy_to_path, backup_path):
        super(WacthFileHandler, self).__init__()
        self.watch_path = watch_path
        self.copy_to_path = copy_to_path
        self.backup_path = backup_path

    def on_moved(self, event):
        """
Erkennung von Dateibewegungen
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name}Ist umgezogen')

    def on_created(self, event):
        """
Erkennung der Dateierstellung
        :param event:
        :return:
        """
        #Dateinamen abrufen
        src_name = os.path.basename(event.src_path)
        logger.info(f'{src_name}Geschah')
        #Generieren Sie den Ordnerpfad der Überwachungsquelle
        src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
        #Kopieren(Bewegung)Generieren Sie den Pfad des Zielordners
        copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
        #Generieren Sie einen Sicherungszielordnerpfad
        backup_link = pathlib.Path(self.backup_path)

        try:
            #Verarbeitung ausführen
            self._run(src_path, copy_path, backup_link)
        except TimeoutError as e:
            #Es ist zu groß!
            logger.error('Es ist zu groß!')
            logger.error(e)
        except Exception as e:
            logger.error(e)

    def on_modified(self, event):
        """
Erkennung von Dateiänderungen
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name}Geändert')

    def on_deleted(self, event):
        """
Erkennung des Löschens von Dateien
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name}Gelöschte s')

    def _run(self, src: Path, copy: Path, bk: Path):
        """
Kopieren, überprüfen, löschen, wenn eine Datei erkannt wird/Bewegung
        :param src:
        :param copy:
        :return:
        """
        #Warten Sie, bis die Platzierung abgeschlossen ist(Für einen bestimmten Zeitraum[600s]Warten)
        if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
            raise TimeoutError

        #Kopieren Sie die platzierte Datei
        if not self._copy_to_file(src, copy):
            return

        #Holen Sie sich den Hash von zwei Dateien
        src_hash = self._get_md5_hash(src)
        copy_hash = self._get_md5_hash(copy)

        if self._check_hash(src_hash, copy_hash):
            #Hash-Übereinstimmungen
            #Löschen Sie die Originaldatei
            self._del_original_file(src)
        else:
            #Hash-Nichtübereinstimmung
            #Zum Speichern des Ziels verschieben
            self._move_original_file(bk)

    def _copy_to_file(self, src, copy):
        """
Kopieren Sie die platzierte Datei in den angegebenen Ordner
        :param src:
        :param copy_to:
        :return:
        """
        #Wenn keine platzierte Datei vorhanden ist, wird keine weitere Verarbeitung durchgeführt.
        if not src.exists():
            return False

        #Dateimetadaten(Erstellungszeit, Änderungszeit usw.)Kopie inklusive
        copy_link = shutil.copy2(src, copy, follow_symlinks=True)

        #Überprüfen Sie, ob der Pfad, den Sie kopieren wollten, mit dem Pfad übereinstimmt, den Sie kopiert haben
        if copy != copy_link:
            return False

        if not copy.exists():
            return False

        return True

    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Nicht bestätigt, um unter Linux zu funktionieren
Bewertungsmethode für den Abschluss der Erstellung der platzierten Datei
Referenz-URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            logger.info(f"size_now: {size_now}")
            logger.info(f"size_past: {size_past}")
            if size_now == size_past and os.access(file_path, os.R_OK):
                logger.info("file has copied completely now size: %s", size_now)
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    logger.info('time out error')
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
Erstellung der platzierten Datei(Kopieren)Methode zur Beurteilung der Fertigstellung
Referenz-URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                logger.info('file copy...')
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    logger.info('time out error')
                    return False

    @staticmethod
    def _get_md5_hash(file_path):
        """
Datei md5 Hashwert(Sechseckiges Format)Erhalten
        :param file_path:
        :return:
        """
        with open(file_path, 'rb') as file:
            binary_data = file.read()
            #Hash-Wert im Hexadezimalformat abrufen
            md5 = hashlib.md5(binary_data).hexdigest()
            logger.info(f'Datei:{file_path} -Hashwert- {md5}')
            return md5

    @staticmethod
    def _check_hash(src_hash, target_hash):
        """
Vergleichen Sie zwei Hashes
        :param src_hash:
        :param target_hash:
        :return:
        """
        return src_hash == target_hash

    @staticmethod
    def _del_original_file( src):
        """
Löschen Sie die Kopierquelldatei
        :param src:
        :return:
        """
        os.remove(src)

    @staticmethod
    def _move_original_file(src_path, move_path):
        """
Verschieben Sie die Quelldatei(Evakuierung)
        :return:
        """
        shutil.move(src_path, move_path)


def watch_start(from_watch_path, to_copy_path, backup_path):
    """
Der Ordnerüberwachungsprozess wurde gestartet
    :param from_watch_path  :Ordnerpfad überwachen
    :param to_copy_path     :Pfad des Zielordners
    :param backup_path      :Speichern Sie den Pfad des Zielordners
    :return:
    """
    event_handler = WacthFileHandler(from_watch_path, to_copy_path, backup_path)
    observer = Observer()
    observer.schedule(event_handler, from_watch_path, recursive=True)
    logger.info(f'Ordnerüberwachung starten')
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    except Exception as e:
        observer.stop()
        raise e
    finally:
        # finaly =Letzte Verarbeitung unabhängig vom Auftreten einer Ausnahme
        logger.info(f'Ordnerüberwachung beendet')
        observer.join()


def make_log_folder():
    """
Erstellen, wenn beim Start kein Protokollordner vorhanden ist
    :return:
    """
    p = pathlib.Path(sys.argv[0])
    p2 = pathlib.Path(p.parent) / pathlib.Path('logs')
    if not p2.exists():
        os.makedirs(str(p2))


def interpret_args():
    """
Argumentinterpretationsmethode zur Laufzeit
    :return:Laufzeitargumente
    """
    #Objekterstellung
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)

    #Argumenteinstellung
    #Ordnerpfad überwachen(Verpflichtend)
    parser.add_argument("-wp", "--watch_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is watch folder path'''), type=str)

    #Pfad des Zielordners kopieren(Verpflichtend)
    parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is copy to folder path'''), type=str)

    #Speichern Sie den Pfad des Zielordners(Verpflichtend)
    parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is backup folder path'''), type=str)

    #Geben Sie das Ergebnis zurück
    return parser.parse_args()


def check_args(args):
    """
Bewertungsmethode für Ausführungsargumente
    :param args:
    :return: True or False
    """
    #Fehler, wenn der Pfad des überwachten Ordners nicht angegeben ist
    if not hasattr(args, 'watch_path') and args.watch_path is None:
        raise argparse.ArgumentError('Ich gebe keinen Überwachungsordner an!')

    #Fehler, wenn der Pfad des Zielordners nicht angegeben ist
    if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
        raise argparse.ArgumentError('Ich gebe den Zielordner nicht an!')

    #Fehler, wenn der Pfad zum Speichern des Zielordners nicht angegeben ist
    if not hasattr(args, 'backup_path') and args.backup_path is None:
        raise argparse.ArgumentError('Ich speichere den Speicherzielordner nicht!')

    #Objekterzeugung für jeden Pfad
    watch_path = pathlib.Path(args.watch_path)
    copy_to_path = pathlib.Path(args.copy_to_path)
    backup_path = pathlib.Path(args.backup_path)

    #Überprüfen Sie, ob der Pfad des Überwachungsordners vorhanden ist
    if not watch_path.exists():
        raise FileNotFoundError('Es gibt keinen Überwachungsordner!')

    #Überprüfen Sie, ob der Überwachungsordner ein Verzeichnis ist
    if not watch_path.is_dir():
        raise TypeError('Der angegebene Überwachungsordner ist kein Ordner!')

    #Überprüfen Sie, ob der Pfad des Zielordners vorhanden ist
    if not copy_to_path.exists():
        raise FileNotFoundError('Es gibt keinen Zielordner!')

    #Überprüfen Sie, ob der Zielordner ein Verzeichnis ist
    if not copy_to_path.is_dir():
        raise TypeError('Der angegebene Zielordner ist kein Ordner!')

    #Überprüfen Sie, ob der Pfad des Zielordners vorhanden ist
    if not backup_path.exists():
        raise FileNotFoundError('Es gibt keinen Speicherzielordner!')

    #Überprüfen Sie, ob der Zielordner ein Verzeichnis ist
    if not backup_path.is_dir():
        raise TypeError('Der angegebene Speicherzielordner ist kein Ordner!')


#Ausführungsverarbeitung
if __name__ == '__main__':

    #Erstellen, wenn kein Protokollordner vorhanden ist(* Bei der Protokollierung werden keine Ordner erstellt.)
    make_log_folder()

    #Protokollierungseinstellungen
    # get the root logger
    root_logger = logging.getLogger()
    # set overall level to debug, default is warning for root logger
    root_logger.setLevel(logging.DEBUG)

    # setup logging to file, rotating at midnight
    file_log = MyTimedRotatingFileHandler(f'./logs/log_')
    file_log.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)Zeile d] : %(message)s',
                                       datefmt='%Y-%m-%d %H:%M:%S')
    file_log.setFormatter(file_formatter)
    root_logger.addHandler(file_log)

    # setup logging to console
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)Zeile d] : %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    console.setFormatter(formatter)
    root_logger.addHandler(console)

    # get a logger for my script
    logger = logging.getLogger(__name__)

    try:
        #Argumentinterpretation,Beurteilung
        args = interpret_args()
        #Argumentprüfung
        check_args(args)
        #Überwachung der Ausführung
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        logger.error(e)
    except FileNotFoundError as e:
        logger.error(e)
    except TypeError as e:
        logger.error(e)
    except Exception as e:
        logger.error(e)

Es ist schwierig auszuführen, wie es ist Erstellen Sie wie gewohnt eine Fledermaus wie ↓.

bat

execute.bat


@echo off
setlocal
Machen Sie den Ort, an dem sich das Rem-Skript befindet, zum aktuellen Verzeichnis
cd /d %~dp0

SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
rem watch Ordner
SET WATCH_PATH=.\test\from
rem Kopieren Sie den Zielordner
SET COPY_PATH=.\test\to
rem Backup-Zielordner
SET BK_PATH=.\test\bk

Rem Ordner Überwachung Ausführung
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -wp %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%

Ich frage mich, ob dies das Ende ist, wenn ich es entsprechend in den Taskplaner stelle Ich habe mich gefragt, aber es scheint der Fall zu sein. Diese App ist resident und ich möchte, dass Sie sie sofort starten, auch wenn sie fällt. Wenn Sie den Taskplaner verwenden, werden auch viele Prozesse gestartet. .. .. Also habe ich die Fledermaus und die Quelle wie folgt gemacht.

bat

execute.bat


@echo off
setlocal
Machen Sie den Ort, an dem sich das Rem-Skript befindet, zum aktuellen Verzeichnis
cd /d %~dp0

SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
rem watch Ordner
SET WATCH_PATH=.\test\from
rem Kopieren Sie den Zielordner
SET COPY_PATH=.\test\to
rem Backup-Zielordner
SET BK_PATH=.\test\bk
rem PID-Schreibordner
SET PID_FILD=pid

Sofort ausführen, wenn keine Remid-Datei vorhanden ist
IF EXIST %PID_FILD% (GOTO FILE_TRUE) ELSE GOTO FILE_FALSE

rem pid Datei existiert bereits
:FILE_TRUE

Holen Sie sich die PID-Nummer aus der Rem-Datei
SET /p PID_VAL=<pid
rem pid Anwesenheitsflagge(true=1, false=0)
SET IS_EXIST=0
rem Bildname:"python.exe"Suche nach der PID mit(Nur der Nummernteil)
for /F "usebackq tokens=2" %%a in (
`tasklist /fi "IMAGENAME eq python.exe" ^| findstr "[0-9]"`) do (
rem ECHO %%a
if %%a==%PID_VAL% SET IS_EXIST=1
)
rem ECHO %PID_VAL%
rem ECHO %IS_EXIST%

Rem gibt es eine Übereinstimmung=Tun Sie nichts, weil es bereits läuft
rem Es gibt keine Übereinstimmung=Skriptausführung, da es nicht gestartet wird
IF %IS_EXIST%==1 (GOTO EOF) ELSE (GOTO APPT_START) 

Wenn die Rem-PID-Datei nicht vorhanden ist
:FILE_FALSE
GOTO APPT_START

Rem Ordner Überwachung Ausführung
:APPT_START
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -w %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
GOTO EOF


rem end
:EOF
rem pause
folder_watch

folder_watch.py


    try:
        #Hier hinzufügen!
        with open('pid', mode='w') as f:
            logger.info(f'pid = [{str(os.getpid())}]')
            f.write(str(os.getpid()))
        
        #Argumentinterpretation,Beurteilung
        args = interpret_args()
        #Argumentprüfung
        check_args(args)
        #Überwachung der Ausführung
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        logger.error(e)
    except FileNotFoundError as e:
        logger.error(e)
    except TypeError as e:
        logger.error(e)
    except Exception as e:
        logger.error(e)

Speichern Sie die PID beim Start in einer Datei auf der Python-Seite Es ist ein Prozess, der nicht gestartet wird, wenn der Dateiinhalt und die aktuelle PID auf der Stapelseite übereinstimmen. Wenn Sie den Taskplaner in Intervallen von 1 Minute starten, sollten Sie in der Lage sein, etwas dagegen zu tun! Das ist also das Ende dieser Zeit. -Das Kopieren von Ordnern wird nicht unterstützt ・ Ist Parallelverarbeitung nicht besser? -Es gab einen Dateivergleich in Python ohne Verwendung von MD5. Es gibt verschiedene Löcher, aber ich werde noch einmal darüber nachdenken, wenn ich Lust dazu habe. Das Protokoll ist auch ziemlich angemessen. .. .. Wir haben den Vorgang nicht so sehr bestätigt, verwenden Sie ihn daher bitte nur als Referenz. Github finden Sie unter hier

Wo ich feststeckte

Der Punkt, an dem ich feststeckte, war, als Watchdog die Erstellung (Verschieben / Kopieren) einer Datei in einem Ordner entdeckte. Die Veranstaltung sollte laufen, wenn sie noch nicht abgeschlossen war. Wenn eine große Datei eintrifft, ist die Kopie noch nicht abgeschlossen Beim Versuch, es zu verschieben, wird eine Fehlermeldung angezeigt. Zunächst wurde der Beurteilungsprozess unter Bezugnahme auf diesen Artikel durchgeführt. Anscheinend funktioniert es bei Windows nicht wie erwartet und es scheint, dass die Größe der Datei auch während des Kopierens erfasst wird. (Wird es der Schritt sein, den Sie für Linux erwartet haben?)

Als ich in Schwierigkeiten war, fand ich den nächsten Artikel. Die Antwort auf den Prozess der Kopierbeurteilung in Windows wurde geschrieben. Vielen Dank, dass Sie stackoverflow! Es ist ein Prozess, der umbenennt, ob gerade kopiert (verschoben) wird, und feststellt, dass es im Fehlerfall noch kopiert (verschoben) wird. Ich frage mich, ob es so einen gab. Der Engpass besteht jedoch darin, dass die Kosten hoch sind, da eine Ausnahme generiert wird. Wenn jemand eine andere Methode kennt, lehre mich bitte.

Verschieben (Kopieren) der Beurteilung der Erkennungsdatei

folder_watch.py


@staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Nicht bestätigt, um unter Linux zu funktionieren
Bewertungsmethode für den Abschluss der Erstellung der platzierten Datei
Referenz-URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            if size_now == size_past and os.access(file_path, os.R_OK):
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
Erstellung der platzierten Datei(Kopieren)Methode zur Beurteilung der Fertigstellung
Referenz-URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    return False

Referenz-URL / Quelle

https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying Danke für die obige URL.

Recommended Posts

[Python] Ordnerüberwachung mit Watchdog
Einfache Ordnersynchronisation mit Python
FizzBuzz in Python3
Scraping mit Python
Statistik mit Python
Geräteüberwachung durch On-Box Python von IOS-XE
Scraping mit Python
Python mit Go
Twilio mit Python
In Python integrieren
Spielen Sie mit 2016-Python
AES256 mit Python
Getestet mit Python
Python beginnt mit ()
mit Syntax (Python)
Verkehrsüberwachung mit Kibana, ElasticSearch und Python
Bingo mit Python
Zundokokiyoshi mit Python
Erstellen Sie eine GIF-Animation mit Ordnerüberwachung
Organisieren Sie mit Python nach Ordnern getrennte Daten
Mit Monitoring gewinnen
Excel mit Python
Mikrocomputer mit Python
Mit Python besetzen
[Python] Holen Sie sich die Dateien mit Python in den Ordner
Serielle Kommunikation mit Python
Zip, entpacken mit Python
Django 1.11 wurde mit Python3.6 gestartet
Primzahlbeurteilung mit Python
Python mit Eclipse + PyDev.
Socket-Kommunikation mit Python
Datenanalyse mit Python 2
Scraping in Python (Vorbereitung)
Python lernen mit ChemTHEATER 03
Sequentielle Suche mit Python
Führen Sie Python mit VBA aus
Umgang mit Yaml mit Python
Löse AtCoder 167 mit Python
Serielle Kommunikation mit Python
[Python] Verwenden Sie JSON mit Python
Python lernen mit ChemTHEATER 05-1
Lerne Python mit ChemTHEATER
Führen Sie prepDE.py mit python3 aus
1.1 Erste Schritte mit Python
Tweets mit Python sammeln
Binarisierung mit OpenCV / Python
3. 3. KI-Programmierung mit Python
Kernel-Methode mit Python
Nicht blockierend mit Python + uWSGI
Scraping mit Python + PhantomJS
Tweets mit Python posten
Verwenden Sie Mecab mit Python 3
[Python] Mit CGIHTTPServer umleiten
Kinesis mit Python betreiben
Erste Schritte mit Python
Verwenden Sie DynamoDB mit Python
Zundko Getter mit Python
Behandle Excel mit Python
Ohmsches Gesetz mit Python
Primzahlbeurteilung mit Python
Führen Sie Blender mit Python aus