(Translation) Native connection from Python to Hadoop file system (HDFS)

INTRODUCTION: Wes McKinney, the author of pandas, wrote a very interesting blog post about Python data tools, so I asked whether I could translate it and publish it for the Japanese PyData community. I received permission, so I will translate it bit by bit and publish it here.

Translated from: Native Hadoop file system (HDFS) connectivity in Python

2017/1/3

Many Python libraries have been developed to date for interacting with HDFS, the Hadoop Distributed File System. Some go through the WebHDFS gateway, while others use the native Protocol Buffers-based RPC interface. In this post, I'll give an overview of the existing libraries and show what I've done to provide a high-performance HDFS interface as part of developing the Apache Arrow ecosystem.

This blog is a follow-up to a post on the 2017 roadmap.

Hadoop file system protocol

HDFS is part of Apache Hadoop, and its design was originally based on the Google File System described in the original MapReduce paper. HDFS uses Google's Protocol Buffers (sometimes called "protobufs" for short) as a native wire protocol for remote procedure calls, or RPCs.

Systems that interact with HDFS will typically implement Protobuf's messaging format and RPC protocol, similar to the main Java client. WebHDFS was developed to make it easier for low-load applications to read and write files, and it provides an HTTP or HTTPS gateway that allows PUT and GET requests to be used instead of protobufs RPC.
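
To make the gateway concrete, here is a minimal sketch of reading a file over WebHDFS with the requests library; the NameNode host, port, path, and user name are assumptions for illustration.

import requests

# Hypothetical NameNode address and user -- adjust for your cluster.
NAMENODE = 'http://namenode.example.com:50070'
PATH = '/tmp/test-data-file-1'

# OPEN is redirected to a DataNode, which streams the file contents over HTTP.
resp = requests.get(NAMENODE + '/webhdfs/v1' + PATH,
                    params={'op': 'OPEN', 'user.name': 'wesm'})
resp.raise_for_status()
data = resp.content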

For light-duty applications, WebHDFS and the native protobufs RPC interface have comparable data throughput, but native connectivity is generally considered more scalable and better suited to production use.

There are a couple of WebHDFS interfaces in Python that I have used.

Later in this article, we'll focus on the native RPC client interface.

Access from Python with native RPC

If you want to connect to HDFS natively from a language that interoperates well with C, such as Python, the "official" way in Apache Hadoop is to use libhdfs, a JNI-based C wrapper for the HDFS Java client. The main benefit of libhdfs is that it is distributed and supported by the major Hadoop vendors and is part of the Apache Hadoop project. The downside is that it uses JNI (a JVM is launched inside the Python process) and requires a complete Hadoop Java distribution on the client side. This is an unacceptable condition for some clients, and, unlike the other clients, it requires production-level support. For example, Apache Impala (incubating), a C++ application, uses libhdfs to access data in HDFS.

Due to the heavyweight nature of libhdfs, alternative native interfaces to HDFS have been developed.

- libhdfs3: a pure C++ library that is now part of Apache HAWQ (incubating). libhdfs3 was developed by Pivotal Labs for use in HAWQ, a SQL-on-Hadoop system. A nice property of libhdfs3 is that it is highly compatible with libhdfs at the C API level. At one point it looked as though libhdfs3 might officially become part of Apache Hadoop, but that now seems unlikely (see HDFS-8707, where a new C++ library is under development).

- snakebite: a pure Python implementation of Hadoop's protobuf RPC interface, developed by Spotify.

Snakebite doesn't provide a comprehensive client API (for example, you can't write files) and it doesn't perform well (it is implemented in pure Python), so from here on we'll focus on libhdfs and libhdfs3.
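
For completeness, here is a minimal read-only sketch of snakebite; the NameNode RPC host, port, and path are assumptions, and snakebite targets Python 2.

from snakebite.client import Client

# Hypothetical NameNode RPC address -- adjust for your cluster.
client = Client('localhost', 8020, use_trash=False)

# List a directory; snakebite methods return generators.
for entry in client.ls(['/tmp']):
    print(entry['path'])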

Python interfaces to libhdfs and libhdfs3

There have been many attempts to build a C-level interface to the JNI-based libhdfs, among them cyhdfs (using Cython), libpyhdfs (a plain Python C extension), and pyhdfs (using SWIG). One of the challenges in building a C extension against libhdfs is that the shared library libhdfs.so is included in and distributed with the Hadoop distribution, so $LD_LIBRARY_PATH must be set appropriately for that shared library to be loaded. In addition, the JVM's libjvm.so must also be loadable at import time. Taken together, these requirements land you in "configuration hell".

When I was thinking about building a C++ HDFS interface for use with Apache Arrow (and from Python via PyArrow), I found a libhdfs implementation in Turi's SFrame project that takes a clever approach to locating both the JVM and libhdfs at runtime. I adopted this approach for Arrow, and it has worked well. With this implementation, Arrow's data serialization tools (such as Apache Parquet) have very low I/O overhead, and a convenient Python file interface is provided as well.
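
As a rough sketch of what that runtime loading relies on (the paths below are assumptions, and the exact variables consulted can vary between pyarrow releases), the JVM and libhdfs are typically located through a few environment variables set before the client is created:

import os
import subprocess

# Assumed installation locations -- adjust for your machine.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['HADOOP_HOME'] = '/opt/hadoop'
# Directory containing libhdfs.so (pyarrow also consults ARROW_LIBHDFS_DIR).
os.environ['ARROW_LIBHDFS_DIR'] = '/opt/hadoop/lib/native'
# libhdfs needs the Hadoop jars on the JVM classpath.
os.environ['CLASSPATH'] = subprocess.check_output(
    ['hadoop', 'classpath', '--glob']).decode().strip()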

The C APIs of the libhdfs and libhdfs3 driver libraries are almost identical, so the driver can be switched with a Python keyword argument:

from pyarrow import HdfsClient

# Use libhdfs
hdfs = HdfsClient(host, port, username, driver='libhdfs')

# Use libhdfs3
hdfs_alt = HdfsClient(host, port, username, driver='libhdfs3')

with hdfs.open('/path/to/file') as f:
    ...
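
The returned file object behaves like a Python file, and the client exposes the usual filesystem helpers. A small sketch (the paths are assumptions):

# List a directory and read a file's contents into memory.
print(hdfs.ls('/tmp'))

with hdfs.open('/tmp/test-data-file-1') as f:
    contents = f.read()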

In parallel with this, the developers of the Dask project created hdfs3, a pure Python interface to libhdfs3 that uses ctypes to avoid C extensions. hdfs3 provides a Python file interface as well as access to the rest of libhdfs3's functionality.

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
    ...
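
A small sketch of a couple of those helpers (the paths are assumptions):

# List a directory, read a whole file as bytes, and copy a local file into HDFS.
print(hdfs.ls('/tmp'))
data = hdfs.cat('/tmp/test-data-file-1')
hdfs.put('local-file.txt', '/tmp/remote-file.txt')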

Data access performance of pyarrow.HdfsClient and hdfs3

Against a local CDH 5.6.0 HDFS cluster, I computed the ensemble mean of read performance for reads ranging from 4 KB to 100 MB under three different configurations:

- hdfs3 (which always uses libhdfs3)
- pyarrow.HdfsClient with driver='libhdfs'
- pyarrow.HdfsClient with driver='libhdfs3'

You can get all of these packages by doing the following:

conda install pyarrow hdfs3 libhdfs3 -c conda-forge

Note: the pyarrow conda-forge package is currently only available on Linux. This should, in theory, be resolved by January 20, 2017. Please get in touch if you can help with Windows support.

The performance numbers are in megabytes per second ("throughput"). The benchmark code is at the end of this post. I'm curious to see how these results look across a wider variety of production environments and Hadoop configurations.

[Figure: HDFS RPC data throughput, linear scale (libhdfs_perf_linear.png)]

At least in the tests I did, I got the following interesting results:

- libhdfs showed the highest throughput in this test, even though it is Java and JNI based.
- libhdfs3 did not perform well for small reads. This may be due to RPC latency or a configuration issue I'm not aware of.
- Comparing only the libhdfs3-based setups, pyarrow comes out about 10-15% higher than hdfs3. This seems to be mainly attributable to differences in memory handling / copying between ctypes (hdfs3) and C++ (pyarrow).

The following shows the timings on a logarithmic axis.

[Figure: HDFS RPC read performance, log-scale time axis (libhdfs_perf_log.png)]

Native C++ I/O in Apache Arrow

One of the reasons to build I/O interfaces like the HDFS one inside the pyarrow library is that they all share a common memory-management layer and can pass data along with very low (potentially zero) copy overhead. By contrast, a library that exposes only the Python file interface incurs some overhead, because memory is handled as bytes objects in the Python interpreter.
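
A rough sketch of that difference, assuming a pyarrow build that exposes NativeFile.read_buffer (the connection parameters and path are hypothetical):

import pyarrow as pa

hdfs = pa.HdfsClient('localhost', 20500, 'wesm')

with hdfs.open('/tmp/test-data-file-1') as f:
    buf = f.read_buffer()          # an Arrow Buffer; the memory stays in Arrow's management layer
    reader = pa.BufferReader(buf)  # other Arrow readers can consume the buffer without copying
    py_bytes = buf.to_pybytes()    # only this step copies the data into a Python bytes object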

The details of Arrow's C++ I/O system are beyond the scope of this article, but I'll write about it on this blog in the future.

Benchmark code

import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

DATA_SIZE = 200 * (1 << 20)
data = 'a' * DATA_SIZE

hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')

path = '/tmp/test-data-file-1'
# Remove any stale copy left over from a previous run.
if hdfs.exists(path):
    hdfs.delete(path)
with hdfs.open(path, 'wb') as f:
    f.write(data)

def read_chunk(f, size):
    # do a random seek
    f.seek(random.randint(0, size))
    return f.read(size)

def ensemble_average(runner, niter=10):
    start = time.clock()
    gc.disable()
    data_chunks = []
    for i in range(niter):
        data_chunks.append(runner())
    elapsed = (time.clock() - start) / niter
    gc.enable()
    return elapsed

def make_test_func(fh, chunksize):
    def runner():
        return read_chunk(fh, chunksize)
    return runner

KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]

handles = {
    ('pyarrow', 'libhdfs'): hdfs.open(path),
    ('pyarrow', 'libhdfs3'): hdfscpp.open(path),
    ('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}

timings = []
for (library, driver), handle in handles.items():
    for chunksize, niter in zip(chunksizes, iterations):
        tester = make_test_func(handle, chunksize)
        timing = ensemble_average(tester, niter=niter)
        throughput = chunksize / timing

        result = (library, driver, chunksize, timing, throughput)
        print(result)
        timings.append(result)

results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']
	
plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)

plt.savefig('results2.png')
