** Wir werden auch die Standardbibliothek ändern ** ** Es sind keine Änderungen erforderlich, außer in Windows-Umgebungen oder wenn keine Parallelverarbeitung durchgeführt wird **
Eine Art Job Scheduler. Wenn es Abhängigkeiten zwischen mehreren Jobs gibt, werden diese in der richtigen Reihenfolge ausgeführt. Wenn zwischen den Jobs keine Abhängigkeiten bestehen, werden sie parallel ausgeführt.
Weitere Informationen finden Sie auf der offiziellen Seite. spotify/luigi
Parallelisierung ist nicht nur in der Windows-Umgebung möglich.
Der Grund dafür ist, dass luigi pickle verwendet, um Jobs zwischen Prozessen zu serialisieren, aber einige Objekte können von der pickle-Implementierung in der Windows-Umgebung nicht serialisiert werden. (Vielleicht)
Schreiben Sie die Bibliothek neu. Es gibt zwei Umschreibziele: luigi / worker.py und die Standardbibliothek multiprocessing / reduction.py.
Lib/site-packages/luigi/worker.py
#Import hinzufügen
from functools import partial
# TaskProcess.__init__intern
class TaskProcess(multiprocessing.Process):
...
def __init__(self, task, worker_id, result_queue, tracking_url_callback,
status_message_callback, use_multiprocessing=False, worker_timeout=0):
...
# self.tracking_url_callback = tracking_url_callback
self.tracking_url_callback = partial(tracking_url_callback, task)
# self.status_message_callback = status_message_callback
self.status_message_callback = partial(status_message_callback, task)
...
...
class worker(Config):
...
# Worker._create_task_Funktionen innerhalb des Prozesses verschieben
def _update_tracking_url(self, task, tracking_url):
self._scheduler.add_task(
task_id=task.task_id,
worker=self._id,
status=RUNNING,
tracking_url=tracking_url,
)
# Worker._create_task_Funktionen innerhalb des Prozesses verschieben
def _update_status_message(self, task, message):
self._scheduler.set_task_status_message(task.task_id, message)
def _create_task_process(self, task):
# def update_tracking_url(tracking_url):
# self._scheduler.add_task(
# task_id=task.task_id,
# worker=self._id,
# status=RUNNING,
# tracking_url=tracking_url,
# )
# def update_status_message(message):
# self._scheduler.set_task_status_message(task.task_id, message)
return TaskProcess(
task, self._id, self._task_result_queue, self._update_tracking_url, self._update_status_message,
use_multiprocessing=bool(self.worker_processes > 1),
worker_timeout=self._config.timeout
)
...
Lib/multiprocessing/reduction.py
#Teil am Anfang importieren
# import pickle
import dill as pickle
Dill kann mit Pip installiert werden.
Verwenden Sie die alte Version. In diesem Fall besteht kein großer Änderungsbedarf.
Dadurch wird die Standardbibliothek nicht geändert. Ich habe eine Fehlermeldung erhalten, als ich versucht habe, Luigi unter Windows parallel zu verarbeiten, aber die Lösung Pickle crashing when trying to pickle "update_tracking_url" in luigi.worker?
Soweit ich es tatsächlich benutze, ist es kein Problem, Bitte ändern Sie auf eigenes Risiko.
Recommended Posts