[GO] Exécutez XGBoost avec Cloud Dataflow (Python)

Article précédent Un mémorandum de la procédure d'exécution du pipeline d'apprentissage automatique avec Cloud Dataflow.

Chose que tu veux faire

La dernière fois, j'ai fait du machine learning à l'aide de scikit-learn et de pandas préinstallés sur le nœud utilisé dans Cloud Dataflow, mais dans le pipeline de machine learning, j'aime mettre une bibliothèque pour le prétraitement telle qu'OpenCV. Vous souhaiterez installer et analyser diverses bibliothèques d'apprentissage automatique. Cette fois, à titre d'exemple de la procédure d'installation d'une bibliothèque facultative dans Cloud Dataflow, j'aimerais écrire comment installer et exécuter XGBoost, qui est également populaire dans Kaggle.

Comment installer la bibliothèque dans Dataflow

Comme vous pouvez le voir dans le Document officiel, il existe trois méthodes principales pour installer votre bibliothèque préférée dans Cloud Dataflow (Python). est.

Les bibliothèques comme XBGoost nécessitent une troisième étape. Ci-dessous, nous verrons comment créer un setup.py personnalisé pour XB Goost.

Comment créer un setup.py personnalisé

Comme vous pouvez le voir dans l 'exemple de code du référentiel officiel, écrivez en fonction des outils de configuration. Je vais. S'il existe une bibliothèque dépendante de Python, décrivez-la dans install_requires, mais si vous souhaitez exécuter votre propre commande shell pour l'installation, créez une classe de commande qui hérite de la classe Command de setuptools et décrivez la procédure d'exécution de la commande. Voici la commande d'installation de XGBoost écrite sur la base de l'exemple de code officiel ci-dessus. Fondamentalement, la procédure d'installation de XGBoost est suivie, mais pour exécuter avec subprocess.Popen, le chemin d'exécution est spécifié pour la commande qui implique le déplacement du répertoire.

setup.py


from distutils.command.build import build as _build
import subprocess
import setuptools


class build(_build):
    sub_commands = _build.sub_commands + [('CustomCommands', None)]

CUSTOM_COMMANDS = [(["sudo","apt-get","update"],"."),
                   (["sudo","apt-get","install","git","build-essential","libatlas-base-dev","-y"],"."), #"python-dev","gfortran"
                   (["git","clone","--recursive","https://github.com/dmlc/xgboost"],"."),
                   (["sudo","make"],"xgboost"),
                   (["sudo","python","setup.py","install"],"xgboost/python-package"),
                   (["sudo","pip","install","xgboost"],".")]


class CustomCommands(setuptools.Command):

    def initialize_options(self):
        pass

    def finalize_options(self):
        pass

    def RunCustomCommand(self, command_list):
        print 'Running command: %s' % command_list[0]
        p = subprocess.Popen(command_list[0], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=command_list[1])
        stdout_data, _ = p.communicate()
        print 'Command output: %s' % stdout_data
        if p.returncode != 0:
            raise RuntimeError('Command %s failed: exit code: %s' % (command_list[0], p.returncode))

    def run(self):
        for command in CUSTOM_COMMANDS:
            self.RunCustomCommand(command)


REQUIRED_PACKAGES = []

setuptools.setup(
    name='xgboost-install-examble',
    version='0.0.1',
    description='xgboost workflow packages.',
    packages=setuptools.find_packages(),
    #install_requires=REQUIRED_PACKAGES,
    cmdclass={
        'build': build,
        'CustomCommands': CustomCommands,
    }
)

Après avoir créé setup.py, spécifiez-le avec l'option setup_file de pipeline. Voici un exemple de spécification d'options dans le code Python. (Dans mon environnement, cela ne semblait pas fonctionner à moins que je ne le spécifie avec le chemin complet)

xgboost_pipeline.py


setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py" # set fullpath

Exécutez Pipeline avec l'option, et si l'installation réussit, le code xgboost s'exécutera dans le pipeline. Le code suivant que j'ai remplacé la partie qui utilisait le régresseur de sklearn par xgboost dans le code de article précédent fonctionnait.

xgboost_pipeline.py(learn_à l'intérieur de prévoir)


~ Omis ~

year_max = data["year"].max()
train = data[data["year"] <  year_max]
test  = data[data["year"] == year_max]

dtrain = xgb.DMatrix(train[learn_attrs].values, label=train[target_attr].values, feature_names=learn_attrs)
dtest  = xgb.DMatrix(test[learn_attrs].values, label=test[target_attr].values, feature_names=learn_attrs)

evals_result = {}
watchlist = [(dtrain, 'train'),(dtest, 'test')]
best_params = {'objective': 'reg:linear',
               'eval_metric': 'rmse',
               'learning_rate': 0.01,
               'max_depth': 3,
               'colsample_bytree': 0.65,
               'subsample': 0.55,
               'min_child_weight': 7.0,
               'reg_alpha': 0.6,'reg_lambda': 0.7,'gamma': 1.2}

model = xgb.train(best_params, dtrain,
                  num_boost_round=1000,
                  evals=watchlist,
                  evals_result=evals_result,
                  verbose_eval=True)

test.loc[:, "predict"] = model.predict(dtest)

en conclusion

Si vous créez un setup.py personnalisé, vous pourrez exécuter diverses bibliothèques sur Dataflow. Dataflow ne sera pas utilisé uniquement pour une exécution en masse à l'aide de votre bibliothèque d'apprentissage automatique préférée, mais également pour une exécution facile du prétraitement qui nécessite une grande quantité de ressources informatiques telles que le traitement des données d'image à l'aide d'une bibliothèque de traitement d'image. Je pense. Il y a encore quelques restrictions, comme travailler uniquement avec Python2 et ne pas fonctionner en mode streaming, mais il semble qu'il sera progressivement pris en charge.

* Prise en charge de Dataflow pour la version distribuée XGBoost

Bien que le boost XG qui a été exécuté cette fois soit distribué, l'apprentissage lui-même était qu'il fonctionne sur un seul nœud, mais la version que l'algorithme qui distribue et apprend à plusieurs nœuds fonctionne (XGBoost4J 2016/03/14 / xgboost4j-portable-shared-xgboost-in-spark-flink-and-dataflow.html)) est également en cours de développement. Actuellement, il semble fonctionner sur Spark, Flink en tant que moteur d'exécution, mais Development Roadmap inclut également la prise en charge de Cloud Dataflow (Apache Beam). J'ai hâte à l'avenir.

Recommended Posts

Exécutez XGBoost avec Cloud Dataflow (Python)
Exécutez un pipeline de machine learning avec Cloud Dataflow (Python)
Exécutez Cloud Dataflow (Python) depuis AppEngine
Exécutez Python avec VBA
Exécutez prepDE.py avec python3
Exécutez Blender avec python
Tutoriel Cloud Run (python)
Exécutez iperf avec python
Exécutez python avec PyCharm (Windows)
Exécutez Python avec CloudFlash (arm926ej-s)
Exécuter Label avec tkinter [Python]
[Package cloud] Gérez les packages python avec le package cloud
Exécutez Rotrics DexArm avec l'API Python
Exécutez mruby avec Python ou Blender
Exécutez Aprili depuis Python sur Orange
Exécutez python3 Django1.9 avec mod_wsgi (déployer)
Jusqu'à ce que Python fonctionne sur Apache
Modifications pour exécuter "Utilisation de Cloud Dataflow + Cloud Pub / Sub + Fluentd ..." avec le SDK 2.1
Exécutez IDCF Cloud CLI sur Docker
Exécutez le servo avec Python sur ESP32 (Windows)
Utiliser Tabpy avec Cloud Run (sur GKE)
Essayez d'utiliser Python avec Google Cloud Functions
[GCP] Exploitez Google Cloud Storage avec Python
Text mining avec Python ② Visualisation avec Word Cloud
Exécutez une application Web Python avec Docker
Traitez le Big Data avec Dataflow (ApacheBeam) + Python3
FizzBuzz en Python3
Grattage avec Python
Statistiques avec python
Grattage avec Python
Python avec Go
Twilio avec Python
Intégrer avec Python
Jouez avec 2016-Python
AES256 avec python
Testé avec Python
python commence par ()
avec syntaxe (Python)
Bingo avec python
Zundokokiyoshi avec python
Excel avec Python
Micro-ordinateur avec Python
Cast avec python
Extraction de texte avec l'API GCP Cloud Vision (Python3.6)
Essayez-le avec JupyterLab en Python japonais Word Cloud.
Python> Exécuter avec des arguments d'exécution> Utiliser import argparse
Réalisez facilement des micro-services avec Cloud Run x Flask
Exécutez l'API vSphere de VMware vSphere 6 avec le script Python (pyvmomi)
Exécutez Flask sur CentOS avec python3.4, Gunicorn + Nginx.
Utilisez Python / Django avec Windows Azure Cloud Service!
[Cloud102] # 1 Premiers pas avec Python (première partie des premiers pas de Python)
Comment activer python3 pour exécuter des tâches lors de l'envoi de tâches de GCP Cloud Composer vers Dataflow
Communication série avec Python
Zip, décompressez avec python
Django 1.11 a démarré avec Python3.6
Jugement des nombres premiers avec Python
Communication de socket avec Python
Analyse de données avec python 2
Essayez de gratter avec Python.