Handle Parquet in Python

Introduction

The in-house development team I belong to is migrating away from Unicage, and one open question is how to handle large-scale transaction data stored in text files without relying on Unicage commands.

I have tried handling Unicage files with pandas before, and at around the one-million-line level it was overwhelmingly slower than the Unicage commands. Converting the data to Parquet then came up as an option, so I decided to give it a try.

Environment

Install pyspark

pipenv install pyspark

First, read the file as-is with pyspark

I prepared 890,000 lines of data actually used in production and display the first 5 lines:

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

# The file is in Unicage format, so read it with a space delimiter
df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
df.show(5)

This alone takes more than 30 seconds, which is slower than I expected. A more deliberate reading strategy might speed it up.
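One such strategy is to pass an explicit schema instead of leaving the column types to Spark. A minimal sketch, assuming string columns throughout; SCAN_CODE and BUNRUI2_CODE are the only column names known from this article, so the rest of the schema is hypothetical and would need to be filled in:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

# Hypothetical partial schema: add the remaining real columns as needed
schema = StructType([
    StructField('SCAN_CODE', StringType(), True),
    StructField('BUNRUI2_CODE', StringType(), True),
])

# With an explicit schema, Spark skips any type inference over the file
df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ', schema=schema)
df.show(5)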

Convert the Unicage-format file to Parquet

$ pyspark

(Omission)

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.1
      /_/

Using Python version 3.8.2 (default, Aug 25 2020 09:23:57)
SparkSession available as 'spark'.
>>> df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
>>> df.write.save('./SAMPLE_DATA.parquet')

Running pyspark launches an interactive Python environment with a SparkSession already available, so I read the file again there and this time saved it in Parquet format.
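For reference, the same conversion as a standalone script. write.save with no format argument falls back to the default data source, which is Parquet; format('parquet') just makes that explicit:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

# Read the space-delimited Unicage file and write it back out as Parquet
df = spark.read.csv('./SAMPLE_DATA', header=True, sep=' ')
df.write.format('parquet').save('./SAMPLE_DATA.parquet')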

 $ ls SAMPLE_DATA.parquet/
_SUCCESS
part-00000-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00001-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00002-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet
part-00003-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet

A folder is created, with the Parquet part files inside it.
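The number of part files matches the DataFrame's partition count. If a single output file is easier to handle, something like the following should work (a sketch with a made-up output path; note that coalesce(1) funnels the write through one task, so it trades away parallelism):

# Collapse to one partition before writing so only one part file is produced
df.coalesce(1).write.save('./SAMPLE_DATA.single.parquet')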

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()
df = spark.read.parquet('./SAMPLE_DATA.parquet')
df.show(5)

Reading the Parquet version still took more than 30 seconds, same as before. Even with a 10-line file the time is unchanged, so most of it apparently goes to starting pyspark itself.

$ python sample.py 
20/12/24 00:50:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

Looking at the messages when the script starts, the native Hadoop library could not be loaded, which may be part of why it is slow.

I looked into it, but it does not seem easy to fix, so I'll set it aside for now.
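To at least confirm where the time goes, a rough wall-clock measurement like the following (nothing Spark-specific, just time.time() around each step) should show that session startup dominates over the actual read:

import time
from pyspark.sql import SparkSession

t0 = time.time()
spark = SparkSession.builder.appName('SparkSample').getOrCreate()
print(f'SparkSession startup: {time.time() - t0:.1f}s')

t0 = time.time()
spark.read.parquet('./SAMPLE_DATA.parquet').show(5)
print(f'read + show: {time.time() - t0:.1f}s')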

Query and update

from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

df = spark.read.parquet('./SAMPLE_DATA.parquet')

# Register the DataFrame as a temp view and filter it with SQL
df.createOrReplaceTempView('sample_data')
spark.sql("select SCAN_CODE, BUNRUI2_CODE from sample_data where BUNRUI2_CODE = '29' limit 5").show()

Querying with SQL works. Since no schema was set when the Parquet file was created, every column is STRING type; be careful when dealing with numeric data.
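If a column is actually numeric, it can be cast after reading. A minimal sketch, assuming BUNRUI2_CODE holds integer codes (it may well not, so adjust the target type):

from pyspark.sql import functions as F

# Cast the string column to int; values that aren't numeric would become null
typed_df = df.withColumn('BUNRUI2_CODE', F.col('BUNRUI2_CODE').cast('int'))
typed_df.printSchema()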

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession.builder.appName('SparkSample').getOrCreate()

df = spark.read.parquet('./SAMPLE_DATA.parquet')

# Conditionally branch and update the value
new_df = df.withColumn('BUNRUI2_CODE', F.when(F.col('BUNRUI2_CODE') == '29','9999').otherwise(F.col('BUNRUI2_CODE')))

I tried updating via SQL, but Spark SQL does not appear to support an UPDATE statement here. Instead, withColumn replaces the column with the new values.
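The same conditional rewrite can also be expressed in SQL with CASE WHEN against the temp view, which may read more naturally if you are coming from an RDB. A sketch that assumes SCAN_CODE and BUNRUI2_CODE are the only columns you need to carry over:

df.createOrReplaceTempView('sample_data')
new_df = spark.sql("""
    SELECT SCAN_CODE,
           CASE WHEN BUNRUI2_CODE = '29' THEN '9999'
                ELSE BUNRUI2_CODE
           END AS BUNRUI2_CODE
    FROM sample_data
""")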

new_df.write.save('./SAMPLE_DATA.new.parquet')

Save the updated DataFrame.
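Note that by default Spark refuses to write to a path that already exists; overwriting in place needs an explicit save mode:

# SaveMode defaults to 'error' (fail if the path exists); 'overwrite' replaces it
new_df.write.mode('overwrite').save('./SAMPLE_DATA.new.parquet')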

Install parquet-tools

Install parquet-tools to view parquet files.

$ brew install parquet-tools

$ parquet-tools cat --json SAMPLE_DATA.parquet/part-00000-1a94dff0-2de3-4ee8-acf0-667643aff8b7-c000.snappy.parquet

It's a pity that it can't output CSV format ...
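As a workaround, pandas can read the whole Parquet folder and dump it to CSV for eyeballing. A sketch, assuming pandas and pyarrow are available in the environment:

import pandas as pd

# read_parquet accepts the output directory and merges the part files
pdf = pd.read_parquet('./SAMPLE_DATA.parquet')
pdf.to_csv('./SAMPLE_DATA.csv', index=False)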

In closing

This time the content was fairly basic; going forward, I would like to try combining it with services such as AWS Glue. For read-only reference data, storing it in S3 as Parquet looks preferable cost-wise to storing it in RDS or DynamoDB.

The just-announced AWS Glue Elastic Views also looks promising internally for the Unicage migration, and we would like to evaluate it as well.

