[PYTHON] Vérification des performances du prétraitement des données pour l'apprentissage automatique (données numériques) (partie 2)

Première édition: 2020/3/18
Auteur: Soichi Takashige, Masahiro Ito, Hitachi, Ltd.

introduction

Dans cet article, nous présenterons le savoir-faire en matière de conception du prétraitement des données et les résultats de la vérification des performances du prétraitement des données lors de la conception d'un système intégrant un modèle d'apprentissage automatique.

Dans le troisième volet, nous présenterons le savoir-faire en matière d'amélioration des performances et les résultats de vérification dans le prétraitement des données à l'aide de Spark, qui est une plate-forme de traitement distribuée parallèle.

** Liste des messages: **

  1. À propos du prétraitement des données du système à l'aide de l'apprentissage automatique
  2. Vérification des performances du prétraitement des données pour l'apprentissage automatique (données numériques) (Partie 1)
  3. Vérification des performances du prétraitement des données pour l'apprentissage automatique (données numériques) (Partie 2) (Publié)

Utilisation de Spark dans le prétraitement des données

Dans mon dernier article, j'ai montré que si vous avez une grande quantité de données et que vous souhaitez prétraiter les données avec Python en utilisant un seul nœud, vous manquerez de mémoire. Dans de tels cas, il est souvent efficace d'utiliser une plate-forme de traitement distribuée parallèle. Cette fois, nous allons introduire le prétraitement des données par Spark, qui est une plate-forme de traitement distribuée parallèle typique.

Traitement de réécriture selon la méthode de traitement des données

Dans le développement d'un système qui utilise l'apprentissage automatique, comme décrit dans le premier article, le cas où PoC est d'abord effectué pour confirmer l'efficacité de l'apprentissage automatique, puis le système de production est développé en fonction du résultat. Cependant, à ce stade du PoC, le prétraitement des données est souvent implémenté en Python. Par conséquent, si vous souhaitez utiliser Spark lors du développement d'un système de production, vous devez réécrire votre code Python pour Spark. Cette fois, le prétraitement des données par la trame de données Pandas pour le scénario commercial BigBench n ° 5 a été réécrit en traitement Spark selon les directives suivantes.

import numpy as np
import pandas as pd


```python:After(spark)
import pyspark
import pyspark.sql
form pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
spark = SparkSession.builder.master(…) #Décrit les paramètres de connexion avec Spark
sc = spark.getContext()
hive_context = HiveContext(sc)

df = pd.DataFrame()


```python:After(spark)
df = spark.createDataFrame(data, schema)

df = pd.read_csv(filename)


```python:After(spark)
df = spark.read.csv(filename)

df.loc[df[“columnname”]notnull(),:]


```python:After(spark)
df.filter(df[“columnname”].isNotNull())

df.loc[rownumber]


```python:After(spark)
#Modifiez la logique pour que l'accès à une ligne spécifique prenne du temps et ne nécessite pas d'accès à la ligne

La façon dont vous accédez à une chaîne de données particulière est la même pour les pandas et les étincelles.

df[‘columnname’]

df.loc[df[‘columnname’] == value]


```python:After(spark)
df.filter(df[‘columnname’] == value)

pd.merge(df1, df2, how=’inner’, left_on=”l-key” right_on=”r-key”)


```python:After(spark)
df1.join(df2, df1.l_key == df2.r_key, ‘inner’)

def function(df): #Faire quelque chose return data df.groupby(“key”).apply(function)


```python:After(spark)
out_schema = StructType(...) #out_schema est la définition du schéma de données généré par la fonction
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP) 
def function(df):
 #Faites quelque chose * Selon les spécifications de Pandas UDF, il peut être nécessaire de réécrire (décrit plus loin)
 return data
df.groupby(“key”).apply(function)

Traitement de réécriture par Pandas UDF

Pandas UDF est l'un des moyens d'exécuter le traitement Python à l'aide de Pandas dans Spark. Pandas UDF est un mécanisme de coopération entre Python et Spark fourni par Spark, et il est possible de traiter les traitements implémentés en Python (Pandas) en parallèle et distribués dans Spark. Les trois fonctions présentées dans le tableau 1 sont disponibles dans Pandas UDF et peuvent être appelées à partir des opérations de trame de données de Spark.

Tableau 1 Types de fonctions Python pouvant être utilisées avec Pandas UDF

# Type de fonction de traitement Explication du traitement pouvant être effectué par la fonction Utiliser la scène
1 SCALAR Unpandas.Seriesを受け取り、処理を行ったうえで、Unpandas.Seriesrends le. Le nom et le type de l'élément doivent correspondre à l'entrée et à la sortie. S'applique à l'itération suivante pour une chaîne de données
  • map
  • apply_along_axis
2 AGG Un ou pluspandas.SeriesRecevoir,Après quelques traitements, unpandas.Seriesrends le. Le nombre de données d'entrée et de sortie ne doit pas nécessairement correspondre. S'applique au traitement d'agrégation
  • agg
3 GROUPED_MAP Unpandas.DataFrameEst reçu, un traitement est effectué sur chaque élément, et par conséquent,pandas.DataFrameRenvoie un. Les lignes de données d'entrée et de sortie, les noms de colonnes, etc. ne doivent pas nécessairement correspondre S'applique à chaque ensemble de données dans l'ensemble de données groupé
  • groupby.apply

Lors de l'écriture du traitement groupby.apply dans Pandas, en enregistrant la fonction Python passée à cette ʻapply comme fonction GROUPED_MAP ci-dessus, exécution parallèle par Spark sans le travail de réimplémentation, etc. Peut être réalisé. Cependant, les fonctions Pandas et Spark ʻapply ont des spécifications légèrement différentes et peuvent nécessiter quelques modifications. Dans cette implémentation, il était nécessaire de traiter les différences entre les deux spécifications suivantes, nous l'avons donc réécrit. Dans l'explication, «df» est le nom de la variable transmis au bloc de données, «keyname» est le nom de la colonne du bloc de données et «fonction» est le nom de la fonction qui définit le processus itératif.

  1. ** Différence dans les valeurs des arguments **
Différence de comportement Lorsque vous exécutez
`df.groupby (“ keyname ”) .apply (function)`, les données sont d'abord regroupées par la même valeur de` keyname`, puis regroupées dans un bloc de données. .. Dans Pandas et Spark, l'objet `DataFrame` de chaque bibliothèque est passé à l'argument de la fonction` function` function (df)` spécifié par la fonction ʻapply`. Dans Pandas, ce `DataFrame` a une propriété appelée" `df.name`" qui vous permet d'obtenir la valeur de `keyname` dans un groupe. D'autre part, Spark ne possède pas une telle propriété.
Contre-mesures La trame de données passée à la fonction
`function (df)` contient également, en principe, l'élément `keyname`. Vous pouvez obtenir la valeur de `nom` en définissant` name = df [“ keyname ”] [0]`.
  1. ** Différence de traitement de la valeur de retour **
Différence de comportement Dans
Pandas, `df.groupby (“ keyname ”). Apply (function)` change l'interprétation en fonction du nom de colonne et du type de données inclus dans la valeur de retour du résultat. En particulier, la fonction `groupby` crée automatiquement des données avec la colonne` keyname` comme index pour les données de sortie, et une` Series` ou` DataFrame` est créée en les agrégeant. D'autre part, Spark n'effectue pas une telle complétion automatique des colonnes.
Contre-mesures Configurez toujours la valeur de retour de
`function (df)` pour avoir` keyname` comme nom de colonne et incluez la valeur `keyname` des données d'origine comme cette valeur.

Exemple de code Spark du scénario d'affaires # 5 BigBench

À la suite de la réécriture de la mise en œuvre du prétraitement des données du scénario commercial BigBench n ° 5 par Python montré dans le deuxième article de Spark en utilisant le savoir-faire mentionné jusqu'à présent, le résultat final est comme indiqué dans la figure 1 ci-dessous. C'est devenu un code.

import pandas as pd
import numpy as np

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType, when
from pyspark.sql.types import *
sc = SparkContext.getOrCreate()
hive_context = HiveContext(sc)

web_clickstreams = hive_context.read.table("bigbench.web_clickstreams")
item = hive_context.read.table("bigbench.item")
customer = hive_context.read.table("bigbench.customer")
customer_demographics = hive_context.read.table("bigbench.customer_demographics")

#Traitement ①
data = web_clickstreams.filter(web_clickstreams[“wcs_user_sk”].isNotNull())
data = data.join(item, data["wcs_item_sk"] == item["i_item_sk"], 'inner')

#Processus (2): Regrouper par ID utilisateur
grouped_users = data.groupby('wcs_user_sk')

#Processus ③ Définition du type: définissez le type de données de sortie du traitement itératif
types =  ["wcs_user_sk", "clicks_in_category"]+["clicks_in_%d"%i for i in range(1,8)]
out_schema = StructType([StructField(i, IntegerType(), True) for i in  types])

#Processus Enregistrement: enregistrez le contenu du processus itératif dans Pandas UDF en tant que fonction
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def summarize_per_user(wcs_user_sk_contents):
    wcs_user_sk_index = wcs_user_sk_contents['wcs_user_sk'][0]
    #Traitement ③-1, ③-2
    clicks_in_category = \
        len(wcs_user_sk_contents[wcs_user_sk_contents['i_category'] == i_category_index])
    clicks_in = [0] * 8
    for name, df in wcs_user_sk_contents.groupby('i_category_id'):#Optimiser la boucle une fois
        if name < len(clicks_in):
            clicks_in[name] = len(df.index)
    #Traitement ③-3
    return pd.DataFrame([wcs_user_sk_index, clicks_in_category] + clicks_in[1:],\
                        columns=types)

#Processus ③ Exécution
i_category_index = 'Books'
data = grouped_users.apply(summarize_per_user)

#Traitement ④
data = data.join(customer, data["wcs_user_sk"] == customer["c_customer_sk"], 'inner')

#Traitement ⑤
data = data.join(customer_demographics, \
     data["c_current_cdemo_sk"] == customer_demographics["cd_demo_sk"], 'inner')

#Traitement ⑥
data.withColumn('college_education', 
                      when(data["cd_education_status"] == 'Advanced Degree', 1)\
                     .when(data["cd_education_status"] == 'College', 1)\
                     .when(data[“cd_education_status”] == '4 yr Degree', 1)\
                     .when(data[“cd_education_status”] == '2 yr Degree', 1)\
                     .otherwise(0))
data.withColumn('male', when(data[“cd_gender”] == 'M', 1).otherwise(0))

#Enregistrer les résultats
data.write.mode('append').parquet('answer_q05_0100.parquet')

Figure 1 Code source de prétraitement des données pour le scénario BigBench # 5 par Spark

Vérification des effets à l'aide de Spark

À partir de là, nous vérifierons les performances du prétraitement des données par Spark implémenté dans la figure 1. La figure 2 ci-dessous montre la disposition du service du cluster Spark construit comme un environnement pour le traitement distribué parallèle dans cette vérification. Cette fois, nous construisons un cluster Spark à l'aide de la distribution Cloudera, en supposant un cas d'utilisation sur site. De plus, un nœud de Worker Node est utilisé pour la vérification du traitement de nœud unique par Python, ce qui a été fait dans le deuxième article. De plus, lors du traitement dans Spark, le traitement distribué parallèle réel est effectué sur 3 nœuds de travail.

spark

Figure 2 Disposition du service de cet environnement de vérification

Ensuite, le tableau 2 présente les spécifications de cet environnement de vérification. Cette fois, IaaS (instance EC2) sur AWS est utilisée comme machine de vérification. Cinq disques durs de 1 To (EBS) sont connectés à l'instance EC2 pour Worker Node, et une capacité de 5 To est connectée par nœud. Cependant, comme HDFS écrit les données dans 3 multiplexes, la capacité de données effective est d'environ 5 To.

Tableau 2 Spécifications matérielles de l'environnement de vérification

Manager Node Master Node Worker Node×3
Environnement de vérification AWS EC2 AWS EC2 AWS EC2
OS CentOS 7 64bit CentOS 7 64bit CentOS 7 64bit
CPU(Nombres de coeurs) 2 4 96 (32 x 3 nœuds)
Memory(GB) 16 32 768 (256 Go x 3 nœuds)
HDD(GB) 80GB 80GB 15TB※(1 To x 5 disques durs x 3 nœuds)

Les versions de logiciel utilisées pour la vérification sont indiquées dans le tableau 3 ci-dessous.

Tableau 3 Version du logiciel de l'environnement de vérification

Logiciel version
Distribution de Cloudera CDH 6.3.0
Spark 2.4.0
Hive 2.1.1
YARN 2.5.0
HDFS 3.0.0
Python 3.7.3
Pandas 0.24.2
Numpy 1.16.4

Méthode de traitement pour comparer

En plus des résultats de mesure des performances des méthodes de traitement 1 et 2 ci-dessous, qui ont été vérifiés dans le 2ème post précédent, nous mesurerons et comparerons les performances (3.) lors du traitement distribué en parallèle par Spark. ..

  1. Traitement à nœud unique avec Python (sans optimisation logique dans la figure 5 de la partie 2)

Exécutez le code de la figure 4 de la partie 2 sur Python.

  1. Traitement à un nœud par Python (avec optimisation logique dans la figure 5 du deuxième article)

Exécutez le code optimisé dans la figure 5 du deuxième article par rapport au code de la figure 4 du deuxième article sur Python.

  1. Traitement distribué parallèle par Spark (avec optimisation logique dans la figure 5 du deuxième article)

Exécutez un traitement distribué parallèle sur Spark à l'aide du code de traitement à nœud unique Python à optimisation logique utilisé en 2., modifié pour le traitement Spark (illustré à la figure 1). ..

Paramètres d'exécution lors du traitement dans Spark

Les paramètres d'exécution de Spark pour l'exécution des tâches sont définis comme indiqué dans le tableau 4. Chaque nœud de travail démarre un processus de travail (exécuteur) et l'alloue avec la politique d'utiliser exclusivement la mémoire et les cœurs qu'il contient.

Tableau 4 Paramètres d'exécution de Spark

# article Définir la valeur Remarques
1 Nombre d'exécuteurs 3 En supposant que chaque nœud est démarré un par un
2 Taille de la mémoire de l'exécuteur 128 GB
3 Nombre de cœurs par exécuteur 30 Allouez et utilisez 30 cœurs sur 32 cœurs de la machine

Traitement du contenu et des données à mesurer

Dans la mesure, le temps total requis pour les trois processus suivants sera mesuré de la même manière que la mesure pour le deuxième poste.

  1. Lecture des données d'une source de données en mémoire

  2. Prétraitement tel que combinaison de données et agrégation pour les données lues

  3. Ecrire le résultat du traitement dans la banque de données

De plus, les données à mesurer ont été réglées sur les mêmes paramètres que dans la partie 2 (tableau 3 dans le deuxième article).

Résultats de la mesure du rendement

La figure 3 montre les résultats de l'évaluation du temps de traitement en exécutant chacun des quatre types de traitement pour chaque taille de données pour le scénario commercial n ° 5 de BigBench.

performance

Figure 3 Résultats de la mesure du temps de prétraitement des données pour chaque taille de données d'entrée

Dans le traitement à nœud unique par Python, comme résultat du deuxième article, lorsque la taille des données d'entrée est augmentée, la taille de l'ensemble de données est inférieure à la taille de l'ensemble de données (environ 50 Go) supposée pour la production. Le traitement ne sera pas possible en raison d'un manque de mémoire. D'un autre côté, dans le cas du traitement distribué en parallèle avec Spark, le traitement était terminé normalement même avec l'ensemble de données supposé pour la production, et le traitement pouvait être terminé même avec une taille plus grande.

La figure 4 montre la progression de l'utilisation du processeur, de la mémoire et des E / S disque lors du traitement de 22 Go de données avec Spark. Il peut être confirmé que le temps de calcul a été considérablement réduit car toutes les CPU de chaque nœud de travail ont été utilisées à 100%. La mémoire est également distribuée à chaque nœud. Étant donné que chacun des trois nœuds a la surcharge de la fonction du système d'exploitation de base, l'utilisation de la mémoire de l'ensemble du cluster est plus grande que celle d'une seule machine et la mémoire utilisée après le démarrage du programme du scénario métier n ° 5 est par nœud de travail. Il fait environ 130-140 Go et le total de 3 nœuds est d'environ 410 Go. De plus, à la suite du traitement (1) Jointure interne et traitement (2) groupe par traitement (voir la figure 3 publiée dans la partie 2), vous pouvez voir comment les E / S d'écriture sur disque se produisent. Ces E / S se produisent car les données combinées ou triées sont stockées sur le disque.

resource

Figure 4 Changements temporels dans l'utilisation du processeur, de la mémoire et des E / S dans un environnement Spark

Prise en compte des résultats de l'évaluation des performances

Effet de l'introduction de Spark

[Effet de la décentralisation parallèle (1): prise en charge des données à grande échelle]

Avec l'introduction de Spark, il est devenu possible d'effectuer un prétraitement sur des données à grande échelle qui seront interrompues de force en raison d'un manque de mémoire lorsqu'elles sont exécutées sur Python sur un seul serveur. Lorsqu'il s'agit de données qui dépassent la limite de capacité de mémoire d'une machine, l'introduction de Spark est considérée comme appropriée.

[Effet de la décentralisation parallèle (2): réduction du temps de traitement]

Puisque Python ne peut pas tirer parti de plusieurs processeurs, même si la taille des données augmente, le traitement est exécuté séquentiellement sur un seul processeur. D'autre part, dans Spark, les données sont divisées pour chaque nœud et le traitement de chaque donnée est attribué à différents nœuds et cœurs de processeur pour un traitement parallèle, de sorte que le temps de traitement peut être considérablement réduit.

À propos des fonctionnalités de Python et Spark

  • Lors de l'utilisation de Pandas sur Python, toutes les données à traiter sont lues en mémoire et traitées séquentiellement par un seul processeur. De plus, dans l'exemple du scénario commercial n ° 5, cette fois, le temps de répétition augmente uniquement proportionnellement à la taille des données.

  • Dans le traitement Spark, les données sont lues en mémoire et écrites sur le disque pour chaque section divisée par une certaine taille. De plus, lors d'un traitement nécessitant un échange de données entre nœuds divisés (JOIN, GROUP BY, tri, etc.), le résultat est écrit sur le disque. En traitant de grandes données pour chaque partition de cette manière, même si la taille des données à gérer dépasse la quantité de mémoire physique installée, le traitement peut être terminé, mais le temps de traitement devient un goulot d'étranglement d'E / S disque et un traitement soudain Le temps peut augmenter plus que linéaire.

  • Sur la figure 3, lorsque la taille des données d'entrée est d'environ 150 Go, le temps de traitement augmente soudainement de manière non linéaire, mais c'est parce que la taille des données à gérer dépasse la quantité de mémoire installée, le disque IO se produit et cela devient un goulot d'étranglement. Je suis.

Résumé

Nous avons présenté les résultats de l'évaluation des performances et le savoir-faire en matière d'amélioration des performances à l'époque, en ciblant des exemples de travail qui effectuent un prétraitement tel que la liaison de données et l'agrégation pour les données de table à grande échelle.

Si les données à traiter ne sont pas si volumineuses (jusqu'à plusieurs dizaines de Go), le traitement peut être terminé en quelques heures même avec le prétraitement avec Python sur un seul nœud, de sorte que le prétraitement des données est effectué avec le code de prétraitement en Python. Est possible. A ce moment, il est possible de raccourcir le temps de traitement en mettant en œuvre l'optimisation logique introduite dans cette série.

D'un autre côté, si la quantité de données à traiter est importante, le traitement de Python peut prendre trop de temps ou le processus peut échouer, donc l'application d'une plate-forme de traitement distribuée telle que Spark est une option réaliste. Lorsque les données à apprendre utilisées dans l'apprentissage automatique sont d'environ 22 Go, le temps de traitement peut être raccourci d'environ 94% par rapport à Python qui fonctionne sur un seul nœud (thread unique) en utilisant le traitement distribué en parallèle par Spark pour le prétraitement. Je confirme. Nous avons également confirmé que lors de l'utilisation du nœud Spark of Worker 3, Python peut traiter jusqu'à environ 4 fois la quantité de données pouvant être traitées par un seul nœud tout en maintenant les performances.

Recommended Posts