[Python] Folder monitoring with watchdog

Introduction

Good evening. This is my first post in a long time. I have a little free time, so as a memorial service for an app I started about a year ago and then abandoned, I will try building it again.

This time we will make a resident application that monitors a folder. It does something simple: when a file is placed in a specified folder, it is copied to another specified folder. Even so, it took some time...

Development environment

Windows 10, Python 3.8.2. Library used: watchdog

Monitoring

For the time being, create the following WacthFileHandler class (the name is spelled this way throughout the source). This is most of what the monitoring itself needs. You did it!

First

folder_watch.py


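import os
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


# Event handler class that receives the monitoring events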
class WacthFileHandler(FileSystemEventHandler):
    def __init__(self, watch_path, copy_to_path, backup_path):
        super(WacthFileHandler, self).__init__()
        self.watch_path = watch_path
        self.copy_to_path = copy_to_path
        self.backup_path = backup_path

    def on_moved(self, event):
        """
File movement detection
        :param event:
        :return:
        """
        file_path = event.src_path
        file_name = os.path.basename(file_path)

    def on_created(self, event):
        """
File creation detection
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)

    def on_modified(self, event):
        """
File change detection
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)

    def on_deleted(self, event):
        """
File deletion detection
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)


def watch_start(watch_path, copy_to_path, backup_path):
    """
    Start the folder monitoring process
    :param watch_path       :Monitoring folder path
    :param copy_to_path     :Destination folder path
    :param backup_path      :Evacuation destination folder path
    :return
    """
    event_handler = WacthFileHandler(watch_path, copy_to_path, backup_path)
    observer = Observer()
    observer.schedule(event_handler, watch_path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    except Exception:
        observer.stop()
        raise
    finally:
        # finally = runs last, whether or not an exception occurred
        observer.join()

Next, consider what should happen when a file arrives in the monitored folder. This time, after copying the file, we check that the copy matches the original and then delete the original. If the copy did not succeed, the original is moved aside to an evacuation (backup) folder. It turned into a parade of static methods...

Monitoring behavior

folder_watch.py


    def on_created(self, event):
        """
File creation detection
        :param event:
        :return:
        """
        # Get the file name
        src_name = os.path.basename(event.src_path)
        # Build the path of the file in the monitored folder
        src_path = pathlib.Path(self.watch_path) / src_name
        # Build the path of the copy (move) destination file
        copy_path = pathlib.Path(self.copy_to_path) / src_name
        # Build the path of the backup (evacuation) folder
        backup_link = pathlib.Path(self.backup_path)

        try:
            #Execute processing
            self._run(src_path, copy_path, backup_link)
        except TimeoutError as e:
            #It's too big!
            pass
        except Exception as e:
            pass

    def _run(self, src: Path, copy: Path, bk: Path):
        """
Copy, check, delete when file is detected/Move
        :param src:
        :param copy:
        :return:
        """
        # Wait for the file placement to finish (up to 600 s)
        if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
            raise TimeoutError

        #Copy the placed file
        if not self._copy_to_file(src, copy):
            return

        #Get the hashes of two files
        src_hash = self._get_md5_hash(src)
        copy_hash = self._get_md5_hash(copy)

        if self._check_hash(src_hash, copy_hash):
            #Hash match
            #Delete the original file
            self._del_original_file(src)
        else:
            # Hash mismatch
            # Move the original to the evacuation destination
            self._move_original_file(src, bk)

    def _copy_to_file(self, src, copy):
        """
Copy the placed file to the specified folder
        :param src:
        :param copy_to:
        :return:
        """
        #If there is no placed file, no further processing will be performed.
        if not src.exists():
            return False

        # Copy the file including its metadata (creation time, modification time, etc.)
        copy_link = shutil.copy2(src, copy, follow_symlinks=True)

        #Check that the path you were trying to copy matches the path you copied
        if copy != copy_link:
            return False

        if not copy.exists():
            return False

        return True

    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Not confirmed to work on Linux
Creation completion judgment method of the placed file
Reference URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            if size_now == size_past and os.access(file_path, os.R_OK):
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
        Judge whether creation (copy) of the placed file has completed
Reference URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _get_md5_hash(file_path):
        """
        Get the file's MD5 hash value (hexadecimal format)
        :param file_path:
        :return:
        """
        with open(file_path, 'rb') as file:
            binary_data = file.read()
            #Get hash value in hexadecimal format
            md5 = hashlib.md5(binary_data).hexdigest()
            return md5

    @staticmethod
    def _check_hash(src_hash, target_hash):
        """
Compare two hashes
        :param src_hash:
        :param target_hash:
        :return:
        """
        return src_hash == target_hash

    @staticmethod
    def _del_original_file( src):
        """
Delete the copy source file
        :param src:
        :return:
        """
        os.remove(src)

    @staticmethod
    def _move_original_file(src_path, move_path):
        """
Move copy source file(Evacuation)
        :return:
        """
        shutil.move(src_path, move_path)
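
A note on _get_md5_hash: it reads the whole file into memory before hashing, which may matter for the large files this tool has to wait for. As a rough sketch (not part of the original script), the hash could be computed in chunks instead. The function name md5_of_file and the 1 MiB chunk size are assumptions made for illustration.

```python
import hashlib


def md5_of_file(file_path, chunk_size=1024 * 1024):
    """Return the hexadecimal MD5 digest of a file, reading it in chunks.

    md5_of_file and chunk_size are hypothetical names, not part of the
    original folder_watch.py.
    """
    md5 = hashlib.md5()
    with open(file_path, 'rb') as file:
        # iter() with a sentinel keeps calling file.read(chunk_size)
        # until it returns b'' (end of file)
        for chunk in iter(lambda: file.read(chunk_size), b''):
            md5.update(chunk)
    return md5.hexdigest()
```

The result is the same digest as hashing the whole content at once, so it could be compared with _check_hash in the same way.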

The script should be runnable with run-time arguments, so I will also add argument checking. The following processing is added.

Runtime argument processing

folder_watch.py


def interpret_args():
    """
Run-time argument interpretation method
    :return:Runtime arguments
    """
    #Object creation
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)

    #Argument setting
    #Monitoring folder path(Mandatory)
    parser.add_argument("-wp", "--watch_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is watch folder path'''), type=str)

    #Copy destination folder path(Mandatory)
    parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is copy to folder path'''), type=str)

    #Evacuation destination folder path(Mandatory)
    parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is backup to folder path'''), type=str)

    #Return the result
    return parser.parse_args()


def check_args(args):
    """
Execution argument judgment method
    :param args:
    :return: True or False
    """
    # Error if the path of the monitored folder is not specified
    if getattr(args, 'watch_path', None) is None:
        raise argparse.ArgumentError(None, 'The monitoring folder is not specified!')

    # Error if the path of the destination folder is not specified
    if getattr(args, 'copy_to_path', None) is None:
        raise argparse.ArgumentError(None, 'The destination folder is not specified!')

    # Error if the save destination folder path is not specified
    if getattr(args, 'backup_path', None) is None:
        raise argparse.ArgumentError(None, 'The save destination folder is not specified!')

    #Object generation for each path
    watch_path = pathlib.Path(args.watch_path)
    copy_to_path = pathlib.Path(args.copy_to_path)
    backup_path = pathlib.Path(args.backup_path)

    #Check if the path of the monitoring folder exists
    if not watch_path.exists():
        raise FileNotFoundError('There is no monitoring folder!')

    #Check if the watch folder is a directory
    if not watch_path.is_dir():
        raise TypeError('The specified monitoring folder is not a folder!')

    #Check if the path of the destination folder exists
    if not copy_to_path.exists():
        raise FileNotFoundError('There is no destination folder!')

    #Check if the destination folder is a directory
    if not copy_to_path.is_dir():
        raise TypeError('The specified destination folder is not a folder!')

    # Check if the path of the save destination folder exists
    if not backup_path.exists():
        raise FileNotFoundError('There is no save destination folder!')

    # Check if the save destination folder is a directory
    if not backup_path.is_dir():
        raise TypeError('The specified save destination folder is not a folder!')


#Execution processing
if __name__ == '__main__':
    try:
        #Argument interpretation,Judgment
        args = interpret_args()
        #Argument check
        check_args(args)
        #Monitoring execution
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        pass
    except FileNotFoundError as e:
        pass
    except TypeError as e:
        pass
    except Exception as e:
        pass
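
As a side note, argparse can enforce the mandatory arguments by itself. The following is a minimal sketch, not the original implementation: it assumes the same three options and marks them with required=True, so parse_args() prints the usage text and exits when one of them is missing. The function name build_parser and the help texts are hypothetical.

```python
import argparse


def build_parser():
    """Build a parser whose three folder options are all mandatory.

    build_parser is a hypothetical helper, not part of folder_watch.py.
    """
    parser = argparse.ArgumentParser(description='Folder monitoring')
    parser.add_argument('-wp', '--watch_path', required=True,
                        help='folder to watch')
    parser.add_argument('-cp', '--copy_to_path', required=True,
                        help='folder to copy detected files to')
    parser.add_argument('-bk', '--backup_path', required=True,
                        help='folder for files whose copy could not be verified')
    return parser


# Example: parse an explicit argument list instead of sys.argv
args = build_parser().parse_args(['-wp', 'from', '-cp', 'to', '-bk', 'bk'])
print(args.watch_path, args.copy_to_path, args.backup_path)
```

With this approach only the existence and is-directory checks would remain in check_args.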

As it stands, there is no way to tell what arrived or what was moved, so I will add logging. With that, the source is almost complete.

Almost the full source

folder_watch.py


# -*- coding: utf-8 -*-
import os
import time
import sys
import codecs
import logging
import logging.handlers
import hashlib
import argparse
import textwrap
import pathlib
from pathlib import Path
import shutil
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class MyTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    """
    File handler class that writes one log file per date
    """
    def __init__(self, dir_log):
        self.dir_log = dir_log
        # dir_log is used as a prefix: it must end with os.sep or with a file name prefix such as "log_"
        filename = self.dir_log + time.strftime("%Y%m%d") + ".log"
        super().__init__(filename, when='midnight', interval=1, backupCount=0, encoding=None)

    def doRollover(self):
        """
        TimedRotatingFileHandler remix: rotates logs daily, and the current
        log file is always named <dir_log> + time.strftime("%Y%m%d") + ".log"
        """
        self.stream.close()
        # Switch to the file for the new date
        self.baseFilename = self.dir_log + time.strftime("%Y%m%d") + ".log"
        if self.encoding:
            self.stream = codecs.open(self.baseFilename, 'w', self.encoding)
        else:
            self.stream = open(self.baseFilename, 'w')
        self.rolloverAt = self.rolloverAt + self.interval


# Event handler class that receives the monitoring events
class WacthFileHandler(FileSystemEventHandler):
    def __init__(self, watch_path, copy_to_path, backup_path):
        super(WacthFileHandler, self).__init__()
        self.watch_path = watch_path
        self.copy_to_path = copy_to_path
        self.backup_path = backup_path

    def on_moved(self, event):
        """
File movement detection
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name} was moved')

    def on_created(self, event):
        """
File creation detection
        :param event:
        :return:
        """
        # Get the file name
        src_name = os.path.basename(event.src_path)
        logger.info(f'{src_name} was created')
        # Build the path of the file in the monitored folder
        src_path = pathlib.Path(self.watch_path) / src_name
        # Build the path of the copy (move) destination file
        copy_path = pathlib.Path(self.copy_to_path) / src_name
        # Build the path of the backup (evacuation) folder
        backup_link = pathlib.Path(self.backup_path)

        try:
            # Execute processing
            self._run(src_path, copy_path, backup_link)
        except TimeoutError as e:
            # It's too big!
            logger.error("It's too big!")
            logger.error(e)
        except Exception as e:
            logger.error(e)

    def on_modified(self, event):
        """
File change detection
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name} was changed')

    def on_deleted(self, event):
        """
File deletion detection
        :param event:
        :return:
        """
        src_path = event.src_path
        src_name = os.path.basename(src_path)
        logger.info(f'{src_name} was deleted')

    def _run(self, src: Path, copy: Path, bk: Path):
        """
Copy, check, delete when file is detected/Move
        :param src:
        :param copy:
        :return:
        """
        # Wait for the file placement to finish (up to 600 s)
        if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
            raise TimeoutError

        #Copy the placed file
        if not self._copy_to_file(src, copy):
            return

        #Get the hashes of two files
        src_hash = self._get_md5_hash(src)
        copy_hash = self._get_md5_hash(copy)

        if self._check_hash(src_hash, copy_hash):
            #Hash match
            #Delete the original file
            self._del_original_file(src)
        else:
            # Hash mismatch
            # Move the original to the evacuation destination
            self._move_original_file(src, bk)

    def _copy_to_file(self, src, copy):
        """
Copy the placed file to the specified folder
        :param src:
        :param copy_to:
        :return:
        """
        #If there is no placed file, no further processing will be performed.
        if not src.exists():
            return False

        # Copy the file including its metadata (creation time, modification time, etc.)
        copy_link = shutil.copy2(src, copy, follow_symlinks=True)

        #Check that the path you were trying to copy matches the path you copied
        if copy != copy_link:
            return False

        if not copy.exists():
            return False

        return True

    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Not confirmed to work on Linux
Creation completion judgment method of the placed file
Reference URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            logger.info(f"size_now: {size_now}")
            logger.info(f"size_past: {size_past}")
            if size_now == size_past and os.access(file_path, os.R_OK):
                logger.info("file has copied completely now size: %s", size_now)
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    logger.info('time out error')
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
        Judge whether creation (copy) of the placed file has completed
Reference URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                logger.info('file copy...')
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    logger.info('time out error')
                    return False

    @staticmethod
    def _get_md5_hash(file_path):
        """
        Get the file's MD5 hash value (hexadecimal format)
        :param file_path:
        :return:
        """
        with open(file_path, 'rb') as file:
            binary_data = file.read()
            #Get hash value in hexadecimal format
            md5 = hashlib.md5(binary_data).hexdigest()
            logger.info(f'File:{file_path} -Hash value- {md5}')
            return md5

    @staticmethod
    def _check_hash(src_hash, target_hash):
        """
Compare two hashes
        :param src_hash:
        :param target_hash:
        :return:
        """
        return src_hash == target_hash

    @staticmethod
    def _del_original_file( src):
        """
Delete the copy source file
        :param src:
        :return:
        """
        os.remove(src)

    @staticmethod
    def _move_original_file(src_path, move_path):
        """
Move copy source file(Evacuation)
        :return:
        """
        shutil.move(src_path, move_path)


def watch_start(from_watch_path, to_copy_path, backup_path):
    """
Folder monitoring process started
    :param from_watch_path  :Monitoring folder path
    :param to_copy_path     :Destination folder path
    :param backup_path      :Evacuation destination folder path
    :return:
    """
    event_handler = WacthFileHandler(from_watch_path, to_copy_path, backup_path)
    observer = Observer()
    observer.schedule(event_handler, from_watch_path, recursive=True)
    logger.info(f'Folder monitoring start')
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    except Exception as e:
        observer.stop()
        raise e
    finally:
        # finally = runs last, whether or not an exception occurred
        logger.info(f'Folder monitoring finished')
        observer.join()


def make_log_folder():
    """
Create if there is no logs folder at startup
    :return:
    """
    p = pathlib.Path(sys.argv[0])
    p2 = pathlib.Path(p.parent) / pathlib.Path('logs')
    if not p2.exists():
        os.makedirs(str(p2))


def interpret_args():
    """
Run-time argument interpretation method
    :return:Runtime arguments
    """
    #Object creation
    parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)

    #Argument setting
    #Monitoring folder path(Mandatory)
    parser.add_argument("-wp", "--watch_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is watch folder path'''), type=str)

    #Copy destination folder path(Mandatory)
    parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is copy to folder path'''), type=str)

    #Evacuation destination folder path(Mandatory)
    parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
        '''\
        please set me.
        this is essential argument.
        this is backup folder path'''), type=str)

    #Return the result
    return parser.parse_args()


def check_args(args):
    """
Execution argument judgment method
    :param args:
    :return: True or False
    """
    # Error if the path of the monitored folder is not specified
    if getattr(args, 'watch_path', None) is None:
        raise argparse.ArgumentError(None, 'The monitoring folder is not specified!')

    # Error if the path of the destination folder is not specified
    if getattr(args, 'copy_to_path', None) is None:
        raise argparse.ArgumentError(None, 'The destination folder is not specified!')

    # Error if the save destination folder path is not specified
    if getattr(args, 'backup_path', None) is None:
        raise argparse.ArgumentError(None, 'The save destination folder is not specified!')

    #Object generation for each path
    watch_path = pathlib.Path(args.watch_path)
    copy_to_path = pathlib.Path(args.copy_to_path)
    backup_path = pathlib.Path(args.backup_path)

    #Check if the path of the monitoring folder exists
    if not watch_path.exists():
        raise FileNotFoundError('There is no monitoring folder!')

    #Check if the watch folder is a directory
    if not watch_path.is_dir():
        raise TypeError('The specified monitoring folder is not a folder!')

    #Check if the path of the destination folder exists
    if not copy_to_path.exists():
        raise FileNotFoundError('There is no destination folder!')

    #Check if the destination folder is a directory
    if not copy_to_path.is_dir():
        raise TypeError('The specified destination folder is not a folder!')

    # Check if the path of the save destination folder exists
    if not backup_path.exists():
        raise FileNotFoundError('There is no save destination folder!')

    # Check if the save destination folder is a directory
    if not backup_path.is_dir():
        raise TypeError('The specified save destination folder is not a folder!')


#Execution processing
if __name__ == '__main__':

    #Create if there is no log folder(* Since logging does not create folders)
    make_log_folder()

    #Logging settings
    # get the root logger
    root_logger = logging.getLogger()
    # set overall level to debug, default is warning for root logger
    root_logger.setLevel(logging.DEBUG)

    # setup logging to file, rotating at midnight
    file_log = MyTimedRotatingFileHandler(f'./logs/log_')
    file_log.setLevel(logging.DEBUG)
    file_formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() line %(lineno)d] : %(message)s',
                                       datefmt='%Y-%m-%d %H:%M:%S')
    file_log.setFormatter(file_formatter)
    root_logger.addHandler(file_log)

    # setup logging to console
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() line %(lineno)d] : %(message)s',
                                  datefmt='%Y-%m-%d %H:%M:%S')
    console.setFormatter(formatter)
    root_logger.addHandler(console)

    # get a logger for my script
    logger = logging.getLogger(__name__)

    try:
        #Argument interpretation,Judgment
        args = interpret_args()
        #Argument check
        check_args(args)
        #Monitoring execution
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        logger.error(e)
    except FileNotFoundError as e:
        logger.error(e)
    except TypeError as e:
        logger.error(e)
    except Exception as e:
        logger.error(e)
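
For comparison, the standard TimedRotatingFileHandler can be used without subclassing if it is acceptable for the current file to keep a fixed name and for the date to be appended only to rotated files. This is a sketch under that assumption, not a drop-in replacement for the custom handler above. The names setup_logging and folder_watch.log, and backupCount=7, are illustrative choices.

```python
import logging
import logging.handlers
from pathlib import Path


def setup_logging(log_dir='logs'):
    """Log to <log_dir>/folder_watch.log, rotating at midnight.

    setup_logging and the file name are hypothetical. Rotated files get a
    date suffix appended by the handler (e.g. folder_watch.log.2020-05-01).
    """
    # logging does not create folders, so create the folder first
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    handler = logging.handlers.TimedRotatingFileHandler(
        str(Path(log_dir) / 'folder_watch.log'),
        when='midnight', backupCount=7, encoding='utf-8')
    handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(levelname)s - [%(funcName)s() line %(lineno)d] : %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))

    logger = logging.getLogger('folder_watch')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    return logger
```

The trade-off is the file naming: the custom handler always writes to a file named after the current date, while this version renames the old file at rollover.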

It is awkward to run as it is, so, as usual, I create a batch file like the one below.

bat

execute.bat


@echo off
setlocal
rem Make the location where the script is located the current directory
cd /d %~dp0

SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
rem watch folder
SET WATCH_PATH=.\test\from
rem copy destination folder
SET COPY_PATH=.\test\to
rem backup destination folder
SET BK_PATH=.\test\bk

rem folder monitoring execution
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -wp %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%

I thought it would be enough to register this in the Task Scheduler, but that turned out not to be the case. This app is meant to stay resident, and I want it restarted right away if it goes down. On top of that, if the Task Scheduler simply launches it repeatedly, many processes end up running... So I changed the bat and the source as follows.

bat

execute.bat


@echo off
setlocal
rem Make the location where the script is located the current directory
cd /d %~dp0

SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
rem watch folder
SET WATCH_PATH=.\test\from
rem copy destination folder
SET COPY_PATH=.\test\to
rem backup destination folder
SET BK_PATH=.\test\bk
rem PID write folder
SET PID_FILD=pid

rem Execute immediately if there is no pid file
IF EXIST %PID_FILD% (GOTO FILE_TRUE) ELSE GOTO FILE_FALSE

rem If the pid file already exists
:FILE_TRUE

rem Get the pid number from the file
SET /p PID_VAL=<pid
rem pid presence flag(true=1, false=0)
SET IS_EXIST=0
rem Search the pids of processes whose image name is "python.exe" (only the number part)
for /F "usebackq tokens=2" %%a in (
`tasklist /fi "IMAGENAME eq python.exe" ^| findstr "[0-9]"`) do (
rem ECHO %%a
if %%a==%PID_VAL% SET IS_EXIST=1
)
rem ECHO %PID_VAL%
rem ECHO %IS_EXIST%

rem There is a match = do nothing because it is already running
rem There is no match = run the script because it is not running
IF %IS_EXIST%==1 (GOTO EOF) ELSE (GOTO APPT_START)

rem If the pid file does not exist
:FILE_FALSE
GOTO APPT_START

rem folder monitoring execution
:APPT_START
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -wp %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
GOTO EOF


rem end
:EOF
rem pause

folder_watch

folder_watch.py


    try:
        #Add here!
        with open('pid', mode='w') as f:
            logger.info(f'pid = [{str(os.getpid())}]')
            f.write(str(os.getpid()))
        
        #Argument interpretation,Judgment
        args = interpret_args()
        #Argument check
        check_args(args)
        #Monitoring execution
        watch_start(args.watch_path, args.copy_to_path, args.backup_path)
    except argparse.ArgumentError as e:
        logger.error(e)
    except FileNotFoundError as e:
        logger.error(e)
    except TypeError as e:
        logger.error(e)
    except Exception as e:
        logger.error(e)

On the Python side, the pid is saved to a file at startup. On the batch side, the script is not started if the pid in the file matches a running python.exe process. With this, if the Task Scheduler runs the bat at 1-minute intervals, it should work well enough!

So that is all for this time. There are various holes:

- It does not support copying folders.
- Wouldn't parallel processing be better?
- Python has a way to compare files without using MD5.

I will think about them again if I feel like it. The logging is also fairly rough... I have not tested it very thoroughly, so please use it as a reference only. The source is also on GitHub.
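
The file comparison without MD5 mentioned above presumably refers to the standard filecmp module. As a small sketch (the helper name files_match is hypothetical), filecmp.cmp with shallow=False compares the actual contents of the two files rather than only their os.stat() signatures.

```python
import filecmp


def files_match(src, copy):
    """Return True if the two files have identical contents.

    files_match is a hypothetical helper. shallow=False forces a
    content comparison instead of trusting size and modification time.
    """
    return filecmp.cmp(src, copy, shallow=False)
```

This could stand in for the _get_md5_hash / _check_hash pair in _run, at the cost of no longer having hash values to write to the log.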

Where I got stuck

Where I got stuck was that watchdog fires the event as soon as it detects the creation (move / copy) of a file in the folder, even though the file has not finished being written yet. When a large file arrives, the copy is still in progress, so trying to move it raises an error. At first, I wrote the completion check by referring to the first Stack Overflow question listed under the references (comparing file sizes). On Windows, however, it did not behave as I expected: the full file size is reported even while the copy is still in progress. (Would it behave as expected on Linux?)

While I was stuck, I found the second question listed under the references, which had the answer for judging copy completion on Windows. Thank you, Stack Overflow! The approach is to try renaming the file: if the rename raises an error, the file is judged to be still being copied (moved). I had no idea there was such a way. The drawback is that it is costly as it is, because an exception is raised on every failed attempt. If anyone knows another method, please let me know.

Judging move (copy) completion of the detected file

folder_watch.py


    @staticmethod
    def _wait_for_file_created_finished_linux(file_path, time_out):
        """
Not confirmed to work on Linux
Creation completion judgment method of the placed file
Reference URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
        :param file_path:
        :param time_out:
        :return:
        """
        size_now = 0
        size_past = -1
        start = time.time()
        while True:
            size_now = os.path.getsize(file_path)
            time.sleep(1)
            elapsed_time = time.time() - start
            if size_now == size_past and os.access(file_path, os.R_OK):
                return True
            else:
                size_past = os.path.getsize(file_path)
                if elapsed_time >= time_out:
                    return False

    @staticmethod
    def _wait_for_file_created_finished_windows(file_path: Path, time_out):
        """
        Judge whether creation (copy) of the placed file has completed
Reference URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
        :param file_path:
        :param time_out:
        :return:
        """
        start = time.time()
        while True:
            try:
                elapsed_time = time.time() - start
                new_path = str(file_path) + "_"
                os.rename(file_path, new_path)
                os.rename(new_path, file_path)
                time.sleep(1)
                return True
            except OSError:
                time.sleep(1)
                if elapsed_time >= time_out:
                    return False
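
On the question of another method: one alternative that is sometimes suggested is to try opening the file for writing instead of renaming it. This is only a sketch and has not been verified with this tool. It assumes that Windows refuses the open with PermissionError while another process still holds the file without write sharing. On Linux the open normally succeeds even during a copy, so it is not a reliable check there. It also still relies on an exception internally, so it does not remove that cost. The names wait_until_writable and interval are hypothetical.

```python
import time


def wait_until_writable(file_path, time_out, interval=1.0):
    """Poll until the file can be opened for read/write, or until time_out.

    Returns True when the open succeeds, False on timeout.
    Hypothetical helper: 'r+b' mode neither creates nor truncates the file.
    """
    start = time.time()
    while True:
        try:
            with open(file_path, 'r+b'):
                return True
        except OSError:
            # Still locked (or not there yet): check the deadline, then retry
            if time.time() - start >= time_out:
                return False
            time.sleep(interval)
```

It takes the same file_path and time_out arguments as the two methods above, so it could be tried in their place in _run.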

Reference URL / Source

https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying

Thanks to the authors of the pages above.
