Python and Treasure Data
Unsere Vision bei Treasure Data ist es, Analysten und Entscheidungsträgern die Analyse von Daten zu erleichtern. Unser Service berührt Rohdaten mit SQL. Wenn Sie jedoch kompliziertere Analysen durchführen oder vorhandene Skripte und Tools verwenden möchten, sollten Sie sich auf jeden Fall an R und Python wenden.
In Bezug auf R kann es über den JDBC-Treiber verbunden werden und wird tatsächlich von Kunden verwendet, aber ich habe mich neulich gefragt, wie ich es neulich von Python aus machen soll, und als ich ein wenig nachgeforscht habe, war es schockierend. Die Fakten wurden entdeckt.
~~ Treasure-Daten haben keinen Python-Client! ~~ (UPDATE: ** Hergestellt von unserem Ass Pythonista **)
Ich habe nicht einmal versucht, einen Dritten zu finden. Dies kehrt den Datenwissenschaftlern den Rücken, die Python / Pandas als 18. haben.
Hier wurde jedoch eine weitere schockierende Tatsache entdeckt.
** Ich fand heraus, dass ich das Ergebnis einer Abfrage erhalten konnte, die Daten als Pandas-Datenrahmen wertschätzte, indem ich nur 60 Zeilen Python-Code schrieb. ** ** **
Das folgende Skript wurde in 4 Stunden geschrieben, einschließlich API-Verhaltensprüfung, Python-Überprüfung und Pandas-Lernen. Dies ist auch der REST-API für Treasure Data zu verdanken ... nicht der Anforderungsbibliothek von Python.
td-client-pandas in 60 lines
import requests
import msgpack
from time import sleep
import json
import pandas
class TreasureData:
def __init__(self, apikey):
self._apikey = apikey
self._endpoint = 'https://api.treasuredata.com'
self._base_headers = {"Authorization": "TD1 %s"%self._apikey}
def query(self, database, query, **opts):
job_id = self._query(database, query, **opts)
while True:
job = self._get_job(job_id)
if job["status"] == 'success':
break
sleep(5)
return {"cursor": self.fetch_result(job_id),
"schema": json.loads(job['hive_result_schema'])}
def _get_job(self, job_id):
request_url = "%s/v3/job/show/%d"%(self._endpoint, job_id)
response = requests.get(request_url, headers=self._base_headers)
return response.json()
def fetch_result(self, job_id):
request_url = "%s/v3/job/result/%d"%(self._endpoint, job_id)
response = requests.get(request_url,
params={"format":"msgpack"},
headers=self._base_headers)
unpacker = msgpack.Unpacker()
for chunk in response.iter_content(8192):
unpacker.feed(chunk)
for unpacked in unpacker:
yield unpacked
def _query(self, database, query, **opts):
if opts.has_key("engine") and opts["engine"] == "tqa":
engine = "presto"
else:
engine = "hive"
request_url = "%s/v3/job/issue/%s/%s"%(self._endpoint, engine, database)
response = requests.post(request_url,
params={"query":query},
headers=self._base_headers)
job_id = int(response.json()["job_id"])
return job_id
class TreasureDataConnector:
def __init__(self, td_apikey):
self._td_client = TreasureData(td_apikey)
self._last_job_id = None
def query(self, database, query, **opts):
result = self._td_client.query(database, query, **opts)
columns, _ = zip(*result['schema'])
data = dict(zip(columns, [[] for _ in columns]))
for row in result["cursor"]:
for k, v in zip(columns, row):
data[k].append(v)
return pandas.DataFrame(data)
Eigentlich sieht es so aus.
In [4]: conn = TreasureDataConnector("a05a6256ec6f32949d55271276777f502b53f7a2")
In [5]: df = conn.query('demo', 'select count(1) as foo, host from syslog group by host')
In [6]: df
Out[6]:
foo host
0 1 pulchritude
1 782 pulchritude.local
Yattane!
Result.next()
Um ehrlich zu sein, bin ich eine R- und Excel-Person, daher bin ich mit Pandas nicht sehr vertraut (oder besser gesagt, ich habe Python zum ersten Mal seit 4 Jahren geschrieben). Ich war jedoch in der Lage, dies schnell zu tun, daher möchte ich, dass Pandas-Datenwissenschaftler Treasure Data ausprobieren!
Recommended Posts