[PYTHON] Leistungsüberprüfung der Datenvorverarbeitung für maschinelles Lernen (numerische Daten) (Teil 2)

Erstausgabe: 2020/3/18
Verfasser: Soichi Takashige, Masahiro Ito, Hitachi, Ltd.

Einführung

In diesem Beitrag werden wir das Design-Know-how der Datenvorverarbeitung und die Ergebnisse der Leistungsüberprüfung der Datenvorverarbeitung beim Entwurf eines Systems vorstellen, das ein Modell für maschinelles Lernen enthält.

In der dritten Folge werden wir das Know-how zur Leistungsverbesserung und die Überprüfungsergebnisse bei der Datenvorverarbeitung mit Spark, einer parallel verteilten Verarbeitungsplattform, vorstellen.

** Beitragsliste: **

  1. Informationen zur Datenvorverarbeitung des Systems mithilfe von maschinellem Lernen
  2. Leistungsüberprüfung der Datenvorverarbeitung für maschinelles Lernen (numerische Daten) (Teil 1)
  3. Leistungsüberprüfung der Datenvorverarbeitung für maschinelles Lernen (numerische Daten) (Teil 2) (veröffentlicht)

Verwendung von Spark bei der Datenvorverarbeitung

In meinem letzten Beitrag habe ich gezeigt, dass Ihnen bei einer großen Datenmenge der Arbeitsspeicher ausgeht, wenn Sie die Datenvorverarbeitung mit Python unter Verwendung eines einzelnen Knotens durchführen. In solchen Fällen ist es häufig effektiv, eine parallele verteilte Verarbeitungsplattform zu verwenden. Dieses Mal werden wir die Datenvorverarbeitung durch Spark einführen, eine typische parallele verteilte Verarbeitungsplattform.

Umschreiben der Verarbeitung gemäß der Datenverarbeitungsmethode

Bei der Entwicklung eines Systems, das maschinelles Lernen verwendet, wie im ersten Beitrag beschrieben, wird der Fall, in dem zuerst PoC durchgeführt wird, um die Wirksamkeit des maschinellen Lernens zu bestätigen, und dann das Produktionssystem basierend auf dem Ergebnis entwickelt. In dieser PoC-Phase wird die Datenvorverarbeitung jedoch häufig in Python implementiert. Wenn Sie Spark bei der Entwicklung eines Produktionssystems verwenden möchten, müssen Sie daher Ihren Python-Code für Spark neu schreiben. Dieses Mal wurde die Datenvorverarbeitung durch den Pandas-Datenrahmen für das BigBench-Geschäftsszenario Nr. 5 gemäß den folgenden Richtlinien in die Spark-Verarbeitung umgeschrieben.

import numpy as np
import pandas as pd


```python:After(spark)
import pyspark
import pyspark.sql
form pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
spark = SparkSession.builder.master(…) #Beschrieb die Verbindungseinstellungen mit Spark
sc = spark.getContext()
hive_context = HiveContext(sc)

df = pd.DataFrame()


```python:After(spark)
df = spark.createDataFrame(data, schema)

df = pd.read_csv(filename)


```python:After(spark)
df = spark.read.csv(filename)

df.loc[df[“columnname”]notnull(),:]


```python:After(spark)
df.filter(df[“columnname”].isNotNull())

df.loc[rownumber]


```python:After(spark)
#Ändern Sie die Logik so, dass der Zugriff auf eine bestimmte Zeile zeitaufwändig ist und keinen Zeilenzugriff erfordert

Die Art und Weise, wie Sie auf eine bestimmte Datenfolge zugreifen, ist für Pandas und Spark gleich.

df[‘columnname’]

df.loc[df[‘columnname’] == value]


```python:After(spark)
df.filter(df[‘columnname’] == value)

pd.merge(df1, df2, how=’inner’, left_on=”l-key” right_on=”r-key”)


```python:After(spark)
df1.join(df2, df1.l_key == df2.r_key, ‘inner’)

def function(df): #Etwas tun return data df.groupby(“key”).apply(function)


```python:After(spark)
out_schema = StructType(...) #out_Schema ist die Definition des von der Funktion ausgegebenen Datenschemas
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP) 
def function(df):
 #Tun Sie etwas * Abhängig von den Spezifikationen von Pandas UDF kann es erforderlich sein, neu zu schreiben (später beschrieben).
 return data
df.groupby(“key”).apply(function)

Umschreiben von Pandas UDF

Pandas UDF ist eine der Möglichkeiten, die Python-Verarbeitung mit Pandas in Spark auszuführen. Pandas UDF ist ein Mechanismus der Zusammenarbeit zwischen Python und Spark, der von Spark bereitgestellt wird, und es ist möglich, eine parallele verteilte Verarbeitung in Spark für die in Python (Pandas) implementierte Verarbeitung durchzuführen. Die drei in Tabelle 1 gezeigten Funktionen sind in Pandas UDF verfügbar und können über die Datenrahmenoperationen von Spark aufgerufen werden.

Tabelle 1 Arten von Python-Funktionen, die mit Pandas UDF verwendet werden können

# Verarbeitungsfunktionstyp Erläuterung der Verarbeitung, die von der Funktion ausgeführt werden kann Szene verwenden
1 SCALAR Einerpandas.Seriesを受け取り、処理を行ったうえで、Einerpandas.SeriesGib es zurück. Der Elementname und der Elementtyp müssen auf der Ein- und Ausgabe übereinstimmen. Gilt für die nächste Iteration für eine Datenzeichenfolge
  • map
  • apply_along_axis
2 AGG Ein oder mehrpandas.SeriesErhalten,Nach einiger Verarbeitung einepandas.SeriesGib es zurück. Die Anzahl der Eingabe- und Ausgabedaten muss nicht übereinstimmen. Gilt für die Aggregationsverarbeitung
  • agg
3 GROUPED_MAP Einerpandas.DataFrameWird empfangen, wird für jedes Element eine Verarbeitung ausgeführt, und als Ergebnis für ein anderespandas.DataFrameGibt einen zurück. Eingabe- und Ausgabedatenzeilen, Spaltennamen usw. müssen nicht übereinstimmen Gilt für jeden Datensatz im gruppierten Datensatz
  • groupby.apply

Beim Schreiben der groupby.apply-Verarbeitung in Pandas durch Registrieren der an diese` apply übergebenen Python-Funktion als obige GROUPED_MAP-Funktion, parallele Ausführung durch Spark ohne den Aufwand einer erneuten Implementierung usw. Kann realisiert werden. Die Funktionen "Anwenden" von Pandas und Spark haben jedoch leicht unterschiedliche Spezifikationen und müssen möglicherweise geändert werden. In dieser Implementierung mussten die Unterschiede zwischen den folgenden beiden Spezifikationen berücksichtigt werden, daher haben wir sie neu geschrieben. In der Erklärung ist "df" der Variablenname, der an den Datenrahmen übergeben wird, "keyname" ist der Spaltenname des Datenrahmens und "function" ist der Funktionsname, der den iterativen Prozess definiert.

  1. ** Unterschied in den Argumentwerten **
Verhaltensunterschied Wenn Sie
`df.groupby (" keyname ") .apply (function)` ausführen, werden die Daten zuerst nach dem gleichen Wert von` keyname` gruppiert und dann in einem Datenrahmen gruppiert. .. Das "DataFrame" -Objekt jeder Bibliothek wird an das Argument der Funktion "function" (df) übergeben, die durch die Funktion "apply" sowohl in Pandas als auch in Spark angegeben wird. In Pandas hat dieser "DataFrame" eine Eigenschaft namens "` df.name` ", mit der Sie den Wert von" keyname "in einer Gruppe abrufen können. Auf der anderen Seite hat Spark keine solche Eigenschaft.
Gegenmaßnahmen Der an die
-Funktion `function (df )` übergebene Datenrahmen enthält im Prinzip auch das Element` keyname`. Sie können den Wert von `name` erhalten, indem Sie` name = df [“ keyname ”] [0]` setzen.
  1. ** Unterschied in der Handhabung des Rückgabewerts **
Verhaltensunterschied In
Pandas ändert `df.groupby (" Schlüsselname "). Apply (Funktion)` die Interpretation entsprechend dem Spaltennamen und Datentyp, der im Rückgabewert des Ergebnisses enthalten ist. Insbesondere erstellt die Funktion "groupby" automatisch Daten mit der Spalte "keyname" als Index für die Ausgabedaten, und durch Aggregation wird eine "Series" oder "DataFrame" erstellt. Andererseits führt Spark eine solche automatische Spaltenvervollständigung nicht durch.
Gegenmaßnahmen Konfigurieren Sie den Rückgabewert von
`function (df)` immer so, dass` keyname` als Spaltenname verwendet wird, und geben Sie den `keyname`-Wert der Originaldaten als diesen Wert an.

BigBench Business-Szenario Nr. 5 Spark-Code-Beispiel

Als Ergebnis der Umschreibung der Implementierung der Datenvorverarbeitung des BigBench-Geschäftsszenarios Nr. 5 durch Python, die im zweiten Beitrag an Spark unter Verwendung des bisher erwähnten Know-hows gezeigt wurde, ist das Endergebnis wie in Abbildung 1 unten dargestellt. Es wurde ein Code.

import pandas as pd
import numpy as np

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType, when
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)

web_clickstreams = hive_context.read.table("bigbench.web_clickstreams")
item = hive_context.read.table("bigbench.item")
customer = hive_context.read.table("bigbench.customer")
customer_demographics = hive_context.read.table("bigbench.customer_demographics")

#Verarbeitung ①
data = web_clickstreams.filter(web_clickstreams[“wcs_user_sk”].isNotNull())
data = data.join(item, data["wcs_item_sk"] == item["i_item_sk"], 'inner')

#Prozess (2): Nach Benutzer-ID gruppieren
grouped_users = data.groupby('wcs_user_sk')

#Prozess ③ Typdefinition: Definieren Sie den Ausgabedatentyp der iterativen Verarbeitung
types =  ["wcs_user_sk", "clicks_in_category"]+["clicks_in_%d"%i for i in range(1,8)]
out_schema = StructType([StructField(i, IntegerType(), True) for i in  types])

#Prozess ③ Registrierung: Registrieren Sie den Inhalt des iterativen Prozesses in Pandas UDF als Funktion
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def summarize_per_user(wcs_user_sk_contents):
    wcs_user_sk_index = wcs_user_sk_contents['wcs_user_sk'][0]
    #Verarbeitung ③-1, ③-2
    clicks_in_category = \
        len(wcs_user_sk_contents[wcs_user_sk_contents['i_category'] == i_category_index])
    clicks_in = [0] * 8
    for name, df in wcs_user_sk_contents.groupby('i_category_id'):#Schleife einmal optimieren
        if name < len(clicks_in):
            clicks_in[name] = len(df.index)
    #Verarbeitung ③-3
    return pd.DataFrame([wcs_user_sk_index, clicks_in_category] + clicks_in[1:],\
                        columns=types)

#Prozess ③ Ausführung
i_category_index = 'Books'
data = grouped_users.apply(summarize_per_user)

#Verarbeitung ④
data = data.join(customer, data["wcs_user_sk"] == customer["c_customer_sk"], 'inner')

#Verarbeitung ⑤
data = data.join(customer_demographics, \
     data["c_current_cdemo_sk"] == customer_demographics["cd_demo_sk"], 'inner')

#Verarbeitung ⑥
data.withColumn('college_education', 
                      when(data["cd_education_status"] == 'Advanced Degree', 1)\
                     .when(data["cd_education_status"] == 'College', 1)\
                     .when(data[“cd_education_status”] == '4 yr Degree', 1)\
                     .when(data[“cd_education_status”] == '2 yr Degree', 1)\
                     .otherwise(0))
data.withColumn('male', when(data[“cd_gender”] == 'M', 1).otherwise(0))

#Ergebnisse speichern
data.write.mode('append').parquet('answer_q05_0100.parquet')

Abbildung 1 Quellcode für die Datenvorverarbeitung für BigBench-Szenario Nr. 5 von Spark

Effektüberprüfung mit Spark

Von hier aus werden wir die Leistung der in Abb. 1 implementierten Datenvorverarbeitung durch Spark überprüfen. Abbildung 2 unten zeigt das Servicelayout des Spark-Clusters, das in dieser Überprüfung als Umgebung für die parallele verteilte Verarbeitung erstellt wurde. Dieses Mal erstellen wir einen Spark-Cluster mithilfe der Cloudera-Distribution unter der Annahme eines lokalen Anwendungsfalls. Darüber hinaus wird ein Knoten des Worker Node für die Überprüfung der Verarbeitung einzelner Knoten durch Python verwendet, die im zweiten Beitrag durchgeführt wurde. Bei der Verarbeitung in Spark wird die eigentliche parallele verteilte Verarbeitung auf 3 Arbeiterknoten ausgeführt.

spark

Abbildung 2 Servicelayout dieser Überprüfungsumgebung

Als nächstes zeigt Tabelle 2 die Spezifikationen dieser Verifizierungsumgebung. Dieses Mal wird IaaS (EC2-Instanz) unter AWS als Verifizierungsmaschine verwendet. Fünf 1-TB-Festplatten (EBS) sind mit der EC2-Instanz für Worker Node verbunden, und pro Knoten ist eine Kapazität von 5 TB angeschlossen. Da HDFS jedoch Daten in 3 Multiplexen schreibt, beträgt die effektive Datenkapazität etwa 5 TB.

Tabelle 2 Hardwarespezifikationen der Verifizierungsumgebung

Manager Node Master Node Worker Node×3
Überprüfungsumgebung AWS EC2 AWS EC2 AWS EC2
OS CentOS 7 64bit CentOS 7 64bit CentOS 7 64bit
CPU(Zahl der Kerne) 2 4 96 (32 x 3 Knoten)
Memory(GB) 16 32 768 (256 GB x 3 Knoten)
HDD(GB) 80GB 80GB 15TB※(1 TB x 5 HDD x 3 Knoten)

Die zur Überprüfung verwendeten Softwareversionen sind in der folgenden Tabelle 3 aufgeführt.

Tabelle 3 Softwareversion der Überprüfungsumgebung

Software Ausführung
Cloudera Verteilung CDH 6.3.0
Spark 2.4.0
Hive 2.1.1
YARN 2.5.0
HDFS 3.0.0
Python 3.7.3
Pandas 0.24.2
Numpy 1.16.4

Verarbeitungsmethode zu vergleichen

Zusätzlich zu den Ergebnissen der Leistungsmessung der Verarbeitungsmethoden 1 und 2, die im vorherigen 2. Beitrag überprüft wurden, werden wir die Leistung (3.) messen und vergleichen, wenn eine parallele verteilte Verarbeitung durch Spark durchgeführt wird. ..

  1. Einzelknotenverarbeitung mit Python (ohne Logikoptimierung in Abbildung 5 in Teil 2)

Führen Sie den Code in Abbildung 4 aus Teil 2 unter Python aus.

  1. Einzelknotenverarbeitung durch Python (mit Logikoptimierung in Abbildung 5 des zweiten Beitrags)

Führen Sie den in Abbildung 5 des zweiten Beitrags optimierten Code mit dem Code in Abbildung 4 des zweiten Beitrags unter Python aus.

  1. Parallele verteilte Verarbeitung durch Spark (mit Logikoptimierung in Abbildung 5 des zweiten Beitrags)

Führen Sie die parallele verteilte Verarbeitung auf Spark mit dem in 2. verwendeten logisch optimierten Python-Einzelknoten-Verarbeitungscode aus, der für die Spark-Verarbeitung geändert wurde (siehe Abbildung 1). ..

Ausführungsparameter während der Verarbeitung in Spark

Die Spark-Ausführungsparameter zum Ausführen von Aufgaben werden wie in Tabelle 4 gezeigt festgelegt. Jeder Worker-Knoten startet einen Worker-Prozess (Executor) und weist ihn zu, sodass Speicher und Kern ausschließlich darin verwendet werden können.

Tabelle 4 Spark-Ausführungsparameter

# Artikel Wert einstellen Bemerkungen
1 Anzahl der Ausführenden 3 Angenommen, jeder Knoten wird einzeln gestartet
2 Executor-Speichergröße 128 GB
3 Anzahl der Kerne pro Executor 30 Ordnen Sie 30 Kerne von 32 Kernen der Maschine zu und verwenden Sie sie

Verarbeitung der zu messenden Inhalte und Daten

Bei der Messung wird die für die folgenden drei Prozesse erforderliche Gesamtzeit auf dieselbe Weise gemessen wie die Messung für den zweiten Pfosten.

  1. Lesen von Daten aus einer Datenquelle in den Speicher

  2. Vorverarbeitung wie Datenkombination und Aggregation für die gelesenen Daten

  3. Schreiben Sie das Verarbeitungsergebnis in den Datenspeicher

Außerdem wurden die zu messenden Daten auf die gleichen Einstellungen wie in Teil 2 eingestellt (Tabelle 3 im zweiten Beitrag).

Ergebnisse der Leistungsmessung

Abbildung 3 zeigt die Ergebnisse der Bewertung der Verarbeitungszeit, indem jeder der vier Verarbeitungstypen für jede Datengröße für das Geschäftsszenario Nr. 5 von BigBench ausgeführt wird.

performance

Abbildung 3 Messergebnisse der Datenvorverarbeitungszeit für jede Eingangsdatengröße

Bei der Einzelknotenverarbeitung durch Python ist als Ergebnis des zweiten Beitrags die Größe des Datensatzes kleiner als die für die Produktion angenommene Größe des Datensatzes (ca. 50 GB), wenn die Eingabedatengröße erhöht wird Eine Verarbeitung ist aufgrund von Speichermangel nicht möglich. Andererseits wurde im Fall einer parallel verteilten Verarbeitung mit Spark die Verarbeitung normal selbst mit dem für die Produktion angenommenen Datensatz abgeschlossen, und die Verarbeitung konnte sogar mit einer größeren Größe abgeschlossen werden.

Abbildung 4 zeigt den Fortschritt der CPU-, Speicher- und Festplatten-E / A-Nutzung bei der Verarbeitung von 22 GB Daten mit Spark. Es kann bestätigt werden, dass die Berechnungszeit stark reduziert wurde, da alle CPUs jedes Arbeitsknotens zu 100% ausgelastet waren. Der Speicher wird auch auf jeden Knoten verteilt. Da jeder der drei Knoten den Overhead der Basisbetriebssystemfunktion hat, ist die Speichernutzung des gesamten Clusters größer als die eines einzelnen Computers, und der nach dem Starten des Programms des Geschäftsszenarios Nr. 5 verwendete Speicher ist pro Arbeiterknoten. Es ist ungefähr 130-140 GB, und die Summe von 3 Knoten ist ungefähr 410 GB. Darüber hinaus können Sie als Ergebnis der Verarbeitung (1) der inneren Verknüpfung und der Verarbeitung (2) der Gruppe durch Verarbeitung (siehe Abb. 3 in Teil 2) sehen, wie E / A zum Schreiben auf die Festplatte ausgeführt werden. Diese E / A tritt auf, weil die kombinierten oder sortierten Daten auf der Festplatte gespeichert sind.

resource

Abbildung 4 Zeitliche Änderungen der CPU-, Speicher- und E / A-Nutzung in einer Spark-Umgebung

Berücksichtigung der Ergebnisse der Leistungsbewertung

Wirkung der Einführung von Spark

[Effekt der parallelen Dezentralisierung (1): Unterstützung umfangreicher Daten]

Mit der Einführung von Spark ist es möglich geworden, eine Vorverarbeitung für umfangreiche Daten durchzuführen, die aufgrund von Speichermangel bei Ausführung auf Python auf einem einzelnen Server zwangsweise beendet werden. Bei Daten, die die Speicherkapazitätsgrenze eines Computers überschreiten, wird die Einführung von Spark als angemessen erachtet.

[Effekt der parallelen Dezentralisierung (2): Verkürzung der Bearbeitungszeit]

Da Python nicht mehrere CPUs nutzen kann, selbst wenn die Datengröße zunimmt, wird die Verarbeitung nacheinander auf einer einzelnen CPU ausgeführt. Andererseits werden in Spark die Daten für jeden Knoten aufgeteilt, und die Verarbeitung für jede Daten wird verschiedenen Knoten und CPU-Kernen für die parallele Verarbeitung zugewiesen, so dass die Verarbeitungszeit erheblich reduziert werden kann.

Über die Funktionen von Python und Spark

  • Bei Verwendung von Pandas unter Python werden alle zu verarbeitenden Daten in den Speicher eingelesen und nacheinander von einer einzelnen CPU verarbeitet. Im Beispiel des Geschäftsszenarios Nr. 5 erhöht sich die Wiederholungszeit dieses Mal nur proportional zur Datengröße.

  • Bei der Spark-Verarbeitung werden Daten in den Speicher eingelesen und für jeden Abschnitt geteilt durch eine bestimmte Größe auf die Festplatte geschrieben. Wenn eine Verarbeitung durchgeführt wird, die einen Datenaustausch zwischen geteilten Knoten (JOIN, GROUP BY, Sortieren usw.) erfordert, wird das Ergebnis auf die Festplatte geschrieben. Durch die Verarbeitung großer Datenmengen für jede Partition auf diese Weise kann die Verarbeitung abgeschlossen werden, selbst wenn die zu verarbeitende Datengröße die installierte Kapazität des physischen Speichers überschreitet. Die Verarbeitungszeit wird jedoch zu einem Festplatten-E / A-Engpass und einer plötzlichen Verarbeitung Die Zeit kann mehr als linear ansteigen.

  • In Abb. 3 erhöht sich die Verarbeitungszeit plötzlich nichtlinear, wenn die Eingangsdatengröße etwa 150 GB beträgt. Dies liegt jedoch daran, dass die zu verarbeitende Datengröße die installierte Speichermenge überschreitet, Festplatten-E / A auftritt und es zu einem Engpass kommt. Ich bin.

Zusammenfassung

Wir haben die Ergebnisse der Leistungsbewertung und das damalige Know-how zur Leistungsverbesserung vorgestellt und uns auf Beispielarbeiten konzentriert, die Vorverarbeitungen wie Datenbindung und Aggregation für umfangreiche Tabellendaten durchführen.

Wenn die zu verarbeitenden Daten nicht so groß sind (bis zu mehreren zehn GB), kann die Verarbeitung auch bei der Vorverarbeitung mit Python auf einem einzelnen Knoten in wenigen Stunden abgeschlossen werden, sodass die Datenvorverarbeitung mit dem Vorverarbeitungscode in Python durchgeführt wird. Ist möglich. Zu diesem Zeitpunkt ist es möglich, die Verarbeitungszeit zu verkürzen, indem die in dieser Reihe eingeführte Logikoptimierung implementiert wird.

Wenn andererseits die zu verarbeitende Datenmenge groß ist, kann die Verarbeitung von Python zu lange dauern oder der Prozess kann fehlschlagen. Daher ist die Anwendung einer verteilten Verarbeitungsplattform wie Spark eine realistische Option. Wenn die beim maschinellen Lernen zu lernenden Daten etwa 22 GB betragen, kann die Verarbeitungszeit im Vergleich zu Python, das auf einem einzelnen Knoten (einem einzelnen Thread) ausgeführt wird, um etwa 94% verkürzt werden, indem die parallele verteilte Verarbeitung von Spark für die Vorverarbeitung verwendet wird. Ich bestätige. Wir haben außerdem bestätigt, dass Python bei Verwendung des Spark of Worker 3-Knotens bis zu viermal so viele Daten verarbeiten kann, die von einem einzelnen Knoten verarbeitet werden können, während die Leistung erhalten bleibt.

Recommended Posts