Ceci est un article de synthèse sur la bibliothèque de wrapper de Luigi gokart.
Les motifs de développement et d'utilisation de base sont résumés très attentivement dans le blog de M3, et l'utilisation de base est C'est une histoire que vous devriez lire ici, mais je voulais la résumer comme une référence inversée, alors j'en ai fait un article.
De plus, je n'expliquerai pas grand chose sur les fonctions de Luigi lui-même.
Un type d'OSS pour le framework Pipeline développé par Spotify. Implémenté en Python, héritant de luigi.Task
--requires ()
: Tâche dépendante
--run ()
: Processus à exécuter
--ʻOutput () `: Destination de sortie
Vous pouvez facilement créer un flux de travail en écrivant simplement les trois méthodes.
L'origine du nom
Also it should be mentioned that Luigi is named after the pipeline-running friend of Super Mario.
Apparemment
Gokart est une bibliothèque de wrapper qui rend Luigi plus facile à utiliser.
L'origine du nom est probablement Mario (Kart).
Les fonctions de gokart == 0.3.6
sont résumées ci-dessous.
Lors de la création d'une tâche, héritez de gokart.TaskOnKart
au lieu de luigi.Task
.
import gokart
class TaskA(gokart.TaskOnKart):
def run(self):
data= pd.DataFrame(load_iris()['data'])
self.dump(data)
class TaskB(gokart.TaskOnKart):
def reuires(self):
return TaskA()
#la sortie est facultative
def output(self):
return self.make_target('data.pkl')
def run(self):
df =self.load()
self.dump(df)
L'utilisation de base est la même que Luigi, mais vous n'avez qu'à faire self.dump (objet que vous voulez sauvegarder)
, donc cela peut être beaucoup plus simple que de faire le même traitement avec Luigi seul. De plus, la sortie def (self)
peut être omise, auquel cas elle sera sauvegardée au format pickle
.
Exécutez comme suit.
gokart.run(['TaskB', '--local-scheduler'])
Une fois exécuté, l'objet sera enregistré sous resources
comme indiqué ci-dessous.
resources
├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
└── log
├── module_versions
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
├── processing_time
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
├── task_log
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
└── task_params
└── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
La valeur de hachage est attachée au nom de fichier spécifié et enregistrée. La valeur de hachage étant déterminée par les paramètres de la tâche, la tâche sera réexécutée si les paramètres sont modifiés, contrairement au cas de ** Luigi seul. ** C'est également l'un des avantages du gokart. (Bien que cela soit décrit plus tard, il est également possible de sauvegarder sans ajouter de valeur de hachage)
le journal a
--Version du module utilisé --Temps de traitement --log sortie par enregistreur
Est sauvé.
Par défaut, il est enregistré sous resources
, mais le répertoire de destination de sauvegarde se trouve dans le fichier de paramètres.
[TaskOnKart]
workspace_directory=./output
Il peut être modifié en spécifiant comme.
load
Bien sûr, vous pouvez également charger un DataFrame enregistré avec self.load ()
, mais si vous voulez charger un ensemble de DataFrames comme [df1, df2, df3 ...]
, vous pouvez utiliser load_dataframe
. Vous pouvez charger plusieurs DataFrames dans un état combiné verticalement.
Vous pouvez également éventuellement spécifier une colonne avec set
pour lever une exception si cette colonne n'existe pas dans le DataFrame
à charger.
class TaskA(gokart.TaskOnKart):
def run(self):
df1 = pd.DataFrame([1,2], columns=['target'])
df2 = pd.DataFrame([3,4], columns=['target'])
df3 = pd.DataFrame([5,6], columns=['target'])
self.dump([df1, df2, df3])
class TaskB(gokart.TaskOnKart):
def requires(self):
return TaskA()
def run(self):
#Chargé après avoir été concaté
df =self.load_data_frame(required_columns={'target'})
self.dump(df)
S'il existe plusieurs tâches dépendantes, vous pouvez définir les tâches dépendantes au format dictionnaire et les lire avec la clé comme indiqué ci-dessous. Luigi seul peut charger plusieurs tâches, mais il ne prend pas en charge les formulaires de dictionnaire, donc l'utilisation du format de dictionnaire peut améliorer la lisibilité du code.
class TrainModel(gokart.TaskOnKart):
def requires(self):
return {'data': LoadData(), 'target': LoadTarget()}
def run(self):
data = self.load('data')
target = self.load('target')
model = LogisticRegression()
model.fit(data, target)
self.dump(model)
Vous pouvez utiliser self.load_generator
pour charger et traiter les tâches de manière séquentielle.
from sklearn.datasets import load_iris
from sklearn.datasets import load_wine
class LoadWineData(gokart.TaskOnKart):
def run(self):
data = load_wine()['data']
self.dump(data)
class LoadIrisData(gokart.TaskOnKart):
def run(self):
data = load_iris()['data']
self.dump(data)
class LoadGenerator(gokart.TaskOnKart):
def requires(self):
return [LoadWineData(), LoadIrisData()]
def run(self):
for data in self.load_generator():
print(f'data_shape={data.shape}')
# data_shape=(178, 13)
# data_shape=(150, 4)
output
Si ʻuse_unique_id = False`, la valeur de hachage ne sera pas attachée au nom du fichier.
def output(self):
return self.make_target('data.pkl', use_unique_id=False)
Pour les formats tels que gensim et TensorFlow où les modèles sont enregistrés dans plusieurs fichiers, vous pouvez utiliser make_model_target
comme indiqué ci-dessous pour les compresser et les enregistrer tous en même temps.
def output(self):
return self.make_model_target(
'model.zip',
save_function=gensim.model.Word2Vec.save,
load_function=gensim.model.Word2Vec.load)
En passant une fonction de sauvegarde / restauration comme paramètre, le modèle et load_function
sont compressés et sauvegardés sous forme d'ensemble au format zip (dans ce cas), et la tâche appelante est particulièrement concernée. Vous pouvez restaurer le modèle avec self.load ()
sans avoir à le faire.
Si vous utilisez make_large_data_frame_target
comme indiqué ci-dessous, DataFrame
sera divisé en plusieurs enregistrements pour chaque capacité spécifiée par max_byte
, compressé en un, puis sauvegardé.
def output(self):
return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)
À propos, le make_model_target
mentionné ci-dessus est utilisé en interne.
Si vous voulez convertir DataFrame
dans un format autre que pickle
et l'enregistrer, ajoutez simplement l'extension de ce format et le FileProcessor
interne le convertira au format cible et l'enregistrera.
Les formats actuellement pris en charge sont
- pickle
- npz
- gz
- txt
- csv
- tsv
- json
- xml
est.
class LoadWineData(gokart.TaskOnKart):
def run(self):
data = load_wine()['data']
self.dump(data)
class ToCSV(gokart.TaskOnKart):
def requires(self):
return LoadWineData()
def output(self):
#Définissez l'extension que vous souhaitez enregistrer dans le suffixe
return self.make_target('./wine.csv')
def run(self):
df = pd.DataFrame(self.load())
self.dump(df)
Si le chemin de work_space_directory
décrit dans le fichier de configuration commence par gs: //
, tous les résultats de sortie seront téléchargés vers GCS, et s'il s'agit de s3: //
, tous les résultats de sortie seront téléchargés vers S3.
[TaskOnKart]
#préfixe gs://Ou s3://Ensuite, toutes les sorties sont stockées dans le cloud
workspace_directory=gs://resources/
C'est très pratique car vous pouvez changer le code sans aucune modification contrairement au cas de Luigi seul.
Vous pouvez spécifier le paramètre de tâche dans la variable d'environnement en écrivant paramètre = $ {variable d'environnement}
dans le fichier de configuration.
Ceci est très pratique lorsque vous souhaitez séparer le test de la production, ou lorsque vous souhaitez modifier la sauvegarde dans le cloud pour chaque environnement à exécuter.
[TaskOnKart]
workspace_directory=${WORK_SPACE}
[feature.LoadTrainTask]
is_test=${IS_TEST}
-. zshrc
export IS_TEST=False
datetime=`date "+%m%d%H%Y"`
export WORK_SPACE="gs://data/output/${datetime}"
Personnellement, je voudrais le tourner un peu pour confirmation localement avant de le tourner étroitement avec GCE, mais luigi.cfg
est très utile car je veux en utiliser un commun.
Si vous utilisez gokart. (List) TaskInstanceParameter
, vous pouvez prendre Task comme paramètre de Task. Cela vous permet de réutiliser la même tâche en créant une tâche qui ne dépend pas d'une tâche spécifique, augmentant ainsi la possibilité d'écrire un code plus flexible.
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
class LoadWineData(gokart.TaskOnKart):
def run(self):
data = load_wine()['data']
self.dump(data)
class LoadWineTarget(gokart.TaskOnKart):
def run(self):
target = load_wine()['target']
self.dump(target)
class Trainer(gokart.TaskOnKart):
#Prenez Task comme argument
data_task = gokart.TaskInstanceParameter(description='data for train')
target_task= gokart.TaskInstanceParameter(description='target for train')
def requires(self):
return {'data': self.data_task, 'target': self.target_task}
def run(self):
data = self.load('data')
target = self.load('target')
model = LogisticRegression()
model.fit(data, target)
self.dump(model)
class ExcuteTrain(gokart.TaskOnKart):
def requires(self):
#Injecter la tâche
return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())
def run(self):
trained_model = self.load()
self.dump(trained_model)
Il est possible de notifier slack en écrivant ce qui suit dans le fichier de paramètres. Du point de vue de la sécurité, il est préférable de définir le jeton comme une variable d'environnement au lieu d'une écriture solide.
[SlackConfig]
token=${SLACK_TOKEN}
channel=channel_name
to_user=hase_hiro
Je vous serais reconnaissant si vous pouviez signaler toute différence de comportement.
Recommended Posts