[PYTHON] Luigi Reverse Reference

Dies ist ein Memo der 2.0-Serie Luigi, die ein Jobmanager von Python ist.

Basic

Grundlegende Art der Aufgabe

standard_task.py


import luigi

class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return MyDependentTask(self.date)

    def run(self):
        with self.output().open('w') as output:
            with self.input().open('r') as input:
                for line in input:
                    ret = do_something(line)
                    output.write(ret)
                    output.write('\n')

    def output(self):
        return luigi.LocalTarget('./out2_{0}.txt'.format(self.date.isoformat()))


class MyDependentTask(luigi.Task):
    date = luigi.DateParameter()

    def run(self):
        with self.output().open('w') as output:
            output.write("line1\n")
            output.write("line2\n")

    def output(self):
        return luigi.LocalTarget('./out1_{0}.txt'.format(self.date.isoformat()))


if __name__ == '__main__':
    luigi.run()

Ich möchte eine Binärdatei ausgeben

Verwenden Sie luigi.format.Nop. Zum Beispiel, wenn Sie einlegen möchten.

import pickle

import luigi


class SomeTask(luigi.Task):
    def requires(self):
        return xxxx

    def run(self):
        some_obj = hoge()
        with self.output().open('w') as output:
            output.write(pickle.dumps(some_obj, protocol=pickle.HIGHEST_PROTOCOL))

    def output(self):
        return luigi.LocalTarget(
            format=luigi.format.Nop,
            path='xxxxxxxx')


class NextTask(luigi.Task):
    def requires(self):
        return SomeTask()

    def run(self):
        with self.input().open('r') as infile:
            ret = pickle.load(infile)

Geben Sie eine komprimierte Datei ein

Gibt Target mit luigi.format.GzipFormat zurück, das in der Ausgabe der abhängigen Task übergeben wurde.

Gzip die Ausgabedatei

Übergeben Sie luigi.format.GzipFormat wie bei der Eingabe an das Zielformat

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('aaaa')

    def output(self):
        return luigi.LocalTarget('./out.gz', format=luigi.format.GzipFormat())

Ich möchte Pandas DataFrame ausgeben

Geben Sie als Ausgabeformat luigi.format.Nop an und wählen Sie den DataFrame aus und schreiben Sie ihn. Verwenden Sie nicht to_csv usw., da der Typ verloren geht.


def run(self):
    result_df = do_something()
    with self.output().open('w') as output:
        output.write(pickle.dumps(result_df, protocol=pickle.HIGHEST_PROTOCOL))

Die Seite empfängt als Eingang


def run(self):
    with self.input().open('r') as infile:
        input_df: pd.DataFrame = pickle.load(infile)
        do_something(input_df)    

Aufgaben, die nur Aufgaben ausführen

luigi.WrapperTask implementiert weder run noch output.

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [FugaTask(), BarTask(), BuzTask(), FooTask()]

Führen Sie abhängige Aufgaben parallel aus

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        return [MyDepTask1(), MyDepTask2(), MyDepTask3()]

class MyDepTask1(luigi.Task):
    priority = 100

    #Folgendes wird weggelassen

Fügen Sie dann "--workers 2" oder etwas im Startbefehl hinzu. Schauen Sie sich die "Richtigkeit" jeder Aufgabe an und führen Sie sie mit Priorität von der höchsten aus.

Führen Sie abhängige Aufgaben der Reihe nach aus

Wenn Sie keine luigi-ähnlichen Abhängigkeiten definieren, sondern diese seriell verarbeiten möchten

class MyInvokerTask(luigi.WrapperTask):
    def requires(self):
        yield MyDepTask1()
        yield MyDepTask2()
        yield MyDepTask3()

Verketten Sie keine abhängigen Aufgaben

Wenn Sie das Task-Objekt "luigi.task.externalize" verwenden, wird es nicht "ausgeführt", sondern nur überprüft, ob die Ausgabe generiert wird.

class MyTask(luigi.Task):
    def requires(self):
        return externalize(MyDependencyTask())

    def run(self):
        print('Someone has finished MyDependencyTask')

Wiederholen Sie einen Moosjob manuell

Wenn die Aufgabe in Visualiser PENDING ist oder nicht angezeigt wird (vom Scheduler freigegeben), führen Sie den Befehl erneut aus. Es werden nur Aufgaben ausgeführt, für die im Abhängigkeitsbaum keine Ausgabe generiert wurde.

Einstellung zum automatischen Wiederholen von Moosjobs

Die Standardeinstellungen werden nicht wiederholt. Geben Sie daher die folgenden 4 Elemente in der Einstellungsdatei an. ** Hinweis: In Version 2.5 haben sich die Einstellungselemente für den Wiederholungsversuch geändert **

luigi.cfg


[core]
worker-keep-alive: true
max-reschedules: 20

[scheduler]
disable-num-failures: 10
retry-delay: 300

Beenden Sie die Wiederholung, wenn die Wiederholungsversuche weiterhin fehlschlagen

Deaktiviert die Aufgabe, wenn die Anzahl der "Disable-Num-Failures" innerhalb der durch "Disable-Window-Sekunden" angegebenen Zeit so oft überschritten wird.

luigi.cfg


disable-num-failures: 20
disable-window-seconds: 3600

Warten Sie weiter auf die Ausgabe externer Aufgaben

Wenn Sie in "luigi.cfg" "retry-external -asks: true" festlegen, wird ExternalTask ebenfalls wiederholt. Die Angabe von "Wiederholungsverzögerung" gilt für jeden Scheduler und kann nicht für jede Aufgabe angegeben werden.

Sammeln Sie die Bearbeitungszeit für Aufgaben

luigi.Task.event_handler Mit dem Dekorateur können Sie Haken machen. Wenn Sie die verstrichene Zeit der Aufgabe im Handler für "PROCESSING_TIME" erfassen, müssen Sie sie nur an einer Stelle implementieren.

@luigi.Task.event_handler(luigi.Event.PROCESSING_TIME)
def on_processing_time(task, duration):
    logger.debug('Task {} proceed {:.1f} sec'.format(task, duration))
    #Werfen, um irgendwo Metriken zu sammeln
    # ...

AWS

http://luigi.readthedocs.org/en/stable/_modules/luigi/s3.html

Geben Sie die Datei ein, die die externe Aufgabe in S3 abgelegt hat

Verwenden Sie luigi.s3.S3PathTask

class MyTask(luigi.Task):
    def requires(self):
        return luigi.s3.S3PathTask('s3://xxxxxxx')

Wenn gezippt

class MyTask(luigi.Task):
    def requires(self):
        return GzipS3FileTask('s3://hoge/fuga.gz')

    def run(self):
        input = self.input().open('r') #Kann mit gelesen werden

class GzipS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        return luigi.s3.S3Target(self.path, format=luigi.format.GzipFormat())

Geben Sie das Ergebnis an S3 aus

Schreiben Sie mit der Ausgabe auf "luigi.s3.S3Target".

class MyTask(luigi.Task):
    def run(self):
        with self.output().open('w') as output:
            output.write('Hey')

    def output(self):
        return luigi.s3.S3Target('s3://hoge/fuga.txt')

Verwenden Sie die STS-Verbindung (Security Token Service), um auf S3 zuzugreifen

Übergeben Sie den Client für die STS-Verbindung an den Client von "S3Target"

class MyS3FileTask(luigi.s3.S3PathTask):
    path = luigi.Parameter()

    def output(self):
        #Übergeben Sie den Schlüssel aus der angenommenen Rolle
        client = luigi.s3.S3Client(
            aws_access_key_id=xxxxx,
            aws_secret_access_key=yyyyy,
            security_token=zzzz)
        return luigi.s3.S3Target('s3://xxxx', client=client)

Senden Sie eine Fehlerbenachrichtigung an SNS

Nehmen Sie die Einstellungen so vor

luigi.cfg


[core]
error-email: arn:aws:sns:ap-northeast-1:0000000000:sns-LuigiError

[email]
type: sns
force-send: true #True, wenn Sie auch während der manuellen Ausführung überspringen möchten

Übergeben Sie "AWS_DEFAULT_REGION" usw. entsprechend an den Startbefehl. Sie müssen den Berechtigungsnachweis nicht angeben, wenn Sie die IAM-Rolle der EC2-Instanz verwenden.

AWS_DEFAULT_REGION=ap-northeast-1 python sns_test.py Invoke

GCP

Die GCP-Anmeldeinformationen werden in der Umgebungsvariablen "GOOGLE_APPLICATION_CREDENTIALS" übergeben.

Geben Sie eine GCS-Datei ein

Verwenden Sie luigi.contrib.gcs.GCSTarget

Da "GCSTarget" ohne Annahme eines Fehlers erstellt wird, obwohl beim Erstellen einer Instanz ein Netzwerkzugriff erfolgt, ist es besser, es erneut zu versuchen, wenn 503 zurückkehrt.

Ausgabe an GCS

Schreiben Sie an luigi.contrib.gcs.GCSTarget

import luigi
from luigi.contrib.gcs import GCSTarget

class MyTask(luigi.Task):
   def requires(self):
       return GCSPathTask(path='gs://hoge/fuga.txt')

    def run(self):
        with self.input().open('r') as input:
            #Etwas tun
        
        with self.output().open('w') as output:
            #Schreiben Sie etwas in die Ausgabe

    def output(self):
        return GCSTarget('gs://hoge/fuga_result.txt')

class GCSPathTask(luigi.ExternalTask):
    path = luigi.Parameter()
    
    def output(self):
        return GCSTarget(self.path)

Führen Sie einen BigQuery-Ladejob aus

luigi.contrib.bigquery ist schwer zu benutzen, daher ist es besser, es im Voraus zu schreiben. Insbesondere kann BigQuery Target eine Aufgabe nicht erneut ausführen, ohne die Tabelle zu löschen.

Recommended Posts

Luigi Reverse Reference
Reverse Pull Pytest
Reverse Reference der Python-Datums- / Zeitbibliothek
Tipps zur Servereinstellung Reverse Pull
Django Management Screen Reverse Memo