[PYTHON] Précautions à prendre lors de la manipulation de Luigi

Luigi:2.5.0 python:3.6

Précautions à prendre lors de la manipulation de Luigi

Luigi a un article qui évite le problème que le traitement parallèle n'est pas possible dans l'environnement Windows. Forcer luigi à effectuer un traitement parallèle dans un environnement Windows

À propos du nombre d'exécutions de require et run

Il y a un problème avec la gestion du générateur ~~, et les méthodes require et run de la tâche qui retourne la tâche dépendante sont appelées plusieurs fois par le planificateur. Cela signifie que le processus écrit en cours d'exécution ou requiert sera exécuté plusieurs fois selon la situation. Par conséquent, il est plus sûr de ne pas écrire des processus coûteux ou des processus qui affectent l'extérieur dans la méthode qui renvoie des tâches dépendantes.

Lors de la transition de la tâche de dépendance au traitement de la tâche de dépendance, l'objet générateur de la tâche de dépendance est écrasé, et lors du retour à la tâche de dépendance, un nouvel objet générateur est à nouveau acquis, donc le début de la méthode à chaque fois. Il sera redémarré à partir de. https://github.com/mtoriumi/luigi/blob/5678b6119ed260e8fb43410675be6d6daea445d1/luigi/worker.py#L130

regenerated_task_gen.gif

Sample:

from luigi import Task, run
from luigi.mock import MockTarget
from inspect import currentframe


class DependentTask(Task):

    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget('out.txt')

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('DependentTask is succeeded')

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


class StartTask(Task):

    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget("StartTaskOut.txt")

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('StartTask is succeeded')
        yield DependentTask()

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


if __name__ == '__main__':
    run(main_task_cls=StartTask)

Output:

running StartTask.output
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
running DependentTask.run
running DependentTask.output
Reached DependentTask.on_success
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
Reached StartTask.on_success

À propos de la limite du nombre de tentatives pour chaque tâche

~~ Il y a un bogue dans le processus de retry_count, qui spécifie le nombre de tentatives pour chaque tâche, et il est dit que le paramètre de limite du nombre de tentatives du planificateur sera utilisé pour la deuxième tentative et les suivantes. Actuellement, il ne fonctionne pas correctement, nous vous recommandons donc de ne pas l'utiliser. La cause est que la stratégie de nouvelle tentative (retry_policy_dict) n'est pas spécifiée dans le deuxième emplacement d'enregistrement de tâche et les suivants ci-dessous. https://github.com/spotify/luigi/blob/b33aa3033405bfe41d9f5a806d70fb8e98214d86/luigi/worker.py#L974-L984~~

~~ Des questions similaires ont été posées ci-dessous dans le passé. http://stackoverflow.com/questions/39595786/luigi-per-task-retry-policy~~

~~ Ceci envoie actuellement une pull request et est en attente d'examen. https://github.com/spotify/luigi/pull/2012~~

Il a été fusionné.

À propos des paramètres de luigid (planificateur central)

La variable d'environnement LUIGI_CONFIG_PATH doit également être spécifiée lors du démarrage de luigid. luigid fonctionne indépendamment, donc si vous souhaitez refléter le contenu de la section du planificateur, vous devez spécifier le fichier de configuration lorsque vous démarrez luigid.

À propos de l'ordre de traitement des méthodes de tâche

Les tâches sont traitées dans l'ordre ʻoutput () => requires`` => run () `.

Passer des paramètres depuis la ligne de commande

Lors du passage d'un paramètre à partir de la ligne de commande, le trait de soulignement dans le nom du paramètre est remplacé par un trait d'union.

Conversion de type lors de la prise de paramètres

Lorsque vous prenez un paramètre avec DictParameter, il devient un type unique appelé FrozenOrderedDict. Par conséquent, certaines méthodes qui pourraient être utilisées dans le type intégré ne peuvent pas être utilisées.

Bonne valeur à passer au paramètre

Les paramètres autres que le type intégré ne peuvent pas être acheminés entre des tâches parallèles. Si un objet est donné, il sera converti en une chaîne de nom de classe. S'il est en série, il peut être remis. Il s'agit d'une spécification selon laquelle les tâches parallèles fonctionnent dans plusieurs processus et les paramètres sont une fois convertis au format JSON.

Condition de fin de tâche

Il y a deux conditions pour terminer une tâche.

--Créer un fichier sur la cible spécifiée par output = ouvrir / fermer la cible (tâche réussie) --Exception dans la tâche (échec de la tâche)

Si l'un des éléments ci-dessus n'est pas satisfait, la tâche ne se terminera pas pour toujours tant qu'elle ne sera pas tuée.

Une exception s'est produite lors de l'ouverture de la cible

Si une exception se produit alors que la cible est ouverte en mode écriture, le fichier d'écriture temporaire restera en tant que garbage en raison du fichier occupé, donc n'écrivez pas un processus qui peut provoquer une exception après l'ouverture de la cible.

Recommended Posts

Précautions à prendre lors de la manipulation de Luigi
Précautions lors de l'installation de fbprophet
Précautions lors de l'utilisation de Chainer
Réponse aux erreurs lors de l'installation de mecab-python
Précautions lors de la sortie vers une table BigQuery toutes les heures avec Luigi
Précautions lors de l'utilisation de Pit avec Python
Précautions lors de l'héritage de la classe DatasetMixin
Précautions lors de l'utilisation de l'analyse des traits TextBlob
Précautions lors de l'installation de tensorflow avec anaconda
Précautions lors de l'utilisation de codecs et de pandas
Précautions lors de l'utilisation de la fonction urllib.parse.quote
Précautions lors de la création d'un générateur Python
Gestion des erreurs lors de la mise à jour de Fish shell
Précautions lors de l'utilisation de phantomjs de python
Précautions lors de l'utilisation de six avec Python 2.5
Précautions et gestion des erreurs lors de l'appel de la DLL .NET à partir de python à l'aide de pythonnet
Précautions à prendre lors de la manipulation des images png et jpg
Précautions lors du décapage d'une fonction en python
Points à noter lors de la résolution de problèmes DP avec Python
Précautions lors de l'utilisation de l'instruction for dans les pandas