Bis zum letzten Mal habe ich langsam vom Scraping bis zur Dateiverarbeitung geschrieben, aber zu diesem Zeitpunkt habe ich die Datei aufgeteilt.
Einer der Zwecke besteht darin, einen Testmodus zu erstellen, da "Verbindung zu S3 = eine große Anzahl von Wörtern, die zu Input werden = es dauert einige Zeit, um den Test auszuführen".
Diesmal habe ich versucht, es so zu teilen.
Dateiorganisation
├── app
│ ├ Treiber Selen Treiber platzieren
│ └── source
│ ├── run.py ausführbare Hauptdatei
│ ├── scraping.Py Scraping-Prozess(2 ..)
│ ├── make_outputs.py Verarbeitung heruntergeladener Dateien(3 ..)
│ ├── s3_operator.Verarbeitung bezogen auf py S3(diesmal:4 ..)
│ └── configs.py Umgebungsvariablen, Passwörter usw.
└── tmp
├── files
│ ├ aus S3 Hauptausführung
│ ├ zur S3-Hauptausführung
│ └ download Laden Sie die durch Scraping heruntergeladene Datei ab
└── protokolliert Protokolle(Selen Log etc.)
Rufen Sie die Verarbeitung jeder Datei aus der ausführbaren Hauptdatei = run.py auf. Da die aus S3 abgerufenen Wörter sehr groß sind, können Sie den "Testausführungsmodus" so einstellen, dass die Ausführungszeit zum Zeitpunkt des Tests kurz ist.
run.py
if __name__ == '__main__':
#Bereiten Sie sich darauf vor, einen Testausführungsmodus zu erstellen
parser = argparse.ArgumentParser(description='scraping batch')
parser.add_argument('--run_mode', dest='run_mode', default = 'normal', help='run_mode: test | normal')
args = parser.parse_args()
#Abrufen von Variablen für jede Umgebung
env = os.getenv('BATCH_ENV', 'LOCAL')
config = configs.load(env)
#Hauptverarbeitung
words = s3_operator.getFromS3(config) #Holen Sie sich die Datei von S3 und erhalten Sie die Wörter, die INPUT sind
scraping.main(config,args,words) #Kratzprozess(Wenn der Test ausgeführt wird, endet er mit 2 Fällen)
make_outputs.main(config,words) #Dateiverarbeitung
s3_operator.sendToS3(config) #Datei an S3 senden
Übrigens sieht configs.py so aus
configs.py
class Config:
login_url = "XXXXXXXXXX"
login_id = "XXXXXXXXXX"
login_password = "XXXXXXXXX"
#S3
region_name ="XXXXXXXXXX"
input_path = "XXXXXXXXXXX"
output_path = "XXXXXXXXXX"
@staticmethod
def load(env_name):
if env_name == 'PRD':
return PrdConfig()
elif env_name == 'STG':
return StgConfig()
elif env_name == 'DEV':
return DevConfig()
else:
return LocalConfig()
class LocalConfig(Config):
access_key_id = 'XXXXXXXXX'
secret_access_key = 'XXXXXXXXXX'
bucket_name = 'XXXXX'
#...Unterhalb von DevConfig,StgConfig,Gleiches gilt für PrdConfig
Ich werde den Prozess schreiben, um die Datei von S3 zu erhalten.
--Erstellen Sie einen Speicherort für Download-Dateien
/ YYYYMMDD / words.csv
, daher ist es etwas redundant. was auch immer.s3_operator
def getFromS3(config):
#Erstellen Sie einen Speicherort für Dateien, die von S3 heruntergeladen wurden
date = datetime.now().strftime('%Y%m%d')
dldir_name = os.path.abspath(__file__ + '/../../../tmp/files/fromS3/'.format(date))
dldir_path = Path(dldir_name)
dldir_path.mkdir(exist_ok=True)
download_dir = str(dldir_path.resolve())
#Definieren Sie Clientinformationen, um eine Verbindung zu S3 herzustellen
s3_client = boto3.client(
's3',
region_name=config.region_name,
aws_access_key_id=config.access_key_id,
aws_secret_access_key=config.secret_access_key,
)
#Rufen Sie eine Liste der Dateien mit dem angegebenen Pfad im angegebenen Bucket ab
key_prefix = config.output_path + '/{}/'.format(date)
response = s3_client.list_objects_v2(Bucket=config.bucket_name,Prefix=key_prefix)
if response["KeyCount"] == 0:
print('Objekt existiert nicht:{}'.format(key_prefix))
return
#Laden Sie die Dateien einzeln herunter
for object in response["Contents"]:
object_name = object["Key"]
download_file_name = os.path.basename(object["Key"])
if len(download_file_name) == 0:
continue
download_file_name = download_dir + '/' + download_file_name
s3_client.download_file(Bucket=config.bucket_name, Key=object_name, Filename=download_file_name)
print('Objekt"{}Ich habe heruntergeladen. Download-Ziel:{}'.format(object_name,download_file_name))
#Holen Sie sich Worte
download_file_name = download_dir + '/words.csv'
return pd.read_csv(download_file_name).values.tolist()
Dies ist der Vorgang zum Senden einer Datei. Dies ist einfach, weil Sie nur das senden, was Sie gemacht haben
s3_operator
def exportToS3(config):
date = datetime.now().strftime('%Y%m%d')
s3_client = boto3.client(
's3',
region_name=config.region_name,
aws_access_key_id=config.access_key_id,
aws_secret_access_key=config.secret_access_key,
)
upfiles = glob.glob(os.path.abspath(__file__ + '/../../../tmp/files/toS3/{}/*'.format(date)))
for f in upfiles :
filename = os.path.split(f)[1]
object_name = config.input_path + "/{}/{}".format(date,filename)
s3_client.upload_file(f,config.bucket_name,object_name)
Mit python run.py
kann das Programm nun seinen Zweck erreichen.
Aber…
Also werde ich es ab dem nächsten Mal schaffen.
Recommended Posts