Vorheriger Artikel Ein Memorandum über das Verfahren zum Ausführen der Pipeline für maschinelles Lernen mit Cloud Dataflow.
Das letzte Mal habe ich maschinelles Lernen mit Scikit-Learn und Pandas durchgeführt, die auf dem in Cloud Dataflow verwendeten Knoten vorinstalliert waren. In der eigentlichen Pipeline für maschinelles Lernen möchte ich jedoch eine Bibliothek für die Vorverarbeitung wie OpenCV einrichten. Sie möchten verschiedene Bibliotheken für maschinelles Lernen installieren und analysieren. Dieses Mal möchte ich als Beispiel für die Installation einer optionalen Bibliothek in Cloud Dataflow schreiben, wie XGBoost installiert und ausgeführt wird, was auch in Kaggle beliebt ist.
Wie Sie im offiziellen Dokument sehen können, gibt es drei Möglichkeiten, Ihre Lieblingsbibliothek in Cloud Dataflow (Python) zu installieren. ist.
Bibliotheken wie XBGoost erfordern einen dritten Schritt. Im Folgenden erfahren Sie, wie Sie eine benutzerdefinierte setup.py für XB Goost erstellen.
Wie Sie im Beispielcode des offiziellen Repositorys sehen können, schreiben Sie gemäß den Setup-Tools. Ich werde. Wenn es eine Python-abhängige Bibliothek gibt, beschreiben Sie diese in install_requires. Wenn Sie jedoch Ihren eigenen Shell-Befehl zur Installation ausführen möchten, erstellen Sie eine Befehlsklasse, die die Befehlsklasse von setuptools erbt, und beschreiben Sie die Befehlsausführungsprozedur. Das Folgende ist der XGBoost-Installationsbefehl, der basierend auf dem obigen offiziellen Beispielcode geschrieben wurde. Grundsätzlich folgt es der Installationsprozedur von XGBoost, aber um es mit subprocess.Popen auszuführen, wird der Ausführungspfad für den Befehl angegeben, der das Verschieben des Verzeichnisses beinhaltet.
setup.py
from distutils.command.build import build as _build
import subprocess
import setuptools
class build(_build):
sub_commands = _build.sub_commands + [('CustomCommands', None)]
CUSTOM_COMMANDS = [(["sudo","apt-get","update"],"."),
(["sudo","apt-get","install","git","build-essential","libatlas-base-dev","-y"],"."), #"python-dev","gfortran"
(["git","clone","--recursive","https://github.com/dmlc/xgboost"],"."),
(["sudo","make"],"xgboost"),
(["sudo","python","setup.py","install"],"xgboost/python-package"),
(["sudo","pip","install","xgboost"],".")]
class CustomCommands(setuptools.Command):
def initialize_options(self):
pass
def finalize_options(self):
pass
def RunCustomCommand(self, command_list):
print 'Running command: %s' % command_list[0]
p = subprocess.Popen(command_list[0], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=command_list[1])
stdout_data, _ = p.communicate()
print 'Command output: %s' % stdout_data
if p.returncode != 0:
raise RuntimeError('Command %s failed: exit code: %s' % (command_list[0], p.returncode))
def run(self):
for command in CUSTOM_COMMANDS:
self.RunCustomCommand(command)
REQUIRED_PACKAGES = []
setuptools.setup(
name='xgboost-install-examble',
version='0.0.1',
description='xgboost workflow packages.',
packages=setuptools.find_packages(),
#install_requires=REQUIRED_PACKAGES,
cmdclass={
'build': build,
'CustomCommands': CustomCommands,
}
)
Geben Sie die Datei setup.py nach dem Erstellen mit der Option setup_file der Pipeline an. Das Folgende ist ein Beispiel für die Angabe von Optionen in Python-Code. (In meiner Umgebung schien es nicht zu funktionieren, es sei denn, ich habe es mit dem vollständigen Pfad angegeben.)
xgboost_pipeline.py
setup_options = options.view_as(beam.utils.pipeline_options.SetupOptions)
setup_options.setup_file = "/home/orfeon/dataflow-sample/python/xgboost/setup.py" # set fullpath
Führen Sie Pipeline mit der Option aus. Wenn die Installation erfolgreich ist, wird der xgboost-Code in der Pipeline ausgeführt. Der folgende Code, mit dem ich den Teil, der den Regressor von sklearn verwendete, durch xgboost im Code von vorheriger Artikel ersetzt habe, hat funktioniert.
xgboost_pipeline.py(learn_innen vorhersagen)
~ Ausgelassen ~
year_max = data["year"].max()
train = data[data["year"] < year_max]
test = data[data["year"] == year_max]
dtrain = xgb.DMatrix(train[learn_attrs].values, label=train[target_attr].values, feature_names=learn_attrs)
dtest = xgb.DMatrix(test[learn_attrs].values, label=test[target_attr].values, feature_names=learn_attrs)
evals_result = {}
watchlist = [(dtrain, 'train'),(dtest, 'test')]
best_params = {'objective': 'reg:linear',
'eval_metric': 'rmse',
'learning_rate': 0.01,
'max_depth': 3,
'colsample_bytree': 0.65,
'subsample': 0.55,
'min_child_weight': 7.0,
'reg_alpha': 0.6,'reg_lambda': 0.7,'gamma': 1.2}
model = xgb.train(best_params, dtrain,
num_boost_round=1000,
evals=watchlist,
evals_result=evals_result,
verbose_eval=True)
test.loc[:, "predict"] = model.predict(dtest)
Wenn Sie eine benutzerdefinierte setup.py erstellen, können Sie verschiedene Bibliotheken in Dataflow ausführen. Der Datenfluss wird nicht nur für die Massenausführung unter Verwendung Ihrer bevorzugten Bibliothek für maschinelles Lernen verwendet, sondern auch für die einfache Ausführung der Vorverarbeitung, die eine große Menge an Computerressourcen erfordert, z. B. die Bilddatenverarbeitung unter Verwendung einer Bildverarbeitungsbibliothek. Ich denke. Es gibt noch einige Einschränkungen, z. B. nur mit Python2 zu arbeiten und nicht im Streaming-Modus, aber es scheint, dass dies schrittweise unterstützt wird.
Obwohl der diesmal ausgeführte XG-Boost verteilt ist, bestand das Lernen selbst darin, dass er auf einem einzelnen Knoten ausgeführt wurde, aber die Version, die der Algorithmus, der auf mehrere Knoten verteilt und lernt, funktioniert (XGBoost4J 2016/03/14 / xgboost4j-portable-verteilte-xgboost-in-spark-flink-and-dataflow.html)) wird ebenfalls entwickelt. Derzeit scheint es auf Spark, Flink als Ausführungsmodul zu laufen, aber Development Roadmap unterstützt auch Cloud Dataflow (Apache Beam). Ich freue mich auf die Zukunft.
Recommended Posts