Dies ist ein Memo der 2.0-Serie Luigi, die ein Jobmanager von Python ist.
standard_task.py
import luigi
class MyTask(luigi.Task):
date = luigi.DateParameter()
def requires(self):
return MyDependentTask(self.date)
def run(self):
with self.output().open('w') as output:
with self.input().open('r') as input:
for line in input:
ret = do_something(line)
output.write(ret)
output.write('\n')
def output(self):
return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))
class MyDependentTask(luigi.Task):
date = luigi.DateParameter()
def run(self):
with self.output().open('w') as output:
output.write("line1\n")
output.write("line2\n")
def output(self):
return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))
if __name__ == '__main__':
luigi.run()
Verwenden Sie luigi.format.Nop
.
Zum Beispiel, wenn Sie einlegen möchten.
import pickle
import luigi
class SomeTask(luigi.Task):
def requires(self):
return xxxx
def run(self):
some_obj = hoge()
with self.output().open('w') as output:
output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))
def output(self):
return luigi.LocalTarget(
format=luigi.format.Nop,
path='xxxxxxxx')
class NextTask(luigi.Task):
def requires(self):
return SomeTask()
def run(self):
with self.input().open('r') as infile:
ret = pickle.load(infile)
Gibt Target
mit luigi.format.GzipFormat
zurück, das in der Ausgabe der abhängigen Task übergeben wurde.
Übergeben Sie luigi.format.GzipFormat
wie bei der Eingabe an das Zielformat
class MyTask(luigi.Task):
def run(self):
with self.output().open('w') as output:
output.write('aaaa')
def output(self):
return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())
Geben Sie als Ausgabeformat luigi.format.Nop
an und wählen Sie den DataFrame aus und schreiben Sie ihn. Verwenden Sie nicht to_csv
usw., da der Typ verloren geht.
def run(self):
result_df = do_something()
with self.output().open('w') as output:
output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))
Die Seite empfängt als Eingang
def run(self):
with self.input().open('r') as infile:
input_df: pd.DataFrame = pickle.load(infile)
do_something(input_df)
luigi.WrapperTask
implementiert weder run
noch output
.
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
return [FugaTask(), BarTask(), BuzTask(), FooTask()]
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
return [MyDepTask1(), MyDepTask2(), MyDepTask3()]
class MyDepTask1(luigi.Task):
priority = 100
#Folgendes wird weggelassen
Fügen Sie dann "--workers 2" oder etwas im Startbefehl hinzu. Schauen Sie sich die "Richtigkeit" jeder Aufgabe an und führen Sie sie mit Priorität von der höchsten aus.
Wenn Sie keine luigi-ähnlichen Abhängigkeiten definieren, sondern diese seriell verarbeiten möchten
class MyInvokerTask(luigi.WrapperTask):
def requires(self):
yield MyDepTask1()
yield MyDepTask2()
yield MyDepTask3()
Wenn Sie das Task-Objekt "luigi.task.externalize" verwenden, wird es nicht "ausgeführt", sondern nur überprüft, ob die Ausgabe generiert wird.
class MyTask(luigi.Task):
def requires(self):
return externalize(MyDependencyTask())
def run(self):
print('Someone has finished MyDependencyTask')
Wenn die Aufgabe in Visualiser PENDING ist oder nicht angezeigt wird (vom Scheduler freigegeben), führen Sie den Befehl erneut aus. Es werden nur Aufgaben ausgeführt, für die im Abhängigkeitsbaum keine Ausgabe generiert wurde.
Die Standardeinstellungen werden nicht wiederholt. Geben Sie daher die folgenden 4 Elemente in der Einstellungsdatei an. ** Hinweis: In Version 2.5 haben sich die Einstellungselemente für den Wiederholungsversuch geändert **
luigi.cfg
[core]
worker-keep-alive: true
max-reschedules: 20
[scheduler]
disable-num-failures: 10
retry-delay: 300
Deaktiviert die Aufgabe, wenn die Anzahl der "Disable-Num-Failures" innerhalb der durch "Disable-Window-Sekunden" angegebenen Zeit so oft überschritten wird.
luigi.cfg
disable-num-failures: 20
disable-window-seconds: 3600
Wenn Sie in "luigi.cfg" "retry-external -asks: true" festlegen, wird ExternalTask ebenfalls wiederholt. Die Angabe von "Wiederholungsverzögerung" gilt für jeden Scheduler und kann nicht für jede Aufgabe angegeben werden.
luigi.Task.event_handler Mit dem Dekorateur können Sie Haken machen. Wenn Sie die verstrichene Zeit der Aufgabe im Handler für "PROCESSING_TIME" erfassen, müssen Sie sie nur an einer Stelle implementieren.
@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
#Werfen, um irgendwo Metriken zu sammeln
# ...
AWS
http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html
Verwenden Sie luigi.s3.S3PathTask
class MyTask(luigi.Task):
def requires(self):
return luigi.s3.S3PathTask('s3://xxxxxxx')
Wenn gezippt
class MyTask(luigi.Task):
def requires(self):
return GzipS3FileTask('s3://hoge/fuga.gz')
def run(self):
input = self.input().open('r') #Kann mit gelesen werden
class GzipS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()
def output(self):
return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())
Schreiben Sie mit der Ausgabe auf "luigi.s3.S3Target".
class MyTask(luigi.Task):
def run(self):
with self.output().open('w') as output:
output.write('Hey')
def output(self):
return luigi.s3.S3Target('s3://hoge/fuga.txt')
Übergeben Sie den Client für die STS-Verbindung an den Client von "S3Target"
class MyS3FileTask(luigi.s3.S3PathTask):
path = luigi.Parameter()
def output(self):
#Übergeben Sie den Schlüssel aus der angenommenen Rolle
client = luigi.s3.S3Client(
aws_access_key_id=xxxxx,
aws_secret_access_key=yyyyy,
security_token=zzzz)
return luigi.s3.S3Target('s3://xxxx', client=client)
Nehmen Sie die Einstellungen so vor
luigi.cfg
[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError
[email]
type: sns
force-send: true #True, wenn Sie auch während der manuellen Ausführung überspringen möchten
Übergeben Sie "AWS_DEFAULT_REGION" usw. entsprechend an den Startbefehl. Sie müssen den Berechtigungsnachweis nicht angeben, wenn Sie die IAM-Rolle der EC2-Instanz verwenden.
AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke
GCP
Die GCP-Anmeldeinformationen werden in der Umgebungsvariablen "GOOGLE_APPLICATION_CREDENTIALS" übergeben.
Verwenden Sie luigi.contrib.gcs.GCSTarget
Da "GCSTarget" ohne Annahme eines Fehlers erstellt wird, obwohl beim Erstellen einer Instanz ein Netzwerkzugriff erfolgt, ist es besser, es erneut zu versuchen, wenn 503 zurückkehrt.
Schreiben Sie an luigi.contrib.gcs.GCSTarget
import luigi
from luigi.contrib.gcs import GCSTarget
class MyTask(luigi.Task):
def requires(self):
return GCSPathTask(path='gs://hoge/fuga.txt')
def run(self):
with self.input().open('r') as input:
#Etwas tun
with self.output().open('w') as output:
#Schreiben Sie etwas in die Ausgabe
def output(self):
return GCSTarget('gs://hoge/fuga_result.txt')
class GCSPathTask(luigi.ExternalTask):
path = luigi.Parameter()
def output(self):
return GCSTarget(self.path)
luigi.contrib.bigquery
ist schwer zu benutzen, daher ist es besser, es im Voraus zu schreiben.
Insbesondere kann BigQuery Target eine Aufgabe nicht erneut ausführen, ohne die Tabelle zu löschen.