Unter dem Ärger von "Spark? Es ist cool mit Donnerattributen!" Habe ich ein einfaches maschinelles Lernen mit Docker + Spark + Jupyter Notebook versucht. Unter Verwendung der bekannten Titanic-Daten haben wir eine Vorhersage durch lineare Regression gemacht.
Spark ist eine der verteilten Verarbeitungsbibliotheken. Viele Leute denken an verteilte Verarbeitung als Hadoop, aber aus meiner Sicht ist Spark eine Bibliothek, die die Mängel von Hadoop ausgleicht. Hadoop erschien zuerst im Jahr 2006 und dann im Jahr 2014.
Hadoop VS Spark
Der obige Funke soll die Mängel von Hadoop kompensiert haben, aber da beide Vor- und Nachteile haben, werden sie kurz in einer Tabelle zusammengefasst.
verdienen | Fehler | |
---|---|---|
Hadoop | Kann große Datenmengen verarbeiten | Aufgrund des Speicherzugriffs nicht gut in Echtzeit zu verarbeiten |
Spark | Gute Echtzeitverarbeitung durch On-Memory-Verarbeitung | Kann nicht so große Datenmengen verarbeiten wie Hadoop |
Mit anderen Worten, verwenden Sie Hadoop für zu große Daten und Spark, wenn Sie in Echtzeit verarbeiten möchten.
Die Abfrage-Engines von Hadoop sind Presto und Hive, aber Spark verfügt über eine Vielzahl von APIs, die problemlos aus Sprachen wie Python und Scala aufgerufen werden können.
Docker Setup
Laden Sie zuerst das Bild unten herunter und erstellen Sie es.
$ docker pull jupyter/pyspark-notebook
$ docker run --name spark_test -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/pyspark-notebook
Sie können die Notiz öffnen, indem Sie auf die oben angezeigte URL zugreifen.
Python
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Daten gelesen
titanic_df = spark.read.csv('./titanic/train.csv', header='True', inferSchema='True')
#Fehlende Wertkorrespondenz
titanic_df = titanic_df.na.fill({"Embarked" : 'S'})
#Löschen Sie unnötige Spalten
titanic_df = titanic_df.drop("Cabin")
#Spaltenaddition durch Konstante
titanic_df = titanic_df.withColumn('Alone', lit(0))
#Bedingte Werteinfügung
titanic_df = titanic_df.withColumn("Alone", when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))
#Etikettencodierung
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex", "Embarked", "Initial"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)
#Testaufteilung
trainingData, testData = feature_vector.randomSplit([0.8, 0.2], seed=9)
Es gibt verschiedene andere Prozesse, aber ich werde nur die bemerkenswerten lassen. Siehe unten für eine detailliertere Datenverarbeitung. https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5722190290795989/3865595167034368/8175309257345795/latest.html
#Lernen
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
lrModel = lr.fit(trainingData)
#Inferenz
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()
#Auswertung
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))
Darüber hinaus können die folgenden Modelle als Bibliothek verwendet werden.
Es ist langsam vor Ort!
Offensichtlich hatte diese Datenebene keinen Vorteil, da es sich nur bei der Verarbeitung großer Datenmengen um einen verteilten Prozess handelt. Ich möchte Geschwindigkeit und Genauigkeit vergleichen, wenn ich auf zu große Daten stoße.
Recommended Posts