Luigi: 2.5.0, Python: 3.6
There is an article that works around the fact that Luigi cannot run tasks in parallel on Windows: Force luigi to do parallel processing in windows environment
~~There is a problem with the generator,~~ The requires() and run() methods of a task that yields dependent tasks are called multiple times by the scheduler. Whatever you write in run() or requires() may therefore be executed multiple times depending on the situation, so it is safer not to put expensive processing, or processing with external side effects, in a method that returns dependent tasks.
When execution moves from the depending task to the processing of its dependency, the depending task's generator object is overwritten, and when execution returns to it a new generator object is obtained, so the method is restarted from the beginning each time. https://github.com/mtoriumi/luigi/blob/5678b6119ed260e8fb43410675be6d6daea445d1/luigi/worker.py#L130
Sample:
```python
from luigi import Task, run
from luigi.mock import MockTarget
from inspect import currentframe


class DependentTask(Task):
    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget('out.txt')

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('DependentTask is succeeded')

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


class StartTask(Task):
    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget("StartTaskOut.txt")

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('StartTask is succeeded')
        yield DependentTask()

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


if __name__ == '__main__':
    run(main_task_cls=StartTask)
```
Output:
```
running StartTask.output
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
running DependentTask.run
running DependentTask.output
Reached DependentTask.on_success
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
Reached StartTask.on_success
```
~~There is a bug in the handling of retry_count, which specifies the number of retries per task: from the second retry onward, the scheduler-wide retry limit setting is used instead. It does not currently work correctly, so we recommend not using it. The cause is that the retry policy (retry_policy_dict) is not passed at the point where the task is re-registered from the second time onward: https://github.com/spotify/luigi/blob/b33aa3033405bfe41d9f5a806d70fb8e98214d86/luigi/worker.py#L974-L984~~
~~A similar issue has been asked about in the past: http://stackoverflow.com/questions/39595786/luigi-per-task-retry-policy~~
~~A pull request for this issue has been submitted and is awaiting review: https://github.com/spotify/luigi/pull/2012~~
It has been merged.
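For reference, a minimal sketch of the per-task setting that the fix enables; the task name and output path are made up, and this assumes a Luigi release that contains the merged change from PR #2012:

```python
import luigi


class FlakyTask(luigi.Task):
    # Per-task retry policy; assumes a Luigi release that includes the fix
    # from PR #2012, where this value overrides the scheduler-wide retry_count.
    retry_count = 3

    def output(self):
        return luigi.LocalTarget('flaky_out.txt')   # placeholder output path

    def run(self):
        # Work that may fail intermittently goes here; on failure the
        # scheduler should use retry_count for this task instead of its
        # global setting.
        with self.output().open('w') as fout:
            fout.write('ok')
```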
The LUIGI_CONFIG_PATH environment variable must also be set when starting luigid. luigid runs as a separate process, so if you want the contents of the scheduler section to take effect, you have to point luigid at the configuration file when you start it.
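One way to do that, shown here only as a sketch (the config path is a placeholder), is to set the variable in the environment of the luigid process itself:

```python
import os
import subprocess

# luigid reads its configuration independently of the worker processes, so
# LUIGI_CONFIG_PATH has to be visible to luigid itself.
env = dict(os.environ)
env['LUIGI_CONFIG_PATH'] = '/path/to/luigi.cfg'   # placeholder path

# Start the central scheduler with that environment (equivalent to exporting
# the variable in the shell before running `luigid`).
subprocess.Popen(['luigid'], env=env)
```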
Tasks are processed in the order `output()` => `requires()` => `run()`.
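A minimal sketch (the task and target name are made up) that makes this order visible by printing from each method:

```python
import luigi
from luigi.mock import MockTarget


class OrderDemo(luigi.Task):
    def output(self):
        print("output")     # used for the completeness check
        return MockTarget('order_demo.txt')

    def requires(self):
        print("requires")   # dependency resolution
        return []

    def run(self):
        print("run")        # the actual work
        with self.output().open('w') as fout:
            fout.write('done')


if __name__ == '__main__':
    # The first occurrence of each message appears in the order
    # output -> requires -> run (some methods are called more than once,
    # as the earlier sample shows).
    luigi.build([OrderDemo()], local_scheduler=True)
```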
When passing parameters from the command line, the underscores in the parameter names are changed to hyphens.
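A small sketch (task and parameter names made up) of what that looks like:

```python
import luigi


class GreetTask(luigi.Task):
    # Declared with an underscore in Python ...
    greeting_text = luigi.Parameter(default='hello')

    def run(self):
        print(self.greeting_text)


if __name__ == '__main__':
    # ... but written with a hyphen on the command line:
    #   python greet.py GreetTask --greeting-text hi --local-scheduler
    luigi.run()
```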
If you take a parameter with DictParameter, the value will be a Luigi-specific type called FrozenOrderedDict, so some methods available on the built-in dict cannot be used.
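A sketch, with a made-up task and value, of how that shows up:

```python
import luigi


class ConfigTask(luigi.Task):
    settings = luigi.DictParameter()

    def run(self):
        # A value parsed by DictParameter (e.g. from the command line) arrives
        # as a read-only FrozenOrderedDict rather than a plain dict, so
        # in-place modification is not available.
        print(type(self.settings))
        # self.settings['threshold'] = 20   # item assignment would raise
        mutable = dict(self.settings)       # copy into a plain dict to modify
        mutable['threshold'] = 20


if __name__ == '__main__':
    # e.g. python config_task.py ConfigTask --settings '{"threshold": 10}' --local-scheduler
    luigi.run()
```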
Parameters that are not built-in types cannot be passed between tasks running in parallel. If an object is given, it arrives as a string containing its class name; when tasks run serially, the object can be handed over as-is. This is by design: parallel tasks run in separate processes, and parameters are converted to JSON once to cross the process boundary.
There are two conditions under which a task finishes:

- The target specified by output() is created (the target is opened and closed) => the task succeeds
- An exception is raised inside the task => the task fails

If neither of the above is met, the task never finishes until it is killed.
If an exception occurs while the target is open in write mode, the temporary write file is left behind as garbage because the file is still in use, so do not put code that can raise an exception after the target has been opened.
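A sketch of the safer ordering, with a made-up task and a hypothetical compute() helper standing in for the failure-prone work:

```python
import luigi


class SafeWriteTask(luigi.Task):
    def output(self):
        return luigi.LocalTarget('result.txt')   # placeholder output path

    def run(self):
        # Do everything that can raise *before* opening the target, so a
        # failure does not leave the temporary write file behind.
        result = self.compute()
        with self.output().open('w') as fout:
            fout.write(result)        # only quick, safe writes after opening

    def compute(self):
        # Hypothetical helper for the expensive / exception-prone part.
        return 'done'
```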