J'avais l'habitude d'utiliser Spark avant, mais maintenant je ne l'utilise plus Je l'oublierai bientôt, j'ai donc décidé de prendre note des bases.
(Puisqu'il s'agit d'une connaissance d'écoute de l'ensemble, j'attends des commentaires et des demandes d'édition pour erreurs)
Une image Docker qui exécute un environnement Jupyter + PySpark est fournie, ce qui est pratique pour essayer localement: https://hub.docker.com/r/jupyter/pyspark-notebook/
Qu'est-ce que PySpark? Spark lui-même est Scala, Il y a une histoire selon laquelle il y a un gars qui peut être utilisé avec Python et c'est PySpark.
Cela aurait dû être un mécanisme pour faire de mon mieux avec IPC, alors Scala <-> Il y a aussi un sujet selon lequel le coût de conversion de Python est assez élevé.
Utilisons maintenant:
docker run -it -p 8888:8888 jupyter/pyspark-notebook
Lorsque vous faites cela, vous verrez une URL avec un jeton au numéro 8888 dans Terminal.
(Autour de Pour accéder au notebook, ...
)
Lorsque vous accédez à la page principale, la page Jupyter apparaît.
Vous disposez d'un environnement simple que vous pouvez coder avec Jupyter Notebook.
Sélectionnez Notebook: Python3
dans New
ici pour ouvrir le Notebook
Le code de test pour voir s'il fonctionne est tiré de l'exemple ci-dessous (https://jupyter-docker-stacks.readthedocs.io/en/latest/using/specifics.html#in-a-python-notebook):
from pyspark.sql import SparkSession
spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# do something to prove it works
spark.sql('SELECT "Test" as c1').show()
Je ne suis pas sûr de la Spark Session
, mais
C'est une reconnaissance que c'est comme une instance de Spark lui-même.
Si vous faites cela et que vous obtenez une table, c'est OK:
Ciblons ce type de données:
id |
name |
gender |
age |
---|---|---|---|
1 | Satoshi | male | 10 |
2 | Shigeru | male | 10 |
3 | Kasumi | female | 12 |
Voici une définition naïve des données en Python:
from typing import List, Tuple
Trainer = Tuple[int, str, str, int]
trainers: List[Trainer] = [
(1, 'Satoshi', 'male', 10),
(2, 'Shigeru', 'male', 10),
(3, 'Kasumi', 'female', 12),
]
Le type de chaque ligne est «Tuple [int, str, str, int]» dans le «typage» de Python.
Et Spark a également une définition de schéma:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
Vous pouvez maintenant définir le schéma de colonne du côté Spark.
Pour convertir des données définies par Python en DataFrame
de Spark:
from pyspark.sql import DataFrame
trainers_df: DataFrame = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
Vous avez maintenant un DataFrame
appelé trainers_df
.
Comme il peut être lu à partir de CSV, MySQL, etc. en tant que source de données, En réalité, il sera lu à partir d'une telle source de données plutôt que défini dans le code. (Dans certains cas, il est nécessaire de définir JDBC ou Hadoop, ce qui sera décrit plus tard.)
Si vous voulez vider ceci et le voir:
trainers_df.show()
Cela produira quelques lignes de texte formaté dans le tableau:
+---+------+------+---+
| id| name|gender|age|
+---+------+------+---+
| 1|Satoshi| male| 10|
| 2|Shigeru| male| 10|
| 3|Kasumi|female| 12|
+---+------+------+---+
Pour obtenir la valeur au lieu du vidage, faites simplement .collect ()
:
result = trainers_df.collect()
print(result)
Lors de l'exportation au format CSV, exportez DataFrame
dans cette atmosphère:
trainers_df.coalesce(1).write.mode('overwrite').csv("path/to/output.csv")
Comme l'entrée, il existe diverses autres destinations de sortie telles que S3, MySQL et Elasticsearch.
.coalesce (1)
correspond aux données divisées pour chaque partition.
Il s'agit de fusionner en une seule partition.
Si vous ne le faites pas, le CSV sera émis tel qu'il est divisé.
Utilisation de la commande hdfs
de Hadoop
Il existe également un moyen de rassembler ceux qui sont divisés.
C'est essentiellement une évaluation de retard
Puisqu'il n'est évalué qu'en effectuant une opération comme .collect ()
Vous ne devriez pas agréger si souvent.
Cela seul n'a aucun sens pour l'afficher, alors faisons quelque chose de approprié:
trainers_df.createOrReplaceTempView('trainers');
male_trainers_df = spark.sql('''
SELECT *
FROM trainers
WHERE gender = 'male'
''')
male_trainers_df.show()
Cela donne ce résultat:
id |
name |
gender |
age |
---|---|---|---|
1 | Satoshi | male | 10 |
2 | Shigeru | male | 10 |
DataFrame.createOrReplaceTempView (nom)
est DataFrame
,
Il peut être enregistré en tant que vue SQL temporaire.
Vous pouvez maintenant obtenir le DF du résultat de l'opération SQL pour la vue enregistrée avec spark.sql (query)
.
De cette façon, vous pouvez utiliser Spark avec le SQL auquel vous êtes habitué sans aucune hésitation.
La magie est que les barrières psychologiques et les coûts d'apprentissage sont faibles.
Vous pouvez également écrire le code en tant que DataFrame
sans l'enregistrer dans View:
male_trainers_df = trainers_df.filter(trainers_df['gender'] == 'male')
Il y a des cas où cela est plus facile à utiliser, donc c'est au cas par cas.
Puisque vous pouvez utiliser SQL, il n'y a aucun problème avec les opérations de base, La plupart du temps, le cas où vous souhaitez utiliser Spark semble être une situation dans laquelle vous souhaitez effectuer une opération définie par l'utilisateur.
Par exemple, comme cas que je voulais faire dans le passé Il y a une chose appelée "analyse morphologique du texte de l'article et l'écrire séparément" Ceci est difficile à réaliser avec SQL seul.
Cependant, comme il y a MeCab sur Python, Si vous faites une analyse morphologique à l'aide de la bibliothèque MeCab, elle sera décomposée sans réfléchir, donc Même si vous ne comprenez pas du tout comme moi, vous pouvez simplement le lancer sur MeCab pour le moment.
Comment puis-je faire cela pour DataFrame
sur Spark?
Il est bon de définir UDF (User-Defined Function).
(* Il existe une technique selon laquelle vous pouvez appliquer lambda
directement à RDD au lieu de DataFrame
.
Cela a des performances médiocres).
Pour définir une UDF, effectuez la définition suivante:
from pyspark.sql.functions import udf
@udf(StringType())
def name_with_suffix(name: str, gender: str) -> str:
return name + {'male': 'Kun', 'female': 'M.'}.get(gender, 'm')
spark.udf.register('name_with_suffix', name_with_suffix)
En appliquant le décorateur @udf (ReturnType)
à la fonction qui devient l'UDF
La fonction peut maintenant être définie comme un UDF.
Pour l'utiliser avec Spark SQL, enregistrez-le avec spark.udf.register (udf_name, udf)
Vous pouvez l'utiliser tel quel dans le même but que COUNT ()
.
D'ailleurs, même si vous n'utilisez pas de décorateur, vous pouvez appliquer une fonction existante en faisant ʻudf_fn = udf (fn) `.
Celui donné dans cet exemple dépend du «genre». Il s'agit d'ajouter un suffixe correspondant à "gender" à "name". Appliquons cette fonction en tant qu'UDF:
dearest_trainers = spark.sql('''
SELECT name_with_suffix(name, gender)
FROM trainers
''')
dearest_trainers.show()
Le résultat est:
name_with_suffix(name, gender) |
---|
Satoshi-kun |
Shigeru |
Kasumi |
Dans cet exemple, il y a une opinion que vous pouvez écrire en utilisant CASE
même en SQL, mais c'est vrai.
Cela peut être utile selon ce que vous voulez faire.
UDF
À propos, l'analyse morphologique susmentionnée est effectuée et divisée. Ce serait une fonction comme celle-ci en tant qu'image (En fait, j'utilise MeCab froidement):
import re
#Moitié de la taille/Divisez la chaîne de caractères par des espaces pleine largeur et des contrats
@udf(ArrayType(StringType()))
def wakachi(text: str) -> List[str]:
return [
word
for word
in re.split('[ !…]+', text)
if len(word) > 0
]
Il est normal de l'appliquer tel quel. Écrivons à nouveau l'exemple de code tout en modifiant les données:
Trainer = Tuple[int, str, str, int, str]
trainers: List[Trainer] = [
(1, 'Satoshi', 'male', 10, 'Obtenez Pokemon'),
(2, 'Shigeru', 'male', 10, 'C'est le meilleur de tous! Cela signifie être fort!'),
(3, 'Kasumi', 'female', 12, 'Ma politique est ... au moins avec les Pokémon de type Mizu ... du moins!'),
]
trainers_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('gender', StringType(), True),
StructField('age', IntegerType(), True),
])
trainers_df = spark.createDataFrame(
spark.sparkContext.parallelize(trainers),
trainers_schema
)
trainers_df.createOrReplaceTempView('trainers');
wakachi_trainers_df = spark.sql('''
SELECT id, name, wakachi(comment)
FROM trainers
''')
wakachi_trainers_df.show()
Le point ici est Cette fois, UDF reçoit «str» et le développe comme «List [str]». Quand j'essaye ceci, cela ressemble à ceci:
id |
name |
wakachi(comment) |
---|---|---|
1 | Satoshi | [Pokémon,avoir,C'est vrai] |
2 | Shigeru | [C'est moi,Dans le monde,je... |
3 | Kasumi | [ma,Politique,Mizu... |
Les cellules développées sont dans une liste Il est dans un état imbriqué avec plus de lignes à l'intérieur des lignes.
Que faire si vous voulez développer cela comme une colonne pour chaque str
?
Vous pouvez appliquer plus de fonctions pour développer:
https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/functions.html#explode(org.apache.spark.sql.Column)
from pyspark.sql.functions import explode
wakachi_trainers_df = spark.sql('''
SELECT id, name, explode(wakachi(comment))
FROM trainers
''')
wakachi_trainers_df.show()
Puisqu'il y a une fonction appelée ʻexplode` L'application de ceci développera les éléments imbriqués dans leurs colonnes respectives:
id |
name |
col |
---|---|---|
1 | Satoshi | Pokémon |
1 | Satoshi | avoir |
1 | Satoshi | C'est vrai |
2 | Shigeru | C'est moi |
2 | Shigeru | Dans le monde |
2 | Shigeru | plus |
2 | Shigeru | Fort |
2 | Shigeru | C'est ce que c'est |
3 | Kasumi | ma |
3 | Kasumi | Politique |
3 | Kasumi | Mizu |
3 | Kasumi | type |
3 | Kasumi | Avec Pokemon |
3 | Kasumi | au moins |
3 | Kasumi | au moins |
3 | Kasumi | Fête |
3 | Kasumi | Cette |
En outre, vous pouvez créer un JOIN
entre DataFrame
s.
Spécifiez la colonne à utiliser pour la jointure de la même manière que JOIN
comme MySQL ordinaire,
Sur cette base, «DataFrame» est combiné.
Ajoutons plus d'exemple de code et utilisons JOIN
:
Pkmn = Tuple[int, int, str, int]
pkmns: List[Pkmn] = [
(1, 1, 'Pikachu', 99),
(2, 1, 'Lizardon', 99),
(3, 2, 'Evey', 50),
(4, 3, 'Tosakinto', 20),
(5, 3, 'Étoile de mer', 30),
(6, 3, 'Star moi', 40),
]
pkmns_schema = StructType([
StructField('id', IntegerType(), True),
StructField('trainer_id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('level', IntegerType(), True),
])
pkmns_df = spark.createDataFrame(
spark.sparkContext.parallelize(pkmns),
pkmns_schema
)
pkmns_df.createOrReplaceTempView('pkmns');
trainer_and_pkmns_df = spark.sql('''
SELECT *
FROM trainers
INNER JOIN pkmns
ON trainers.id = pkmns.trainer_id
''')
trainer_and_pkmns_df.show()
id |
name |
gender |
age |
comment |
id |
trainer_id |
name |
level |
---|---|---|---|---|---|---|---|---|
1 | Satoshi | male | 10 | Obtenez Pokemon | 1 | 1 | Pikachu | 99 |
1 | Satoshi | male | 10 | Obtenez Pokemon | 2 | 1 | Lizardon | 99 |
3 | Kasumi | female | 12 | Ma politique est ... Mizuta... | 4 | 3 | Tosakinto | 20 |
3 | Kasumi | female | 12 | Ma politique est ... Mizuta... | 5 | 3 | Étoile de mer | 30 |
3 | Kasumi | female | 12 | Ma politique est ... Mizuta... | 6 | 3 | Star moi | 40 |
2 | Shigeru | male | 10 | je suis le meilleur... | 3 | 2 | Evey | 50 |
Au fait, il existe de nombreux types autres que ʻINNER JOIN et ʻOUTER JOIN
.
Cet article est facile à comprendre, je vais donc le citer:
https://qiita.com/ryoutoku/items/70c35cb016dcb13c8740
C'est pratique car vous pouvez effectuer des opérations collectives avec cela.
Le concept de chaque JOIN
est cité car le diagramme de Ben sur cette page est facile à comprendre:
https://medium.com/@achilleus/https-medium-com-joins-in-apache-spark-part-1-dabbf3475690
En un mot, «JOIN» est toujours cher et lent.
Si vous avez formé un cluster, il semble que des opérations telles que le trouver à partir des données distribuées à divers endroits, JOIN
et le renvoyer sont effectuées.
Par conséquent, le réglage des performances, qui sera décrit plus loin, est nécessaire.
Dans le monde réel, lutter avec d'énormes ensembles de données peut être une tâche ardue. Parce que si cela prend environ 4 heures, s'il tombe vers la fin, il faudra le refaire. Si vous vous trompez deux fois, vous aurez consacré une journée de travail et les heures supplémentaires seront confirmées.
Ainsi, afin d'améliorer ces performances, nous avons réduit les données pour augmenter l'efficacité de JOIN
,
Changer la méthode de partitionnement,
Il est nécessaire de concevoir pour que les partitions ne soient pas autant que possible fragmentées sur le cluster.
Broadcast Join est une méthode pour placer audacieusement des ensembles de données en double dans tous les clusters.
Il y a aussi des choses comme réduire le coût de la recherche d'un ensemble de données à JOIN
.
En tant que technique importante
En définissant DataFrame sur .cache ()
à chaque point de contrôle,
Dans certains cas, les performances sont considérablement améliorées.
Si vous regardez la page officielle sur les performances, il existe une telle technique et elle vous sera utile:
https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries
MySQL
Eh bien, il est courant de vouloir lire à partir d'une base de données MySQL et de s'en débarrasser. Dans ce cas, vous devez avoir un connecteur JDBC MySQL pour travailler avec MySQL, L'entrée de cette personne et son image Docker seront utiles:
Cependant, il y a certaines choses que MySQL est difficile à gérer avec Spark. (Il y a plusieurs points addictifs)
Spark est puissant:
Je crois que c'est.
En outre, Spark est la clé pour créer des clusters avec plusieurs unités et laisser les travailleurs faire le travail. En réalité, il semble préférable d'utiliser Amazon EMR ou AWS Glue car c'est laissé à AWS. En effet, s'il est local, cela fonctionne sans créer de cluster, donc même si vous tapez une énorme quantité de données sérieuses, il n'y aura aucune performance et vous n'en bénéficierez pas.
Vous avez atteint la limite de la mémoire, Même si vous pouvez économiser de l'argent, s'il s'agit de données volumineuses qu'il faut deux semaines pour passer par lots tout au long du processus, Même des choses simples peuvent être possibles si vous les divisez vous-même, les divisez en plusieurs processus et les exécutez. C'est une bonne idée de laisser Spark à Spark si possible.
Recommended Posts