Good evening, this is my first post in a long time. I have a little time With a memorial service for opening the app that flowed about a year ago I will try to make it again.
This time we will make a resident application that monitors folders. If the file is simply placed in a specified folder somewhere It's as simple as copying to a specified folder. It took some time. .. ..
Windows 10 python-3.8.2 Library used: watchdog
For the time being, create the following WatchFileHandler class. This is about the end of monitoring. You did it!
folder_watch.py
#Monitoring event acquisition class
class WacthFileHandler(FileSystemEventHandler):
def __init__(self, watch_path, copy_to_path, backup_path):
super(WacthFileHandler, self).__init__()
self.watch_path = watch_path
self.copy_to_path = copy_to_path
self.backup_path = backup_path
def on_moved(self, event):
"""
File movement detection
:param event:
:return:
"""
file_path = event.src_path
file_name = os.path.basename(file_path)
def on_created(self, event):
"""
File creation detection
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
def on_modified(self, event):
"""
File change detection
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
def on_deleted(self, event):
"""
File deletion detection
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
def watch_start(watch_path, copy_to_path, backup_path):
"""
Folder monitoring process started
:param from_watch_path :Monitoring folder path
:param to_copy_path :Destination folder path
:param backup_path :Evacuation destination folder path
:return
"""
event_handler = WacthFileHandler(watch_path, copy_to_path, backup_path)
observer = Observer()
observer.schedule(event_handler, watch_path, recursive=True)
observer.start()
try:
while True:
time.sleep(5)
except KeyboardInterrupt:
observer.stop()
except Exception as e:
observer.stop()
raise e
finally:
# finaly =Last processing regardless of the occurrence of an exception
observer.join()
Next, we will monitor and consider the behavior when the file arrives. This time after moving (copying) the file Make sure that the original file matches properly and delete the original file. If you cannot copy it properly, isolate it. It has become an on-parade of static method. .. ..
folder_watch.py
def on_created(self, event):
"""
File creation detection
:param event:
:return:
"""
#Get file name
src_name = os.path.basename(event.src_path)
#Generate the folder path of the monitoring source
src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
#copy(Move)Generate destination folder path
copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
#Generate backup destination folder path
backup_link = pathlib.Path(self.backup_path)
try:
#Execute processing
self._run(src_path, copy_path, backup_link)
except TimeoutError as e:
#It's too big!
pass
except Exception as e:
pass
def _run(self, src: Path, copy: Path, bk: Path):
"""
Copy, check, delete when file is detected/Move
:param src:
:param copy:
:return:
"""
#Wait for placement to complete(For a certain period of time[600s]Wait)
if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
raise TimeoutError
#Copy the placed file
if not self._copy_to_file(src, copy):
return
#Get the hashes of two files
src_hash = self._get_md5_hash(src)
copy_hash = self._get_md5_hash(copy)
if self._check_hash(src_hash, copy_hash):
#Hash match
#Delete the original file
self._del_original_file(src)
else:
#Hash mismatch
#Move to evacuation destination
self._move_original_file(bk)
def _copy_to_file(self, src, copy):
"""
Copy the placed file to the specified folder
:param src:
:param copy_to:
:return:
"""
#If there is no placed file, no further processing will be performed.
if not src.exists():
return False
#File metadata(Creation time, change time, etc.)Copy including
copy_link = shutil.copy2(src, copy, follow_symlinks=True)
#Check that the path you were trying to copy matches the path you copied
if copy != copy_link:
return False
if not copy.exists():
return False
return True
@staticmethod
def _wait_for_file_created_finished_linux(file_path, time_out):
"""
Not confirmed to work on Linux
Creation completion judgment method of the placed file
Reference URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
:param file_path:
:param time_out:
:return:
"""
size_now = 0
size_past = -1
start = time.time()
while True:
size_now = os.path.getsize(file_path)
time.sleep(1)
elapsed_time = time.time() - start
if size_now == size_past and os.access(file_path, os.R_OK):
return True
else:
size_past = os.path.getsize(file_path)
if elapsed_time >= time_out:
return False
@staticmethod
def _wait_for_file_created_finished_windows(file_path: Path, time_out):
"""
Creating a placed file(copy)Completion determination method
Reference URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
:param file_path:
:param time_out:
:return:
"""
start = time.time()
while True:
try:
elapsed_time = time.time() - start
new_path = str(file_path) + "_"
os.rename(file_path, new_path)
os.rename(new_path, file_path)
time.sleep(1)
return True
except OSError:
time.sleep(1)
if elapsed_time >= time_out:
return False
@staticmethod
def _get_md5_hash(file_path):
"""
File md5 hash value(Hexagonal format)Get
:param file_path:
:return:
"""
with open(file_path, 'rb') as file:
binary_data = file.read()
#Get hash value in hexadecimal format
md5 = hashlib.md5(binary_data).hexdigest()
return md5
@staticmethod
def _check_hash(src_hash, target_hash):
"""
Compare two hashes
:param src_hash:
:param target_hash:
:return:
"""
return src_hash == target_hash
@staticmethod
def _del_original_file( src):
"""
Delete the copy source file
:param src:
:return:
"""
os.remove(src)
@staticmethod
def _move_original_file(src_path, move_path):
"""
Move copy source file(Evacuation)
:return:
"""
shutil.move(src_path, move_path)
folder_watch.py
def interpret_args():
"""
Run-time argument interpretation method
:return:Runtime arguments
"""
#Object creation
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
#Argument setting
#Monitoring folder path(Mandatory)
parser.add_argument("-w", "--watch_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is watch folder path'''), type=str)
#Copy destination folder path(Mandatory)
parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is copy to folder path'''), type=str)
#Evacuation destination folder path(Mandatory)
parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is backup to folder path'''), type=str)
#Return the result
return parser.parse_args()
def check_args(args):
"""
Execution argument judgment method
:param args:
:return: True or False
"""
#Error if the path of the monitored folder is not specified
if not hasattr(args, 'watch_path') and args.watch_path is None:
raise argparse.ArgumentError('I don't specify a monitoring folder!')
#Error if the path of the destination folder is not specified
if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
raise argparse.ArgumentError('I don't specify the destination folder!')
#Error if the save destination folder path is not specified
if not hasattr(args, 'backup_path') and args.backup_path is None:
raise argparse.ArgumentError('I don't specify the save destination folder!')
#Object generation for each path
watch_path = pathlib.Path(args.watch_path)
copy_to_path = pathlib.Path(args.copy_to_path)
backup_path = pathlib.Path(args.backup_path)
#Check if the path of the monitoring folder exists
if not watch_path.exists():
raise FileNotFoundError('There is no monitoring folder!')
#Check if the watch folder is a directory
if not watch_path.is_dir():
raise TypeError('The specified monitoring folder is not a folder!')
#Check if the path of the destination folder exists
if not copy_to_path.exists():
raise FileNotFoundError('There is no destination folder!')
#Check if the destination folder is a directory
if not copy_to_path.is_dir():
raise TypeError('The specified destination folder is not a folder!')
#Check if the path of the destination folder exists
if not backup_path.exists():
raise FileNotFoundError('There is no save destination folder!')
#Check if the destination folder is a directory
if not backup_path.is_dir():
raise TypeError('The specified save destination folder is not a folder!')
#Execution processing
if __name__ == '__main__':
try:
#Argument interpretation,Judgment
args = interpret_args()
#Argument check
check_args(args)
#Monitoring execution
watch_start(args.watch_path, args.copy_to_path)
except argparse.ArgumentError as e:
pass
except FileNotFoundError as e:
pass
except TypeError as e:
pass
except Exception as e:
pass
At this rate, I don't know what I moved or what came, so I will prepare a log. For the time being, the sauce is almost complete.
folder_watch.py
# -*- coding: utf-8 -*-
import os
import time
import sys
import logging
import hashlib
import argparse
import textwrap
import pathlib
from pathlib import Path
import shutil
from datetime import datetime
from watchdog.observers import Observer
from logging.handlers import TimedRotatingFileHandler
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
try:
import codecs
except ImportError:
codecs = None
class MyTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
"""
File handler for date log output-class
"""
def __init__(self, dir_log):
self.dir_log = dir_log
filename = self.dir_log + time.strftime("%Y%m%d") + ".log" # dir_log here MUST be with os.sep on the end
logging.handlers.TimedRotatingFileHandler.__init__(self, filename, when='midnight', interval=1, backupCount=0,
encoding=None)
def doRollover(self):
"""
TimedRotatingFileHandler remix - rotates logs on daily basis, and filename of current logfile is time.strftime("%m%d%Y")+".txt" always
"""
self.stream.close()
# get the time that this sequence started at and make it a TimeTuple
t = self.rolloverAt - self.interval
timeTuple = time.localtime(t)
self.baseFilename = self.dir_log + time.strftime("%Y%m%d") + ".log"
if self.encoding:
self.stream = codecs.open(self.baseFilename, 'w', self.encoding)
else:
self.stream = open(self.baseFilename, 'w')
self.rolloverAt = self.rolloverAt + self.interval
#Monitoring event acquisition class
class WacthFileHandler(FileSystemEventHandler):
def __init__(self, watch_path, copy_to_path, backup_path):
super(WacthFileHandler, self).__init__()
self.watch_path = watch_path
self.copy_to_path = copy_to_path
self.backup_path = backup_path
def on_moved(self, event):
"""
File movement detection
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
logger.info(f'{src_name}Has moved')
def on_created(self, event):
"""
File creation detection
:param event:
:return:
"""
#Get file name
src_name = os.path.basename(event.src_path)
logger.info(f'{src_name}Was done')
#Generate the folder path of the monitoring source
src_path = pathlib.Path(self.watch_path) / pathlib.Path(f'{src_name}')
#copy(Move)Generate destination folder path
copy_path = pathlib.Path(self.copy_to_path) / pathlib.Path(f'{src_name}')
#Generate backup destination folder path
backup_link = pathlib.Path(self.backup_path)
try:
#Execute processing
self._run(src_path, copy_path, backup_link)
except TimeoutError as e:
#It's too big!
logger.error('It's too big!')
logger.error(e)
except Exception as e:
logger.error(e)
def on_modified(self, event):
"""
File change detection
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
logger.info(f'{src_name}Changed')
def on_deleted(self, event):
"""
File deletion detection
:param event:
:return:
"""
src_path = event.src_path
src_name = os.path.basename(src_path)
logger.info(f'{src_name}Deleted s')
def _run(self, src: Path, copy: Path, bk: Path):
"""
Copy, check, delete when file is detected/Move
:param src:
:param copy:
:return:
"""
#Wait for placement to complete(For a certain period of time[600s]Wait)
if not self._wait_for_file_created_finished_windows(file_path=src, time_out=600):
raise TimeoutError
#Copy the placed file
if not self._copy_to_file(src, copy):
return
#Get the hashes of two files
src_hash = self._get_md5_hash(src)
copy_hash = self._get_md5_hash(copy)
if self._check_hash(src_hash, copy_hash):
#Hash match
#Delete the original file
self._del_original_file(src)
else:
#Hash mismatch
#Move to evacuation destination
self._move_original_file(bk)
def _copy_to_file(self, src, copy):
"""
Copy the placed file to the specified folder
:param src:
:param copy_to:
:return:
"""
#If there is no placed file, no further processing will be performed.
if not src.exists():
return False
#File metadata(Creation time, change time, etc.)Copy including
copy_link = shutil.copy2(src, copy, follow_symlinks=True)
#Check that the path you were trying to copy matches the path you copied
if copy != copy_link:
return False
if not copy.exists():
return False
return True
@staticmethod
def _wait_for_file_created_finished_linux(file_path, time_out):
"""
Not confirmed to work on Linux
Creation completion judgment method of the placed file
Reference URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
:param file_path:
:param time_out:
:return:
"""
size_now = 0
size_past = -1
start = time.time()
while True:
size_now = os.path.getsize(file_path)
time.sleep(1)
elapsed_time = time.time() - start
logger.info(f"size_now: {size_now}")
logger.info(f"size_past: {size_past}")
if size_now == size_past and os.access(file_path, os.R_OK):
logger.info("file has copied completely now size: %s", size_now)
return True
else:
size_past = os.path.getsize(file_path)
if elapsed_time >= time_out:
logger.info('time out error')
return False
@staticmethod
def _wait_for_file_created_finished_windows(file_path: Path, time_out):
"""
Creating a placed file(copy)Completion determination method
Reference URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
:param file_path:
:param time_out:
:return:
"""
start = time.time()
while True:
try:
elapsed_time = time.time() - start
new_path = str(file_path) + "_"
os.rename(file_path, new_path)
os.rename(new_path, file_path)
time.sleep(1)
logger.info('file copy...')
return True
except OSError:
time.sleep(1)
if elapsed_time >= time_out:
logger.info('time out error')
return False
@staticmethod
def _get_md5_hash(file_path):
"""
File md5 hash value(Hexagonal format)Get
:param file_path:
:return:
"""
with open(file_path, 'rb') as file:
binary_data = file.read()
#Get hash value in hexadecimal format
md5 = hashlib.md5(binary_data).hexdigest()
logger.info(f'File:{file_path} -Hash value- {md5}')
return md5
@staticmethod
def _check_hash(src_hash, target_hash):
"""
Compare two hashes
:param src_hash:
:param target_hash:
:return:
"""
return src_hash == target_hash
@staticmethod
def _del_original_file( src):
"""
Delete the copy source file
:param src:
:return:
"""
os.remove(src)
@staticmethod
def _move_original_file(src_path, move_path):
"""
Move copy source file(Evacuation)
:return:
"""
shutil.move(src_path, move_path)
def watch_start(from_watch_path, to_copy_path, backup_path):
"""
Folder monitoring process started
:param from_watch_path :Monitoring folder path
:param to_copy_path :Destination folder path
:param backup_path :Evacuation destination folder path
:return:
"""
event_handler = WacthFileHandler(from_watch_path, to_copy_path, backup_path)
observer = Observer()
observer.schedule(event_handler, from_watch_path, recursive=True)
logger.info(f'Folder monitoring start')
observer.start()
try:
while True:
time.sleep(5)
except KeyboardInterrupt:
observer.stop()
except Exception as e:
observer.stop()
raise e
finally:
# finaly =Last processing regardless of the occurrence of an exception
logger.info(f'Folder monitoring finished')
observer.join()
def make_log_folder():
"""
Create if there is no logs folder at startup
:return:
"""
p = pathlib.Path(sys.argv[0])
p2 = pathlib.Path(p.parent) / pathlib.Path('logs')
if not p2.exists():
os.makedirs(str(p2))
def interpret_args():
"""
Run-time argument interpretation method
:return:Runtime arguments
"""
#Object creation
parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
#Argument setting
#Monitoring folder path(Mandatory)
parser.add_argument("-wp", "--watch_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is watch folder path'''), type=str)
#Copy destination folder path(Mandatory)
parser.add_argument("-cp", "--copy_to_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is copy to folder path'''), type=str)
#Evacuation destination folder path(Mandatory)
parser.add_argument("-bk", "--backup_path", help=textwrap.dedent(
'''\
please set me.
this is essential argument.
this is backup folder path'''), type=str)
#Return the result
return parser.parse_args()
def check_args(args):
"""
Execution argument judgment method
:param args:
:return: True or False
"""
#Error if the path of the monitored folder is not specified
if not hasattr(args, 'watch_path') and args.watch_path is None:
raise argparse.ArgumentError('I don't specify a monitoring folder!')
#Error if the path of the destination folder is not specified
if not hasattr(args, 'copy_to_path') and args.copy_to_path is None:
raise argparse.ArgumentError('I don't specify the destination folder!')
#Error if the save destination folder path is not specified
if not hasattr(args, 'backup_path') and args.backup_path is None:
raise argparse.ArgumentError('I don't specify the save destination folder!')
#Object generation for each path
watch_path = pathlib.Path(args.watch_path)
copy_to_path = pathlib.Path(args.copy_to_path)
backup_path = pathlib.Path(args.backup_path)
#Check if the path of the monitoring folder exists
if not watch_path.exists():
raise FileNotFoundError('There is no monitoring folder!')
#Check if the watch folder is a directory
if not watch_path.is_dir():
raise TypeError('The specified monitoring folder is not a folder!')
#Check if the path of the destination folder exists
if not copy_to_path.exists():
raise FileNotFoundError('There is no destination folder!')
#Check if the destination folder is a directory
if not copy_to_path.is_dir():
raise TypeError('The specified destination folder is not a folder!')
#Check if the path of the destination folder exists
if not backup_path.exists():
raise FileNotFoundError('There is no save destination folder!')
#Check if the destination folder is a directory
if not backup_path.is_dir():
raise TypeError('The specified save destination folder is not a folder!')
#Execution processing
if __name__ == '__main__':
#Create if there is no log folder(* Since logging does not create folders)
make_log_folder()
#Logging settings
# get the root logger
root_logger = logging.getLogger()
# set overall level to debug, default is warning for root logger
root_logger.setLevel(logging.DEBUG)
# setup logging to file, rotating at midnight
file_log = MyTimedRotatingFileHandler(f'./logs/log_')
file_log.setLevel(logging.DEBUG)
file_formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)line d] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
file_log.setFormatter(file_formatter)
root_logger.addHandler(file_log)
# setup logging to console
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('■%(asctime)s - %(levelname)s - [%(funcName)s() %(lineno)line d] : %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
console.setFormatter(formatter)
root_logger.addHandler(console)
# get a logger for my script
logger = logging.getLogger(__name__)
try:
#Argument interpretation,Judgment
args = interpret_args()
#Argument check
check_args(args)
#Monitoring execution
watch_start(args.watch_path, args.copy_to_path, args.backup_path)
except argparse.ArgumentError as e:
logger.error(e)
except FileNotFoundError as e:
logger.error(e)
except TypeError as e:
logger.error(e)
except Exception as e:
logger.error(e)
It's difficult to execute as it is Create a bat like ↓ as usual.
execute.bat
@echo off
setlocal
Make the location where the rem script is located the current directory
cd /d %~dp0
SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
rem watch folder
SET WATCH_PATH=.\test\from
rem copy destination folder
SET COPY_PATH=.\test\to
rem backup destination folder
SET BK_PATH=.\test\bk
rem folder monitoring execution
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -wp %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
I wonder if this is the end if I put it in the task scheduler appropriately I was wondering, but it seems to be the case. This app is resident and I would like you to start it immediately even if it falls. Also, if you use the task scheduler, many processes will be started. .. .. So I made the bat and source as follows.
execute.bat
@echo off
setlocal
Make the location where the rem script is located the current directory
cd /d %~dp0
SET APP_TITLE=folder_watch
SET EXE_SCRIPT=folder_watch.py
rem watch folder
SET WATCH_PATH=.\test\from
rem copy destination folder
SET COPY_PATH=.\test\to
rem backup destination folder
SET BK_PATH=.\test\bk
rem PID write folder
SET PID_FILD=pid
Execute immediately if there is no rem pid file
IF EXIST %PID_FILD% (GOTO FILE_TRUE) ELSE GOTO FILE_FALSE
if the rem pid file already exists
:FILE_TRUE
Get pid number from rem file
SET /p PID_VAL=<pid
rem pid presence flag(true=1, false=0)
SET IS_EXIST=0
rem image name:"python.exe"Search for pid with(Only the number part)
for /F "usebackq tokens=2" %%a in (
`tasklist /fi "IMAGENAME eq python.exe" ^| findstr "[0-9]"`) do (
rem ECHO %%a
if %%a==%PID_VAL% SET IS_EXIST=1
)
rem ECHO %PID_VAL%
rem ECHO %IS_EXIST%
rem there is a match=Do nothing because it is already running
rem There is no match=Script execution because it is not started
IF %IS_EXIST%==1 (GOTO EOF) ELSE (GOTO APPT_START)
If the rem pid file does not exist
:FILE_FALSE
GOTO APPT_START
rem folder monitoring execution
:APPT_START
START "%APP_TITLE%" ./python-3.8.2-embed-amd64/python.exe %EXE_SCRIPT% -w %WATCH_PATH% -cp %COPY_PATH% -bk %BK_PATH%
GOTO EOF
rem end
:EOF
rem pause
folder_watch.py
try:
#Add here!
with open('pid', mode='w') as f:
logger.info(f'pid = [{str(os.getpid())}]')
f.write(str(os.getpid()))
#Argument interpretation,Judgment
args = interpret_args()
#Argument check
check_args(args)
#Monitoring execution
watch_start(args.watch_path, args.copy_to_path, args.backup_path)
except argparse.ArgumentError as e:
logger.error(e)
except FileNotFoundError as e:
logger.error(e)
except TypeError as e:
logger.error(e)
except Exception as e:
logger.error(e)
Save the pid to a file at startup on the python side, It is a process that does not start if the file contents and the current pid match on the batch side. With this, if you start the task scheduler at 1 minute intervals, you should be able to do something! So this is the end of this time. -It does not support copying folders ・ Isn't parallel processing better? -There was a File comparison in python without using MD5. There are various holes, but I will think about it again if I feel like it. The log is also fairly appropriate. .. .. We have not confirmed the operation so much, so please use it as a reference only. Github is on here
The point I got stuck in was when watchdog detected the creation (move / copy) of a file in the folder. The event was going to run when it wasn't completed yet. When a large file arrives, the copy has not been completed yet I get an error trying to move it. At first, the judgment process was done by referring to this article. In the case of windows, it seems that the behavior is not what I expected and the size of the file is acquired even during copying. (Will it be the move you expected for Linux?)
When I was in trouble, I found the next article. The answer of the copy judgment process in windows was written. Thank you stackoverflow! It is a process that renames whether copying (moving) is in progress and determines that it is still being copied (moved) if an error occurs. I wonder if there was such a way. However, the bottleneck is that the cost is high as it is because an Exception is generated. If anyone knows another method, please teach me.
folder_watch.py
@staticmethod
def _wait_for_file_created_finished_linux(file_path, time_out):
"""
Not confirmed to work on Linux
Creation completion judgment method of the placed file
Reference URL:https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes
:param file_path:
:param time_out:
:return:
"""
size_now = 0
size_past = -1
start = time.time()
while True:
size_now = os.path.getsize(file_path)
time.sleep(1)
elapsed_time = time.time() - start
if size_now == size_past and os.access(file_path, os.R_OK):
return True
else:
size_past = os.path.getsize(file_path)
if elapsed_time >= time_out:
return False
@staticmethod
def _wait_for_file_created_finished_windows(file_path: Path, time_out):
"""
Creating a placed file(copy)Completion determination method
Reference URL:https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying
:param file_path:
:param time_out:
:return:
"""
start = time.time()
while True:
try:
elapsed_time = time.time() - start
new_path = str(file_path) + "_"
os.rename(file_path, new_path)
os.rename(new_path, file_path)
time.sleep(1)
return True
except OSError:
time.sleep(1)
if elapsed_time >= time_out:
return False
https://stackoverflow.com/questions/32092645/python-watchdog-windows-wait-till-copy-finishes https://stackoverflow.com/questions/34586744/os-path-getsize-on-windows-reports-full-file-size-while-copying Thanks for the URL above.
Recommended Posts