[PYTHON] Essayez Apache Spark avec Jupyter Notebook (sur Docker local

J'avais l'habitude d'utiliser Spark avant, mais maintenant je ne l'utilise plus Je l'oublierai bientôt, j'ai donc décidé de prendre note des bases.

(Puisqu'il s'agit d'une connaissance d'écoute de l'ensemble, j'attends des commentaires et des demandes d'édition pour erreurs)

utilisation

Une image Docker qui exécute un environnement Jupyter + PySpark est fournie, ce qui est pratique pour essayer localement: https://hub.docker.com/r/jupyter/pyspark-notebook/

Qu'est-ce que PySpark? Spark lui-même est Scala, Il y a une histoire selon laquelle il y a un gars qui peut être utilisé avec Python et c'est PySpark.

Cela aurait dû être un mécanisme pour faire de mon mieux avec IPC, alors Scala <-> Il y a aussi un sujet selon lequel le coût de conversion de Python est assez élevé.

Utilisons maintenant:

docker run -it -p 8888:8888 jupyter/pyspark-notebook

Lorsque vous faites cela, vous verrez une URL avec un jeton au numéro 8888 dans Terminal. (Autour de Pour accéder au notebook, ...) Lorsque vous accédez à la page principale, la page Jupyter apparaît. Vous disposez d'un environnement simple que vous pouvez coder avec Jupyter Notebook.

Home_Page_-_Select_or_create_a_notebook.png

Sélectionnez Notebook: Python3 dans New ici pour ouvrir le Notebook

Untitled_-_Jupyter_Notebook.png

essayer

Le code de test pour voir s'il fonctionne est tiré de l'exemple ci-dessous (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):

from pyspark.sql import SparkSession

spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()

# do something to prove it works
spark.sql('SELECT "Test" as c1').show()

Je ne suis pas sûr de la Spark Session, mais C'est une reconnaissance que c'est comme une instance de Spark lui-même.

Si vous faites cela et que vous obtenez une table, c'est OK:

Untitled_-_Jupyter_Notebook.png

Gérer les données

Ciblons ce type de données:

id name gender age
1 Satoshi male 10
2 Shigeru male 10
3 Kasumi female 12

Entrée et définition

Voici une définition naïve des données en Python:

from typing import List, Tuple

Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male',   10),
    (2, 'Shigeru', 'male',   10),
    (3, 'Kasumi', 'female', 12),
]

Le type de chaque ligne est «Tuple [int, str, str, int]» dans le «typage» de Python.

Et Spark a également une définition de schéma:

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

Vous pouvez maintenant définir le schéma de colonne du côté Spark.

Pour convertir des données définies par Python en DataFrame de Spark:

from pyspark.sql import DataFrame

trainers_df: DataFrame = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)

Vous avez maintenant un DataFrame appelé trainers_df.

Comme il peut être lu à partir de CSV, MySQL, etc. en tant que source de données, En réalité, il sera lu à partir d'une telle source de données plutôt que défini dans le code. (Dans certains cas, il est nécessaire de définir JDBC ou Hadoop, ce qui sera décrit plus tard.)

Si vous voulez vider ceci et le voir:

trainers_df.show()

Cela produira quelques lignes de texte formaté dans le tableau:

Untitled_-_Jupyter_Notebook.png

+---+------+------+---+
| id|  name|gender|age|
+---+------+------+---+
|  1|Satoshi|  male| 10|
|  2|Shigeru|  male| 10|
|  3|Kasumi|female| 12|
+---+------+------+---+

Agrégation et sortie

Pour obtenir la valeur au lieu du vidage, faites simplement .collect ():

result = trainers_df.collect()
print(result)

Lors de l'exportation au format CSV, exportez DataFrame dans cette atmosphère:

trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")

Comme l'entrée, il existe diverses autres destinations de sortie telles que S3, MySQL et Elasticsearch.

.coalesce (1) correspond aux données divisées pour chaque partition. Il s'agit de fusionner en une seule partition. Si vous ne le faites pas, le CSV sera émis tel qu'il est divisé.

Utilisation de la commande hdfs de Hadoop Il existe également un moyen de rassembler ceux qui sont divisés.

C'est essentiellement une évaluation de retard Puisqu'il n'est évalué qu'en effectuant une opération comme .collect () Vous ne devriez pas agréger si souvent.

De base

Cela seul n'a aucun sens pour l'afficher, alors faisons quelque chose de approprié:

trainers_df.createOrReplaceTempView('trainers');

male_trainers_df = spark.sql('''
    SELECT *
    FROM   trainers
    WHERE  gender = 'male'
''')
male_trainers_df.show()

Cela donne ce résultat:

id name gender age
1 Satoshi male 10
2 Shigeru male 10

DataFrame.createOrReplaceTempView (nom) est DataFrame, Il peut être enregistré en tant que vue SQL temporaire.

Vous pouvez maintenant obtenir le DF du résultat de l'opération SQL pour la vue enregistrée avec spark.sql (query). De cette façon, vous pouvez utiliser Spark avec le SQL auquel vous êtes habitué sans aucune hésitation. La magie est que les barrières psychologiques et les coûts d'apprentissage sont faibles.

Vous pouvez également écrire le code en tant que DataFrame sans l'enregistrer dans View:

male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')

Il y a des cas où cela est plus facile à utiliser, donc c'est au cas par cas.

application

Puisque vous pouvez utiliser SQL, il n'y a aucun problème avec les opérations de base, La plupart du temps, le cas où vous souhaitez utiliser Spark semble être une situation dans laquelle vous souhaitez effectuer une opération définie par l'utilisateur.

Par exemple, comme cas que je voulais faire dans le passé Il y a une chose appelée "analyse morphologique du texte de l'article et l'écrire séparément" Ceci est difficile à réaliser avec SQL seul.

Cependant, comme il y a MeCab sur Python, Si vous faites une analyse morphologique à l'aide de la bibliothèque MeCab, elle sera décomposée sans réfléchir, donc Même si vous ne comprenez pas du tout comme moi, vous pouvez simplement le lancer sur MeCab pour le moment.

Comment puis-je faire cela pour DataFrame sur Spark? Il est bon de définir UDF (User-Defined Function).

(* Il existe une technique selon laquelle vous pouvez appliquer lambda directement à RDD au lieu de DataFrame. Cela a des performances médiocres).

Pour définir une UDF, effectuez la définition suivante:

from pyspark.sql.functions import udf

@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
    return name + {'male': 'Kun', 'female': 'M.'}.get(gender, 'm')

spark.udf.register('name_with_suffix', name_with_suffix)

En appliquant le décorateur @udf (ReturnType) à la fonction qui devient l'UDF La fonction peut maintenant être définie comme un UDF. Pour l'utiliser avec Spark SQL, enregistrez-le avec spark.udf.register (udf_name, udf) Vous pouvez l'utiliser tel quel dans le même but que COUNT ().

D'ailleurs, même si vous n'utilisez pas de décorateur, vous pouvez appliquer une fonction existante en faisant ʻudf_fn = udf (fn) `.

Celui donné dans cet exemple dépend du «genre». Il s'agit d'ajouter un suffixe correspondant à "gender" à "name". Appliquons cette fonction en tant qu'UDF:

dearest_trainers = spark.sql('''
    SELECT name_with_suffix(name, gender)
    FROM   trainers
''')
dearest_trainers.show()

Le résultat est:

name_with_suffix(name, gender)
Satoshi-kun
Shigeru
Kasumi

Dans cet exemple, il y a une opinion que vous pouvez écrire en utilisant CASE même en SQL, mais c'est vrai.

Cela peut être utile selon ce que vous voulez faire.

UDF

À propos, l'analyse morphologique susmentionnée est effectuée et divisée. Ce serait une fonction comme celle-ci en tant qu'image (En fait, j'utilise MeCab froidement):

import re

#Moitié de la taille/Divisez la chaîne de caractères par des espaces pleine largeur et des contrats
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
    return [
        word
        for word
        in re.split('[  !…]+', text)
        if len(word) > 0
    ]

Il est normal de l'appliquer tel quel. Écrivons à nouveau l'exemple de code tout en modifiant les données:

Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
    (1, 'Satoshi', 'male',   10, 'Obtenez Pokemon'),
    (2, 'Shigeru', 'male',   10, 'C'est le meilleur de tous! Cela signifie être fort!'),
    (3, 'Kasumi', 'female', 12, 'Ma politique est ... au moins avec les Pokémon de type Mizu ... du moins!'),
]

trainers_schema = StructType([
    StructField('id',      IntegerType(), True),
    StructField('name',    StringType(),  True),
    StructField('gender',  StringType(),  True),
    StructField('age',     IntegerType(), True),
])

trainers_df = spark.createDataFrame(
    spark.sparkContext.parallelize(trainers),
    trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');

wakachi_trainers_df = spark.sql('''
    SELECT id, name, wakachi(comment)
    FROM   trainers
''')
wakachi_trainers_df.show()

Le point ici est Cette fois, UDF reçoit «str» et le développe comme «List [str]». Quand j'essaye ceci, cela ressemble à ceci:

id name wakachi(comment)
1 Satoshi [Pokémon,avoir,C'est vrai]
2 Shigeru [C'est moi,Dans le monde,je...
3 Kasumi [ma,Politique,Mizu...

Les cellules développées sont dans une liste Il est dans un état imbriqué avec plus de lignes à l'intérieur des lignes.

Que faire si vous voulez développer cela comme une colonne pour chaque str? Vous pouvez appliquer plus de fonctions pour développer:

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)

from pyspark.sql.functions import explode

wakachi_trainers_df = spark.sql('''
    SELECT id, name, explode(wakachi(comment))
    FROM   trainers
''')
wakachi_trainers_df.show()

Puisqu'il y a une fonction appelée ʻexplode` L'application de ceci développera les éléments imbriqués dans leurs colonnes respectives:

id name col
1 Satoshi Pokémon
1 Satoshi avoir
1 Satoshi C'est vrai
2 Shigeru C'est moi
2 Shigeru Dans le monde
2 Shigeru plus
2 Shigeru Fort
2 Shigeru C'est ce que c'est
3 Kasumi ma
3 Kasumi Politique
3 Kasumi Mizu
3 Kasumi type
3 Kasumi Avec Pokemon
3 Kasumi au moins
3 Kasumi au moins
3 Kasumi Fête
3 Kasumi Cette

joindre

En outre, vous pouvez créer un JOIN entre DataFrames. Spécifiez la colonne à utiliser pour la jointure de la même manière que JOIN comme MySQL ordinaire, Sur cette base, «DataFrame» est combiné.

Ajoutons plus d'exemple de code et utilisons JOIN:

Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
    (1, 1, 'Pikachu', 99),
    (2, 1, 'Lizardon', 99),
    (3, 2, 'Evey',   50),
    (4, 3, 'Tosakinto', 20),
    (5, 3, 'Étoile de mer', 30),
    (6, 3, 'Star moi', 40),
]
pkmns_schema = StructType([
    StructField('id',         IntegerType(), True),
    StructField('trainer_id', IntegerType(), True),
    StructField('name',       StringType(),  True),
    StructField('level',      IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
    spark.sparkContext.parallelize(pkmns),
    pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');

trainer_and_pkmns_df = spark.sql('''
    SELECT     *
    FROM       trainers
    INNER JOIN pkmns
          ON   trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
id name gender age comment id trainer_id name level
1 Satoshi male 10 Obtenez Pokemon 1 1 Pikachu 99
1 Satoshi male 10 Obtenez Pokemon 2 1 Lizardon 99
3 Kasumi female 12 Ma politique est ... Mizuta... 4 3 Tosakinto 20
3 Kasumi female 12 Ma politique est ... Mizuta... 5 3 Étoile de mer 30
3 Kasumi female 12 Ma politique est ... Mizuta... 6 3 Star moi 40
2 Shigeru male 10 je suis le meilleur... 3 2 Evey 50

Au fait, il existe de nombreux types autres que ʻINNER JOIN et ʻOUTER JOIN. Cet article est facile à comprendre, je vais donc le citer:

https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740

C'est pratique car vous pouvez effectuer des opérations collectives avec cela.

Le concept de chaque JOIN est cité car le diagramme de Ben sur cette page est facile à comprendre:

https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-dabbf3475690

En un mot, «JOIN» est toujours cher et lent. Si vous avez formé un cluster, il semble que des opérations telles que le trouver à partir des données distribuées à divers endroits, JOIN et le renvoyer sont effectuées.

Par conséquent, le réglage des performances, qui sera décrit plus loin, est nécessaire.

performance

Dans le monde réel, lutter avec d'énormes ensembles de données peut être une tâche ardue. Parce que si cela prend environ 4 heures, s'il tombe vers la fin, il faudra le refaire. Si vous vous trompez deux fois, vous aurez consacré une journée de travail et les heures supplémentaires seront confirmées.

Ainsi, afin d'améliorer ces performances, nous avons réduit les données pour augmenter l'efficacité de JOIN, Changer la méthode de partitionnement, Il est nécessaire de concevoir pour que les partitions ne soient pas autant que possible fragmentées sur le cluster.

Broadcast Join est une méthode pour placer audacieusement des ensembles de données en double dans tous les clusters. Il y a aussi des choses comme réduire le coût de la recherche d'un ensemble de données à JOIN.

En tant que technique importante En définissant DataFrame sur .cache () à chaque point de contrôle, Dans certains cas, les performances sont considérablement améliorées.

Si vous regardez la page officielle sur les performances, il existe une telle technique et elle vous sera utile:

https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries

MySQL

Eh bien, il est courant de vouloir lire à partir d'une base de données MySQL et de s'en débarrasser. Dans ce cas, vous devez avoir un connecteur JDBC MySQL pour travailler avec MySQL, L'entrée de cette personne et son image Docker seront utiles:

Cependant, il y a certaines choses que MySQL est difficile à gérer avec Spark. (Il y a plusieurs points addictifs)

Réellement

Spark est puissant:

Je crois que c'est.

En outre, Spark est la clé pour créer des clusters avec plusieurs unités et laisser les travailleurs faire le travail. En réalité, il semble préférable d'utiliser Amazon EMR ou AWS Glue car c'est laissé à AWS. En effet, s'il est local, cela fonctionne sans créer de cluster, donc même si vous tapez une énorme quantité de données sérieuses, il n'y aura aucune performance et vous n'en bénéficierez pas.

Vous avez atteint la limite de la mémoire, Même si vous pouvez économiser de l'argent, s'il s'agit de données volumineuses qu'il faut deux semaines pour passer par lots tout au long du processus, Même des choses simples peuvent être possibles si vous les divisez vous-même, les divisez en plusieurs processus et les exécutez. C'est une bonne idée de laisser Spark à Spark si possible.

Recommended Posts

Essayez Apache Spark avec Jupyter Notebook (sur Docker local
Essayez d'exécuter Jupyter Notebook sur Mac
Utilisation d'Apache Spark avec le notebook Jupyter (notebook IPython)
Essayez SVM avec scikit-learn sur Jupyter Notebook
Essayez les opérations de base sur Pandas DataFrame sur Jupyter Notebook
Approvisionnement EC2 avec Vagrant + Jupyter (IPython Notebook) sur Docker
Essayez d'utiliser Jupyter Notebook de manière dynamique
Graphiques élevés dans le notebook Jupyter
Afficher le PDF sur Jupyter Notebook
Exécutez IPython Notebook sur Docker
Exécutez Jupyter Notebook sous Windows
Impossible d'afficher le tensorboard dans le notebook Jupyter sur Docker (résolu)
Essayez d'exécuter tensorflow sur Docker + anaconda
Premiers pas sur Docker Apache Hadoop
Essayez de démarrer Jupyter Notebook ~ Formation Esper
Résolvez les caractères japonais brouillés dans matplotlib de Jupyter Notebook sur Docker
Faire de Jupyter Notebook un service sur CentOS
Utilisez BigQuery à partir de votre bloc-notes Jupyter local
Cloner le référentiel github sur le notebook Jupyter
Vérification du GPU du PC sur le notebook Jupyter
Afficher l'histogramme / diagramme de dispersion sur Jupyter Notebook
Construire un notebook jupyter sur un serveur distant (CentOS)
Utiliser les raccourcis clavier vim dans Jupyter Notebook lancé avec Docker
Exécuter le notebook Jupyter sur un serveur distant
Installer matplotlib et afficher le graphique dans Jupyter Notebook
Créez un environnement LAMP sur votre Docker local
Essayez le modèle d'espace d'état (Jupyter Notebook + noyau IR)
[Jupyter Notebook / Lab] Trois méthodes de débogage sur Jupyter [Pdb]
Construire un environnement d'analyse avec Docker (jupyter notebook + PostgreSQL)
Activer Jupyter Notebook avec conda sur un serveur distant
Essayez d'utiliser l'environnement virtuel conda avec Jupyter Notebook
[Pythonocc] J'ai essayé d'utiliser la CAO sur un notebook Jupyter
Affichage simple du graphique linéaire sur Jupyter Notebook
Notebook Jupyter ouvert à distance lancé sur le serveur
Essayez d'utiliser le bloc-notes Jupyter à partir d'Azure Machine Learning
jupyter notebook ne commence pas par fish sur mac
L'histoire du démarrage du notebook Jupyter de python2.x à l'aide de docker (écrasé samedi et dimanche)
pykintone sur Docker
Mémo Jupyter Notebook
Présentation de Jupyter Notebook
Puissant ordinateur portable Jupyter
Golang avec Jupyter
Jupyter sur AWS
Mot de passe du notebook Jupyter
Construction d'environnement virtuel avec Docker + Flask (Python) + notebook Jupyter
Lancez facilement jupyter notebook sur AWS et accédez localement
Exécutez Tensorflow à partir de Jupyter Notebook sur Bash sur Ubuntu sous Windows
[Windows] [Python3] Installer python3 et Jupyter Notebook (anciennement ipython notebook) sous Windows
Comment afficher la barre de progression sur Jupyter Notebook pour voir la progression