Construction de pipeline de données avec Python et Luigi

introduction

Cet article est une traduction japonaise de Building Data Pipelines with Python and Luigi. L'article original était bien fait, alors j'ai essayé de le traduire, même si je n'en étais pas sûr, pour ma propre compréhension. Si vous avez des erreurs, veuillez nous en informer dans les commentaires.

Construction de pipeline de données avec Python et Luigi

Pour les data scientists, les opérations quotidiennes sont souvent plus de recherche et développement que d'ingénierie. Néanmoins, le processus du prototype au produit nécessite pas mal d'efforts de reconfiguration, les décisions rapides et confuses étant la meilleure option suivante ^ 1. Cela retarde toujours l'innovation et, d'une manière générale, l'ensemble du projet.

Cet article décrit l'expérience de la création d'un pipeline de données: toutes les étapes courantes requises pour préparer les données pour un produit basé sur les données, telles que l'extraction, le nettoyage, la fusion et le prétraitement des données. L'accent est mis en particulier sur le ** transfert de données ** et sur la façon dont un gestionnaire de flux de travail comme Luigi peut être un sauveur sans vous déranger. Avec un minimum d'effort, la transition du prototype au produit se fera en douceur.

Un exemple de code est disponible sur GitHub Gist.

Prototype jusqu'à présent

Dans les prototypes précédents, le pipeline de données ressemblait à peu près à ceci:

$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py

Dans la phase expérimentale préliminaire d'un projet de données, les éléments suivants sont assez courants: un pré-traitement est nécessaire, ce qui est susceptible de conduire à un piratage rapide ^ 2, il est donc en proie aux meilleures pratiques d'ingénierie. Et le nombre de scripts augmente et le pipeline de données pèle.

Cette approche n'a que l'avantage d'être rapide et pirate. L'inconvénient est que c'est ennuyeux: vous voudrez relancer le pipeline à chaque fois, et vous devrez appeler manuellement un tas de scripts l'un après l'autre. De plus, il y a beaucoup de malentendus lors du partage de ce prototype avec des collègues (comme "Pourquoi do_stuff_with_data ne fonctionne-t-il pas?", "Avez-vous d'abord fait" clean_some_data`? ", Etc.).

La solution apparemment pirate semble tout pousser dans un seul script. Après une rapide refactorisation, le script do_everything.py ressemblerait à ceci:

if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()

Facile à faire:

$ python do_everything.py

(Remarque: vous pouvez tout rassembler dans un script bash qui appelle un tas de scripts en séquence, mais les inconvénients restent les mêmes)

Modèle de code

Lorsque nous entrons dans le pipeline prêt pour le produit, nous devons réfléchir un peu aux aspects du code qui exécutent tous les exemples. En particulier, la gestion des erreurs doit être envisagée:

try:
    get_some_data()
except GetSomeDataError as e:
    #La gestion des erreurs

Mais lorsque toutes les tâches sont réunies, cela se transforme en un essai / sauf le sapin de Noël:

try:
    get_some_data()
    try:
        clean_some_data()
        try:
            #Faites quelque chose ici...
        except EvenMoreErrors:
            # ...
    except CleanSomeDataError as e:
        #Gérer CleanSomeDataError
except GetSomeDataError as e:
    #Gérer GetSomeDataError

Une autre considération importante est de savoir comment restaurer le pipeline. Par exemple, si les premières tâches sont terminées, mais qu'une erreur se produit en cours de route, comment pouvez-vous réexécuter le pipeline sans réexécuter la première étape réussie?

#Vérifiez si la tâche est déjà réussie
if not i_got_the_data_already():
    #Sinon, fais-le
    try:
        get_some_date()
    except GetSomeDataError as e:
        #La gestion des erreurs

À Luigi

Luigi est un outil Python de gestion des flux de travail développé par Spotify pour aider à créer des pipelines de données complexes pour les travaux par lots. L'installation Luigi est:

pip install luigi

Les fonctionnalités utiles de Luigi sont:

--Gestion de la dépendance

Il existe deux concepts clés pour comprendre comment Luigi peut être appliqué au pipeline de données: les tâches et les cibles. Une tâche est un ensemble de tâches, représentées en héritant de la classe luigi.Task et en remplaçant certaines méthodes de base. La sortie de la tâche est la cible, qui peut être le système de fichiers local, Amazon S3 ou la base de données.

Les dépendances peuvent être définies sur les entrées et les sorties. Par exemple, si la tâche B dépend de la tâche A, cela signifie que la sortie de la tâche A est l'entrée de la tâche B.

Jetons un coup d'œil à quelques tâches typiques:

# Filename: run_luigi.py
import luigi
 
class PrintNumbers(luigi.Task):
 
    def requires(self):
        return []
 
    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")
 
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))
 
class SquaredNumbers(luigi.Task):
 
    def requires(self):
        return [PrintNumbers()]
 
    def output(self):
        return luigi.LocalTarget("squares.txt")
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
                 
if __name__ == '__main__':
    luigi.run()

Ce code présente deux tâches: PrintNumbers, qui écrit les nombres de 1 à 10 ligne par ligne dans un fichier appelé number_up_to_10.txt, et squares, qui lit ce fichier et les paires avec des nombres carrés ligne par ligne. C'est un "Nombres au carré" qui écrit dans un fichier appelé .txt`.

Pour effectuer cette tâche:

$ python run_luigi.py SquaredNumbers --local-scheduler

Luigi considère la vérification des dépendances entre les tâches et constate qu'il n'y a pas d'entrée SquaredNumbers, alors lancez d'abord la tâche PrintNumbers, puis exécutez SquaredNumbers.

Le premier argument que vous passez à Luigi est le nom de la dernière tâche du pipeline que vous souhaitez effectuer. Le deuxième argument indique simplement à Luigi d'utiliser le planificateur local (nous en reparlerons plus tard).

Vous pouvez également utiliser la commande luigi:

$ luigi -m run_luigi.py SquaredNumbers --local-scheduler

Squelette de tâches

Pour créer une tâche Luigi, créez simplement une classe avec luigi.Task comme parent et remplacez certaines méthodes. En particulier:

--requires () est une liste de tâches dépendantes --ʻOutput () ʻest la cible de la tâche (par exemple, LocalTarget, S3Target, etc.) --run () est la logique d'exécution

Est. Luigi vérifie les valeurs de retour de requires () et ʻoutput () `et construit un graphe de dépendances en conséquence.

Passer les paramètres

Les noms de fichiers et les paramètres codés en dur sont généralement des anti-modèles. Une fois que vous avez compris la structure et la dynamique d'une tâche, vous devez paramétrer vos paramètres afin de pouvoir appeler dynamiquement le même script avec différents arguments.

C'est la classe luigi.Parameter (). Chaque tâche Luigi peut avoir plusieurs paramètres. Par exemple, disons dans l'exemple précédent que vous pouvez changer le nombre. Puisque nous utilisons des entiers comme paramètres pour la fonction range (), nous pouvons utiliser luigi.IntParameter au lieu de la classe de paramètres par défaut. La tâche modifiée ressemble à ceci:

class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()
 
    def requires(self):
        return []
 
    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))
 
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n+1):
                f.write("{}\n".format(i))
 
class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()
 
    def requires(self):
        return [PrintNumbers(n=self.n)]
 
    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

Pour élever la tâche SquaredNumbers à 20 et l'appeler:

$ python run_luigi.py SquaredNumbers --local-scheduler --n 20

Les paramètres peuvent également avoir des valeurs par défaut. Par exemple:

n = luigi.IntParameter(default=10)

Dans ce cas, 10 est utilisé sauf si l'argument --n est spécifié.

Exemple vers GitHub Gist

Planificateur local ou global

Auparavant, j'ai utilisé l'option --local-scheduler lors de l'exécution des tâches de Luigi sur le planificateur local. Ceci est utile pour le développement, mais pour les environnements de produit, vous devez utiliser un planificateur centralisé (voir la documentation sur scheduler).

Cela présente plusieurs avantages:

Pour exécuter le démon du planificateur Luigi au premier plan:

$ luigid

En arrière-plan:

$ luigid --background

Il utilise le port 8082 par défaut, vous pouvez donc voir la visualisation en accédant à http: // localhost: 8082 / avec votre navigateur.

Lorsque le planificateur global Luigi est en cours d'exécution, il peut être réexécuté sans options pour le planificateur local:

$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]

L'exemple de code se termine en millisecondes, mais si vous voulez basculer vers le navigateur et voir le graphique de dépendances lorsque la tâche est toujours en cours d'exécution, vous en avez probablement un grand nombre, disons 10 000 000 ou plus, dans l'option --n. Vous devriez le donner.

Une capture d'écran du graphique de dépendance est: dependency-graph-screenshot.png

Résumé

Nous avons discuté de la définition d'un pipeline de données à l'aide de Luigi, un gestionnaire de flux de travail écrit en Python. Luigi fournit une belle abstraction du pipeline avec des tâches et des cibles, et prend également en compte les dépendances pour vous.

Du point de vue de la réutilisation du code et de la mentalité de passer du prototype au produit, j'ai des [packages Python] individuels (http://marcobonzanini.com/2015/07/01/how-to-develop) pour les tâches de logique métier. Je trouve utile de le définir comme -et-distribuer-python-packages /) (c'est-à-dire avec le fichier setup.py). De cette façon, vous pouvez simplement déclarer ʻimport your_package` à partir du script de Luigi et l'appeler à partir de là.

Il est possible pour une tâche de produire plusieurs fichiers en sortie, mais si c'est le cas, vous devriez probablement vous demander si la tâche peut être divisée en unités plus petites (c'est-à-dire plusieurs tâches). Leurs résultats sont-ils logiquement les mêmes? Y a-t-il des dépendances? Si vous ne pouvez pas diviser une tâche, je pense qu'il est simple et pratique de créer simplement ʻoutput () un fichier journal qui combine le nom de la tâche elle-même, l'horodatage, etc. Le nom du fichier journal serait quelque chose comme TaskName_timestamp_param1value_param2value_etc`.

Les gestionnaires de flux de travail comme Luigi gèrent les dépendances, réduisent la quantité de modèles de code pour la gestion des paramètres et des erreurs, gèrent la récupération des pannes et suivent des modèles clairs lors du développement de pipelines de données. De manière générale, c'est utile parce que c'est le cas.

Il est également important de prendre en compte les limites:

--Lugi est développé pour les travaux par lots, il est donc probablement inutile pour un traitement en temps quasi réel -Ne déclenche pas l'exécution. Vous devez exécuter le pipeline de données (par exemple via cronjob)


Recommended Posts

Construction de pipeline de données avec Python et Luigi
Construction d'interface graphique heureuse avec électron et python
Analyse de données avec python 2
Analyse de données avec Python
Exemple de données créées avec python
Programmation avec Python et Tkinter
Chiffrement et déchiffrement avec Python
Python et matériel - Utilisation de RS232C avec Python -
Obtenez des données Youtube avec python
Construction d'un environnement d'analyse de données avec Python (notebook IPython + Pandas)
Construction d'environnement Python et TensorFlow
Étudiez l'échange de données Java et Python avec Apache Arrow
python avec pyenv et venv
Fonctionne avec Python et R
Lire des données json avec python
Débarrassez-vous des données sales avec Python et les expressions régulières
Résolvez le livre en spirale (algorithme et structure de données) avec python!
Obtenez des données supplémentaires vers LDAP avec python (Writer et Reader)
Communiquez avec FX-5204PS avec Python et PyUSB
Construction d'environnement de python et opencv
Briller la vie avec Python et OpenCV
Commencez avec Python! ~ ① Construction de l'environnement ~
Robot fonctionnant avec Arduino et python
Installez Python 2.7.9 et Python 3.4.x avec pip.
Réseau neuronal avec OpenCV 3 et Python 3
Modulation et démodulation AM avec python
Scraping avec Node, Ruby et Python
Grattage avec Python, Selenium et Chromedriver
Grattage avec Python et belle soupe
[Python] Obtenez des données économiques avec DataReader
Encodage et décodage JSON avec python
Introduction à Hadoop et MapReduce avec Python
[GUI en Python] PyQt5-Glisser-déposer-
Structure de données Python apprise avec la chimioinfomatique
Hashing de données en R et Python
Lire et écrire NetCDF avec Python
J'ai joué avec PyQt5 et Python3
Construction de l'environnement Python3 avec pyenv-virtualenv (CentOS 7.3)
Visualisez facilement vos données avec Python seaborn.
Lire et écrire du CSV avec Python
Intégration multiple avec Python et Sympy
Traiter les données Pubmed .xml avec python
Construction de l'environnement pytorch @ python3.8 avec pipenv
Analyse de données à partir de python (visualisation de données 1)
Coexistence de Python2 et 3 avec CircleCI (1.0)
Analyse de données à partir de python (visualisation de données 2)
Application de Python: Nettoyage des données Partie 2: Nettoyage des données à l'aide de DataFrame
Jeu Sugoroku et jeu d'addition avec Python
Modulation et démodulation FM avec Python
Créer un environnement avec pyenv et pyenv-virtualenv
Obtenez des données de VPS MySQL avec Python 3 et SQL Alchemy
Déplacer les données vers LDAP avec python Change / Delete (Writer et Reader)
Construction de l'environnement LaTeX et R (un peu Python) avec SublimeText3 (Windows)
Communiquez entre Elixir et Python avec gRPC
Obtenez des données supplémentaires vers LDAP avec python
[Ubuntu 18.04] Créer un environnement Python avec pyenv + pipenv
Calculer et afficher le poids standard avec python
Recevoir des données textuelles de mysql avec python