(Übersetzung) Native Verbindung von Python zum Hadoop-Dateisystem (HDFS)

EINLEITUNG: Wes McKinney, Autor von Pandas, schrieb einen sehr interessanten Blog über Python-Datentools. Ich fragte, ob ich ihn übersetzen und in der japanischen PyData-Community veröffentlichen könnte. Ich habe das erhalten, also werde ich es nach und nach übersetzen und veröffentlichen.

Übersetzt von: Native Hadoop-Dateisystem (HDFS) -Konnektivität in Python

2017/1/3

Bisher wurden viele Python-Bibliotheken für die Interaktion mit HDFS entwickelt, das auch als Hadoop-Dateisystem bekannt ist. Einige erfolgen über das HDFS-Web-HDFS-Gateway, während andere native Protokollpuffer-basierte RPC-Schnittstellen sind. In diesem Beitrag gebe ich Ihnen einen Überblick über vorhandene Bibliotheken und zeige Ihnen, was ich getan habe, um eine leistungsstarke HDFS-Schnittstelle für die Entwicklung des Ökosystems von Arrow bereitzustellen.

Dieser Blog ist eine Fortsetzung eines Beitrags über die Roadmap 2017.

Hadoop-Dateisystemprotokoll

HDFS ist Teil von Apache Hadoop und basiert ursprünglich auf dem Google-Dateisystem, das im ursprünglichen MapReduce-Dokument beschrieben ist. HDFS verwendet Googles Protokollpuffer (manchmal kurz "Protobufs" genannt) als natives Drahtprotokoll für Remoteprozeduraufrufe oder RPCs.

Systeme, die mit HDFS interagieren, implementieren normalerweise das Messaging-Format und das RPC-Protokoll von Protobuf, ähnlich dem Haupt-Java-Client. WebHDFS wurde entwickelt, um Anwendungen mit geringer Auslastung das Lesen und Schreiben von Dateien zu erleichtern, und bietet ein HTTP- oder HTTPS-Gateway, mit dem PUT- und GET-Anforderungen anstelle von Protobufs RPC verwendet werden können.

Für leichte Anwendungen haben WebHDFS und native Protobufs RPC einen vergleichbaren Datendurchsatz, aber native Konnektivität wird im Allgemeinen als hoch skalierbar und für den produktiven Einsatz geeignet angesehen.

Python hat zwei WebHDFS-Schnittstellen, die ich verwendet habe:

Später in diesem Artikel konzentrieren wir uns auf die native RPC-Client-Oberfläche.

Zugriff von Python mit nativem RPC

Wenn Sie eine native Verbindung zu HDFS aus einer Sprache herstellen möchten, die mit C gut funktioniert, wie Python, besteht die "offizielle" Methode in Apache Hadoop darin, libhdfs zu verwenden. libhdfs ist ein JNI-basierter C-Wrapper für HDFS-Java-Clients. Der Hauptvorteil von libhdfs besteht darin, dass es von großen Hadoop-Anbietern vertrieben und unterstützt wird und Teil des Apache Hadoop-Projekts ist. Der Nachteil ist, dass Sie JNI verwenden (die JVM wird innerhalb eines Python-Prozesses gestartet) und eine vollständige Hadoop-Java-Distribution auf der Clientseite benötigen. Dies ist für einige Kunden eine inakzeptable Bedingung und erfordert im Gegensatz zu anderen Kunden Unterstützung auf Produktionsebene. Beispielsweise verwendet die C ++ - Anwendung Apache Impala (Incubation Project) libhdfs, um auf Daten in HDFS zuzugreifen.

Aufgrund des hohen Gewichts von libhdfs wurden von Natur aus alternative native Schnittstellen zu HDFS entwickelt.

--libhdfs3 ist eine reine C ++ - Bibliothek, die jetzt Teil von Apache HAWQ (Incubation Project) ist. libhdfs3 wurde von Pivotal Labs für die Verwendung in HAWQ auf SQL-on-Hadoop-Systemen entwickelt. Das Schöne an libhdfs3 ist, dass es auf der C-API-Ebene sehr gut mit libhdfs kompatibel ist. Zu einem bestimmten Zeitpunkt war libhdfs3 offiziell wahrscheinlich Teil von Apache Hadoop, aber jetzt ist dies unwahrscheinlich (siehe HDFS-8707, da eine neue C ++ - Bibliothek in Entwicklung ist).

--snakebite: Eine reine Python-Implementierung der von Spotify entwickelten Protobuf-RPC-Schnittstelle von Hadoop.

Da snakebite keine umfassende Client-API bietet (Sie können beispielsweise keine Dateien schreiben) und keine gute Leistung erbringt (nur in Python implementiert), konzentrieren wir uns von nun an auf libhdfs und libhdfs3. Ich werde es weiterhin tun.

Python instar Gesicht zu libhdfs und libhdfs3

Es gab viele Versuche, eine C-Level-Schnittstelle zu libhdfs für die JNI-Bibliothek zu erstellen. Darunter befinden sich cyhdfs (mit Cython), libpyhdfs (normale Python C-Erweiterung) und pyhdfs (mit SWIG). Eine der Herausforderungen beim Erstellen einer C-Erweiterung für libhdfs besteht darin, dass die gemeinsam genutzte Bibliothek libhdfs.so in der Hdoop-Distribution enthalten und verteilt ist. Daher ist $ LD_LIBRARY_PATH zum Laden dieser gemeinsam genutzten Bibliothek geeignet. Ist einzustellen auf. Darüber hinaus muss die libjvm.so der JVM auch während des Imports geladen werden können. Wenn diese Bedingungen kombiniert werden, fallen Sie in die "Einstellungshölle".

Als ich daran dachte, eine C ++ HDFS-Schnittstelle für Apache Arrow (und auch Python über PyArrow) zu erstellen, fand ich eine Implementierung von libhdfs in Turis SFrame-Projekt. Es sollte beides in einem vernünftigen Ansatz finden, wenn JVM und libhdfs zur Laufzeit geladen werden. Ich habe diesen Ansatz mit Arrow gewählt und es hat funktioniert. Durch die Verwendung dieser Implementierung haben die Datenserialisierungstools von Arrow (wie Apache Parquet) einen sehr geringen E / A-Overhead und bieten außerdem eine praktische Python-Dateischnittstelle.

Die C-APIs in den Treiberbibliotheken libhdfs und libhdfs3 sind ziemlich identisch, sodass ich die Treiber gemäß den Schlüsselwortargumenten von Python wechseln konnte.

from pyarrow import HdfsClient

#Verwenden Sie libhdfs
hdfs = HdfsClient(host, port, username, 	driver='libhdfs')

#Verwenden Sie libhdfs3
hdfs_alt = HdfsClient(host, port, username, 	driver='libhdfs3')

with hdfs.open('/path/to/file') as f:
    ...

Parallel dazu haben die Entwickler des Dask-Projekts hdfs3 erstellt, eine reine Python-Schnittstelle zu libhdfs3. Es wurden ctypes verwendet, um die C-Erweiterung zu vermeiden. hdfs3 bietet Zugriff auf andere Funktionen von libhdfs3 sowie eine Python-Dateischnittstelle.

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
    ...

Datenzugriffsleistung von pyarrow.HdfsClient und hdfs3

Für einen lokalen CDH 5.6.0 HDFS-Cluster habe ich den kollektiven Durchschnitt der Leseleistung für Dateien mit einer Größe von 4 KB bis 100 MB mit drei verschiedenen Einstellungen berechnet.

--hdfs3 (immer libhdfs3 verwenden) --pyarrow.HdfsClient mit driver = 'libhdfs' --pyarrow.HdfsClient mit driver = 'libhdfs3'

Sie können alle diese Pakete wie folgt erhalten:

conda install pyarrow hdfs3 libhdfs3 -c conda-forge

Hinweis: Das pyarrow conda-forge-Paket ist derzeit nur unter Linux verfügbar. Theoretisch sollte dieses Problem am 20. Januar 2017 behoben sein. Bitte lassen Sie uns wissen, ob jemand bei der Windows-Unterstützung helfen kann.

Die Leistungszahl beträgt Megabyte / Sekunde ("Durchsatz"). Der Benchmark-Code befindet sich am Ende dieses Beitrags. Ich bin gespannt, wie dieses Ergebnis in einer größeren Vielfalt von Produktionsumgebungen und Hadoop-Einstellungen aussieht.

HDFS RPC data perflibhdfs_perf_linear.png

Zumindest in meinen Tests habe ich folgende interessante Ergebnisse erzielt:

--Libhdfs zeigte in diesem Test den höchsten Durchsatz, obwohl er auf Java und JNI basiert. --libhdfs3 lief bei kleinen Lesevorgängen nicht gut. Dies kann an der RPC-Latenz oder einem Problem liegen, das mir in den Einstellungen nicht bekannt ist.

Das Folgende ist die logarithmische Zeitachse.

HDFS RPC data perflibhdfs_perf_log.png

Native C ++ - E / A in Apache Arrow

Einer der Gründe für die Erstellung von HDFS-ähnlichen E / A-Schnittstellen innerhalb der Pyarrow-Bibliothek besteht darin, dass alle eine gemeinsame Ebene der Speicherverwaltung verwenden und einen sehr geringen (möglicherweise null) Kopieraufwand haben. , Weil Sie die Daten übergeben können. Andererseits verursacht eine Bibliothek, die nur die Python-Dateischnittstelle verfügbar macht, einen gewissen Overhead, da der Speicher vom Byte-String-Objekt im Python-Interpreter verarbeitet wird.

Die Details des C ++ - E / A-Systems von Arrow gehen über den Rahmen dieses Artikels hinaus, aber ich werde in Zukunft in diesem Blog darüber berichten.

Bankcode

import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

DATA_SIZE = 200 * (1 << 20)
data = 'a' * DATA_SIZE

hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')

hdfs.delete(path)
path = '/tmp/test-data-file-1'
with hdfs.open(path, 'wb') as f:
    f.write(data)

def read_chunk(f, size):
    # do a random seek
    f.seek(random.randint(0, size))
    return f.read(size)

def ensemble_average(runner, niter=10):
    start = time.clock()
    gc.disable()
    data_chunks = []
    for i in range(niter):
        data_chunks.append(runner())
    elapsed = (time.clock() - start) / niter
    gc.enable()
    return elapsed

def make_test_func(fh, chunksize):
    def runner():
        return read_chunk(fh, chunksize)
    return runner

KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]

handles = {
    ('pyarrow', 'libhdfs'): hdfs.open(path),
    ('pyarrow', 'libhdfs3'): hdfscpp.open(path),
    ('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}

timings = []
for (library, driver), handle in handles.items():
    for chunksize, niter in zip(chunksizes, iterations):
        tester = make_test_func(handle, chunksize)
        timing = ensemble_average(tester, niter=niter)
        throughput = chunksize / timing

        result = (library, driver, chunksize, timing, throughput)
        print(result)
        timings.append(result)

results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']
	
plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)

plt.savefig('results2.png')

Recommended Posts

(Übersetzung) Native Verbindung von Python zum Hadoop-Dateisystem (HDFS)
Excel-Datei aus Python importieren (in DB registriert)
Von der Datei zur Diagrammzeichnung in Python. Grundstufe Grundstufe
Änderungen von Python 3.0 zu Python 3.5
Änderungen von Python 2 zu Python 3.0
[Python] Ändern Sie die Standardeingabe von der Tastatur in eine Textdatei
[Python] Konvertierung von WGS84 in ein ebenes orthogonales Koordinatensystem
Verfahren zum Konvertieren einer Python-Datei in eine Exe aus der Ubunts-Umgebungskonstruktion
Python-Skript, das eine JSON-Datei aus einer CSV-Datei erstellt
Post von Python nach Slack
Flirte von PHP nach Python
[Für Anfänger] Skript innerhalb von 10 Zeilen (4. Verbindung von Python zu sqlite3)
Wechseln Sie von Python2.7 zu Python3.6 (centos7)
Stellen Sie von Python aus eine Verbindung zu SQLite her
Python OCR System Erhöhen Sie Zeichen aus Bildern, um die Arbeitseffizienz zu verbessern
Führen Sie das Python-Skript aus der Batchdatei aus
[Python] Mit Python in eine CSV-Datei schreiben
Ausgabe in eine CSV-Datei mit Python
[Lambda] [Python] Von Lambda auf Twitter posten!
Stellen Sie von Python aus eine Verbindung zur utf8mb4-Datenbank her
Python (vom ersten Mal bis zur Ausführung)
Poste ein Bild von Python auf Tumblr
So greifen Sie über Python auf Wikipedia zu
Python, um von einer anderen Sprache zu wechseln
Datei-Upload in Azure Storage (Python)
[Einführung in Python3 Tag 21] Kapitel 10 System (10.1 bis 10.5)
Hat sich nicht von Python 2 auf 3 geändert
Aktualisieren Sie Mac Python von 2 auf 3
[Python] So konvertieren Sie eine Datenbankdatei in CSV
[Python] Fluidsimulation: Von linear zu nichtlinear
Von Python bis zur Verwendung von MeCab (und CaboCha)
[Übersetzung] Vignette retikulieren: R zu Python-Schnittstelle
Skript zum Generieren eines Verzeichnisses aus einer JSON-Datei
So aktualisieren Sie Google Sheets von Python
[Python] Konvertieren Sie CSV-Dateibegrenzer in Tabulatortrennzeichen
Konvertieren Sie die psd-Datei in Python in png
Ich möchte ein Glas aus Python verwenden
Herstellen einer Verbindung von Python zu MySQL unter CentOS 6.4
Zugriff auf RDS von Lambda (Python)
Erstellen Sie eine Deb-Datei aus einem Python-Paket
Python> Ausgaben von 1 bis 100, 501 bis 600> Für CSV
In Python von Markdown in HTML konvertieren
API-Erklärung zum Berühren von Mastodon aus Python
Stellen Sie von Python aus eine Verbindung zur Websocket-API von coincheck her