Luigi:2.5.0 python:3.6
Luigi hat einen Artikel, der das Problem vermeidet, dass eine parallele Verarbeitung in der Windows-Umgebung nicht möglich ist. Luigi zwingen, eine parallele Verarbeitung in einer Windows-Umgebung durchzuführen
Es gibt ein Problem bei der Handhabung des Generators ~~, und die erforderlichen und ausgeführten Methoden der Task, die die abhängige Task zurückgibt, werden vom Scheduler mehrmals aufgerufen. Dies bedeutet, dass der in run oder require geschriebene Prozess je nach Situation mehrmals ausgeführt wird. Daher ist es sicherer, keine teuren Prozesse oder Prozesse, die sich auf das Äußere auswirken, in die Methode zu schreiben, die abhängige Aufgaben zurückgibt.
Beim Übergang von der abhängigen Aufgabe zur Verarbeitung der abhängigen Aufgabe wird das Generatorobjekt der abhängigen Aufgabe überschrieben, und bei der Rückkehr zur abhängigen Aufgabe wird erneut ein neues Generatorobjekt erfasst, also jedes Mal der Beginn der Methode. Es wird von neu gestartet. https://github.com/mtoriumi/luigi/blob/5678b6119ed260e8fb43410675be6d6daea445d1/luigi/worker.py#L130
Sample:
from luigi import Task, run
from luigi.mock import MockTarget
from inspect import currentframe
class DependentTask(Task):
def output(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
return MockTarget('out.txt')
def run(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
with self.output().open('w') as fout:
fout.write('DependentTask is succeeded')
def on_success(self):
print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
def on_failure(self, exception):
print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))
class StartTask(Task):
def output(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
return MockTarget("StartTaskOut.txt")
def run(self):
print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
with self.output().open('w') as fout:
fout.write('StartTask is succeeded')
yield DependentTask()
def on_success(self):
print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
def on_failure(self, exception):
print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))
if __name__ == '__main__':
run(main_task_cls=StartTask)
Output:
running StartTask.output
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
running DependentTask.run
running DependentTask.output
Reached DependentTask.on_success
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
Reached StartTask.on_success
~~ Es gibt einen Fehler im Prozess von retry_count, der die Anzahl der Wiederholungsversuche für jede Aufgabe angibt, und es wird gesagt, dass die Einstellung für die Anzahl der Wiederholungsversuche des Schedulers für die zweiten und nachfolgenden Wiederholungsversuche verwendet wird. Derzeit funktioniert es nicht ordnungsgemäß. Wir empfehlen daher, es nicht zu verwenden. Die Ursache ist, dass die Wiederholungsrichtlinie (retry_policy_dict) ab dem zweiten Mal unten nicht mehr im Speicherort der Aufgabenregistrierung angegeben ist. https://github.com/spotify/luigi/blob/b33aa3033405bfe41d9f5a806d70fb8e98214d86/luigi/worker.py#L974-L984~~
~~ Ähnliche Probleme wurden in der Vergangenheit unten gestellt. http://stackoverflow.com/questions/39595786/luigi-per-task-retry-policy~~
~~ Dies sendet derzeit eine Pull-Anfrage und wartet auf die Überprüfung. https://github.com/spotify/luigi/pull/2012~~
Es wurde zusammengeführt.
Die Umgebungsvariable LUIGI_CONFIG_PATH muss auch beim Starten von luigid angegeben werden. luigid funktioniert unabhängig. Wenn Sie also den Inhalt des Scheduler-Abschnitts widerspiegeln möchten, müssen Sie die Konfigurationsdatei angeben, wenn Sie luigid starten.
Aufgaben werden in der Reihenfolge output ()
=> require
=> run ()
verarbeitet.
Wenn Sie einen Parameter über die Befehlszeile übergeben, wird der Unterstrich im Parameternamen in Bindestrich geändert.
Wenn Sie einen Parameter mit DictParameter verwenden, wird dieser zu einem eindeutigen Typ mit dem Namen FrozenOrderedDict. Daher können einige Methoden, die im integrierten Typ verwendet werden könnten, nicht verwendet werden.
Andere Parameter als der integrierte Typ können nicht zwischen parallelen Tasks weitergeleitet werden. Wenn ein Objekt angegeben wird, wird es in eine Klassennamenzeichenfolge konvertiert. Wenn es in Serie ist, kann es übergeben werden. Dies ist eine Spezifikation, bei der parallele Aufgaben in mehreren Prozessen ausgeführt werden und die Parameter einmal in das JSON-Format konvertiert werden.
Es gibt zwei Bedingungen für die Ausführung einer Aufgabe.
Wenn eine der oben genannten Bedingungen nicht erfüllt ist, endet die Aufgabe nicht für immer, bis sie getötet wird.
Wenn eine Ausnahme auftritt, während das Ziel im Schreibmodus geöffnet ist, bleibt die temporäre Schreibdatei als Müll in Datei beschäftigt. Schreiben Sie daher keinen Prozess, der nach dem Öffnen des Ziels eine Ausnahme verursachen kann.
Recommended Posts