EINLEITUNG: Wes McKinney, Autor von Pandas, schrieb einen sehr interessanten Blog über Python-Datentools. Ich fragte, ob ich ihn übersetzen und in der japanischen PyData-Community veröffentlichen könnte. Ich habe das erhalten, also werde ich es nach und nach übersetzen und veröffentlichen.
Übersetzt von: Native Hadoop-Dateisystem (HDFS) -Konnektivität in Python
2017/1/3
Bisher wurden viele Python-Bibliotheken für die Interaktion mit HDFS entwickelt, das auch als Hadoop-Dateisystem bekannt ist. Einige erfolgen über das HDFS-Web-HDFS-Gateway, während andere native Protokollpuffer-basierte RPC-Schnittstellen sind. In diesem Beitrag gebe ich Ihnen einen Überblick über vorhandene Bibliotheken und zeige Ihnen, was ich getan habe, um eine leistungsstarke HDFS-Schnittstelle für die Entwicklung des Ökosystems von Arrow bereitzustellen.
Dieser Blog ist eine Fortsetzung eines Beitrags über die Roadmap 2017.
HDFS ist Teil von Apache Hadoop und basiert ursprünglich auf dem Google-Dateisystem, das im ursprünglichen MapReduce-Dokument beschrieben ist. HDFS verwendet Googles Protokollpuffer (manchmal kurz "Protobufs" genannt) als natives Drahtprotokoll für Remoteprozeduraufrufe oder RPCs.
Systeme, die mit HDFS interagieren, implementieren normalerweise das Messaging-Format und das RPC-Protokoll von Protobuf, ähnlich dem Haupt-Java-Client. WebHDFS wurde entwickelt, um Anwendungen mit geringer Auslastung das Lesen und Schreiben von Dateien zu erleichtern, und bietet ein HTTP- oder HTTPS-Gateway, mit dem PUT- und GET-Anforderungen anstelle von Protobufs RPC verwendet werden können.
Für leichte Anwendungen haben WebHDFS und native Protobufs RPC einen vergleichbaren Datendurchsatz, aber native Konnektivität wird im Allgemeinen als hoch skalierbar und für den produktiven Einsatz geeignet angesehen.
Python hat zwei WebHDFS-Schnittstellen, die ich verwendet habe:
Später in diesem Artikel konzentrieren wir uns auf die native RPC-Client-Oberfläche.
Wenn Sie eine native Verbindung zu HDFS aus einer Sprache herstellen möchten, die mit C gut funktioniert, wie Python, besteht die "offizielle" Methode in Apache Hadoop darin, libhdfs zu verwenden. libhdfs ist ein JNI-basierter C-Wrapper für HDFS-Java-Clients. Der Hauptvorteil von libhdfs besteht darin, dass es von großen Hadoop-Anbietern vertrieben und unterstützt wird und Teil des Apache Hadoop-Projekts ist. Der Nachteil ist, dass Sie JNI verwenden (die JVM wird innerhalb eines Python-Prozesses gestartet) und eine vollständige Hadoop-Java-Distribution auf der Clientseite benötigen. Dies ist für einige Kunden eine inakzeptable Bedingung und erfordert im Gegensatz zu anderen Kunden Unterstützung auf Produktionsebene. Beispielsweise verwendet die C ++ - Anwendung Apache Impala (Incubation Project) libhdfs, um auf Daten in HDFS zuzugreifen.
Aufgrund des hohen Gewichts von libhdfs wurden von Natur aus alternative native Schnittstellen zu HDFS entwickelt.
--libhdfs3 ist eine reine C ++ - Bibliothek, die jetzt Teil von Apache HAWQ (Incubation Project) ist. libhdfs3 wurde von Pivotal Labs für die Verwendung in HAWQ auf SQL-on-Hadoop-Systemen entwickelt. Das Schöne an libhdfs3 ist, dass es auf der C-API-Ebene sehr gut mit libhdfs kompatibel ist. Zu einem bestimmten Zeitpunkt war libhdfs3 offiziell wahrscheinlich Teil von Apache Hadoop, aber jetzt ist dies unwahrscheinlich (siehe HDFS-8707, da eine neue C ++ - Bibliothek in Entwicklung ist).
--snakebite: Eine reine Python-Implementierung der von Spotify entwickelten Protobuf-RPC-Schnittstelle von Hadoop.
Da snakebite keine umfassende Client-API bietet (Sie können beispielsweise keine Dateien schreiben) und keine gute Leistung erbringt (nur in Python implementiert), konzentrieren wir uns von nun an auf libhdfs und libhdfs3. Ich werde es weiterhin tun.
Es gab viele Versuche, eine C-Level-Schnittstelle zu libhdfs für die JNI-Bibliothek zu erstellen. Darunter befinden sich cyhdfs (mit Cython), libpyhdfs (normale Python C-Erweiterung) und pyhdfs (mit SWIG). Eine der Herausforderungen beim Erstellen einer C-Erweiterung für libhdfs besteht darin, dass die gemeinsam genutzte Bibliothek libhdfs.so in der Hdoop-Distribution enthalten und verteilt ist. Daher ist $ LD_LIBRARY_PATH zum Laden dieser gemeinsam genutzten Bibliothek geeignet. Ist einzustellen auf. Darüber hinaus muss die libjvm.so der JVM auch während des Imports geladen werden können. Wenn diese Bedingungen kombiniert werden, fallen Sie in die "Einstellungshölle".
Als ich daran dachte, eine C ++ HDFS-Schnittstelle für Apache Arrow (und auch Python über PyArrow) zu erstellen, fand ich eine Implementierung von libhdfs in Turis SFrame-Projekt. Es sollte beides in einem vernünftigen Ansatz finden, wenn JVM und libhdfs zur Laufzeit geladen werden. Ich habe diesen Ansatz mit Arrow gewählt und es hat funktioniert. Durch die Verwendung dieser Implementierung haben die Datenserialisierungstools von Arrow (wie Apache Parquet) einen sehr geringen E / A-Overhead und bieten außerdem eine praktische Python-Dateischnittstelle.
Die C-APIs in den Treiberbibliotheken libhdfs und libhdfs3 sind ziemlich identisch, sodass ich die Treiber gemäß den Schlüsselwortargumenten von Python wechseln konnte.
from pyarrow import HdfsClient
#Verwenden Sie libhdfs
hdfs = HdfsClient(host, port, username, driver='libhdfs')
#Verwenden Sie libhdfs3
hdfs_alt = HdfsClient(host, port, username, driver='libhdfs3')
with hdfs.open('/path/to/file') as f:
...
Parallel dazu haben die Entwickler des Dask-Projekts hdfs3 erstellt, eine reine Python-Schnittstelle zu libhdfs3. Es wurden ctypes verwendet, um die C-Erweiterung zu vermeiden. hdfs3 bietet Zugriff auf andere Funktionen von libhdfs3 sowie eine Python-Dateischnittstelle.
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host, port, user)
with hdfs.open('/path/to/file', 'rb') as f:
...
Für einen lokalen CDH 5.6.0 HDFS-Cluster habe ich den kollektiven Durchschnitt der Leseleistung für Dateien mit einer Größe von 4 KB bis 100 MB mit drei verschiedenen Einstellungen berechnet.
--hdfs3 (immer libhdfs3 verwenden) --pyarrow.HdfsClient mit driver = 'libhdfs' --pyarrow.HdfsClient mit driver = 'libhdfs3'
Sie können alle diese Pakete wie folgt erhalten:
conda install pyarrow hdfs3 libhdfs3 -c conda-forge
Hinweis: Das pyarrow conda-forge-Paket ist derzeit nur unter Linux verfügbar. Theoretisch sollte dieses Problem am 20. Januar 2017 behoben sein. Bitte lassen Sie uns wissen, ob jemand bei der Windows-Unterstützung helfen kann.
Die Leistungszahl beträgt Megabyte / Sekunde ("Durchsatz"). Der Benchmark-Code befindet sich am Ende dieses Beitrags. Ich bin gespannt, wie dieses Ergebnis in einer größeren Vielfalt von Produktionsumgebungen und Hadoop-Einstellungen aussieht.
HDFS RPC data perf
Zumindest in meinen Tests habe ich folgende interessante Ergebnisse erzielt:
--Libhdfs zeigte in diesem Test den höchsten Durchsatz, obwohl er auf Java und JNI basiert. --libhdfs3 lief bei kleinen Lesevorgängen nicht gut. Dies kann an der RPC-Latenz oder einem Problem liegen, das mir in den Einstellungen nicht bekannt ist.
Das Folgende ist die logarithmische Zeitachse.
HDFS RPC data perf
Einer der Gründe für die Erstellung von HDFS-ähnlichen E / A-Schnittstellen innerhalb der Pyarrow-Bibliothek besteht darin, dass alle eine gemeinsame Ebene der Speicherverwaltung verwenden und einen sehr geringen (möglicherweise null) Kopieraufwand haben. , Weil Sie die Daten übergeben können. Andererseits verursacht eine Bibliothek, die nur die Python-Dateischnittstelle verfügbar macht, einen gewissen Overhead, da der Speicher vom Byte-String-Objekt im Python-Interpreter verarbeitet wird.
Die Details des C ++ - E / A-Systems von Arrow gehen über den Rahmen dieses Artikels hinaus, aber ich werde in Zukunft in diesem Blog darüber berichten.
Bankcode
import gc
import random
import time
import pyarrow as pa
import hdfs3
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
DATA_SIZE = 200 * (1 << 20)
data = 'a' * DATA_SIZE
hdfs = pa.HdfsClient('localhost', 20500, 'wesm')
hdfscpp = pa.HdfsClient('localhost', 20500, 'wesm', driver='libhdfs3')
hdfs3_fs = hdfs3.HDFileSystem('localhost', port=20500, user='wesm')
hdfs.delete(path)
path = '/tmp/test-data-file-1'
with hdfs.open(path, 'wb') as f:
f.write(data)
def read_chunk(f, size):
# do a random seek
f.seek(random.randint(0, size))
return f.read(size)
def ensemble_average(runner, niter=10):
start = time.clock()
gc.disable()
data_chunks = []
for i in range(niter):
data_chunks.append(runner())
elapsed = (time.clock() - start) / niter
gc.enable()
return elapsed
def make_test_func(fh, chunksize):
def runner():
return read_chunk(fh, chunksize)
return runner
KB = 1024
MB = 1024 * KB
chunksizes = [4 * KB, MB, 10 * MB, 100 * MB]
iterations = [100, 100, 100, 10]
handles = {
('pyarrow', 'libhdfs'): hdfs.open(path),
('pyarrow', 'libhdfs3'): hdfscpp.open(path),
('hdfs3', 'libhdfs3'): hdfs3_fs.open(path, 'rb')
}
timings = []
for (library, driver), handle in handles.items():
for chunksize, niter in zip(chunksizes, iterations):
tester = make_test_func(handle, chunksize)
timing = ensemble_average(tester, niter=niter)
throughput = chunksize / timing
result = (library, driver, chunksize, timing, throughput)
print(result)
timings.append(result)
results = pd.DataFrame.from_records(timings, columns=['library', 'driver', 'read_size', 'timing', 'throughput'])
results['MB/s'] = results['throughput'] / MB
results
results['type'] = results['library'] + '+' + results['driver']
plt.figure(figsize=(12, 6))
g = sns.factorplot(y='read_size', x='MB/s', hue='type', data=results, kind='bar', orient='h', size=(10))
g.despine(left=True)
#g.fig.get_axes()[0].set_xscale('log', basex=2)
g.fig.set_size_inches(12, 4)
plt.savefig('results2.png')
Recommended Posts