Wie der Titel schon sagt, verfügt Redshift über ein Data Warehouse, das normalerweise von ELT verarbeitet wird. In einigen Fällen ist jedoch eine Datenverarbeitung durch Programmierung erforderlich.
Mit UNLOAD von Redshift können Sie eine gzip-Datei von Redshift nach S3 mit dem Ergebnis von SQL erstellen. Es wird also gesagt, dass sie von Lambda mit dem put-Ereignis in S3 als Trigger verarbeitet und im Status von gzip erneut in S3 hochgeladen wird. Das habe ich versucht.
UNLOAD Lambda hat derzeit maximal 3008 MB. Eine solche Verarbeitung erhöht zwangsläufig den Speicherbedarf mit zunehmender Dateigröße. Passen Sie daher die an Lambda übergebene Dateigröße an, indem Sie den Parameter [MAXFILESIZE] festlegen (https://docs.aws.amazon.com/ja_jp/redshift/latest/dg/r_UNLOAD.html). Es ist von Fall zu Fall vollständig, aber dieses Mal habe ich es auf 50 MB eingestellt.
Triggereinstellungen werden weggelassen.
import json
import boto3
import urllib.parse
import os
import sys
import csv
import re
import traceback
import gzip
import subprocess
s3client = boto3.client('s3')
s3resource = boto3.resource('s3')
SEP = '\t'
L_SEP = '\n'
S3OUTBACKET='XXXXXXXX'
S3OUTBASE='athena/preprocessing/XXXXXXtmp/'
def lambda_handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
taragetfile=os.path.split(key)[1]
outputprefixA=os.path.split(key)[0].split("/")[-1]
outputprefixB=os.path.split(key)[0].split("/")[-2]
outputdata = "";
try:
dlfilename ='/tmp/'+key.replace("/","")
s3client.download_file(bucket, key, dlfilename)
gzipfile = gzip.open(dlfilename, 'rt')
csvreader = csv.reader(gzipfile, delimiter=SEP, lineterminator=L_SEP, quoting=csv.QUOTE_NONE)
for line in csvreader:
#Verschiedene Verarbeitungen werden zeilenweise durchgeführt und in Ausgabedaten gespeichert.
#Unter den ausgelassenen Prozessen wird ein Import verwendet.
#bitte beachten Sie
except Exception as e:
print(e)
raise e
print("memory size at outputdata:"+str(sys.getsizeof(outputdata)))
os.remove(dlfilename)
uploadbinary = gzip.compress(bytes(outputdata , 'utf-8'))
print("memory size at uploadbinary:"+str(sys.getsizeof(uploadbinary)))
uploadfilename='processed_'+taragetfile
try:
bucket = S3OUTBACKET
key = S3OUTBASE+outputprefixA+"/"+outputprefixB+"/"+uploadfilename
obj = s3resource.Object(bucket,key)
obj.put( Body=uploadbinary )
except Exception as e:
print(e)
raise e
return 0
Als ich es mit einer tatsächlichen Datei getestet habe, ist ein Speicherfehler aufgetreten.
Das str (sys.getsizeof (outputdata))
in der Mitte des Codes dient zur Bestätigung, und ich habe die Situation anhand der Speichergröße erfasst. Obwohl es nicht im Code geschrieben ist, ist es gut, das Komprimierungsverhältnis von gzip selbst zu den Zieldaten zu sehen.
Die Daten, die ich dieses Mal verarbeitet habe, waren 50 MB nach der gzip-Komprimierung, aber es wurden 1000 MB Speicher für die verarbeiteten Daten + die komprimierten Daten benötigt. Schließlich ist es etwas, das Sie nicht verstehen können, wenn Sie es nicht tatsächlich versuchen. Es ist möglicherweise besser, die Speichersituation von Python etwas genauer zu untersuchen.
Wenn Sie die Speichergröße von Lambda erhöhen, erhöhen sich auch die CPU-Ressourcen. Dies hängt vom Verarbeitungsinhalt und der Dateigröße ab. Überprüfen Sie jedoch, wie schnell die Verarbeitung sein wird, sobald das Maximum 3008 MB beträgt. Wieder gab es Fälle, in denen eine Verdoppelung des Speichers die Verarbeitungszeit halbierte.
Wenn der Prozess regelmäßig durchgeführt wird, ist die Optimierung hier sehr wichtig, da sie direkt mit den laufenden Kosten zusammenhängt.
Lambda sehr praktisch