[PYTHON] Geben Sie Zaim-Daten mit Logstash in den Amazon Elasticsearch Service ein

Ich forderte mich heraus zu denken, dass ich den Asset-Status leicht grafisch darstellen könnte. Unter den Haushaltsbuchdiensten verfügt Zaim über eine offene API, und Sie können Daten mit json abrufen. Ich habe dies mit Python erworben und versucht, Grafiken zu realisieren, indem ich sie zum Lernen in den Amazon Elasticsearch Service gestellt habe. Außerdem versuche ich, von Elastic erstelltes Logstash zu erstellen, da es eine große Sache ist, eine große Menge Json einzubringen.

Amazon Elasticsearch Service

Von AWS bereitgestellter Elasticsearch-verwalteter Dienst. Der Engpass besteht darin, dass die Version etwas älter als die neueste Version ist, aber es ist eine der einfachsten und schnellsten Möglichkeiten, Elasticsearch zu verwenden. Standardmäßig wird auch Kibana bereitgestellt.

Das Einstellen ist nicht besonders schwierig. Erstellen Sie es daher schnell über die Webkonsole. Die Zugriffssteuerungsrichtlinie ist jetzt per IP zulässig. Da dies nicht nur eine elastische Suche ist, sondern auch die Zugriffssteuerung von Kibana. Wenn beispielsweise Daten von einem Server in der Cloud eingegeben und Kibana auf einem lokalen PC bestätigt wird, muss für jeden die IP angegeben werden.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-northeast-1:xxxxx:domain/xxxxx/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "xx.xx.xx.xx"
          ]
        }
      }
    }
  ]
}

Zaim API via Python

Ich habe Python-Anfragen verwendet, um die Zaim-API zu erreichen. Die Schreibmethode basiert auf dem Anforderungsdokument.

get_zaim.py


# coding: utf-8
import requests
from requests_oauthlib import OAuth1Session
from requests_oauthlib import OAuth1

consumer_key = u"XXXXX"
consumer_secret = u"XXXXX"

request_token_url = u"https://api.zaim.net/v2/auth/request"
authorize_url = u"https://www.zaim.net/users/auth"
access_token_url = u"https://api.zaim.net/v2/auth/access"
callback_uri = u"http://chroju.net/"
get_money_url = u"https://api.zaim.net/v2/home/money"

def oauth_requests():
    auth = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri=callback_uri)
    r = auth.fetch_request_token(request_token_url)
    resource_owner_key = r.get('oauth_token')
    resource_owner_secret = r.get('oauth_token_secret')

    authorization_url = auth.authorization_url(authorize_url)
    print 'Please go here and authorize,', authorization_url
    redirect_response = raw_input('Paste the full redirect URL here:')
    oauth_response = auth.parse_authorization_response(redirect_response)
    verifier = oauth_response.get('oauth_verifier')

    auth = OAuth1Session(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=resource_owner_key, resource_owner_secret=resource_owner_secret, verifier=verifier)
    oauth_token = auth.fetch_access_token(access_token_url)

    resource_owner_key = oauth_token.get('oauth_token')
    resource_owner_secret = oauth_token.get('oauth_token_secret')

    get_json(resource_owner_key, resource_owner_secret)

def get_json(resource_owner_key, resource_owner_secret):
    headeroauth = OAuth1(consumer_key, consumer_secret, resource_owner_key, resource_owner_secret, signature_type='auth_header')
    r = requests.get(get_money_url, auth=headeroauth)
    print r.content

if __name__ == "__main__":
  oauth_requests()

Wenn dies ausgeführt wird, werden die Haushaltskontobuchdaten ehrlich mit json ausgegeben. Formatieren Sie sie daher über jq und speichern Sie sie in einer Datei.

$ python27 get_zaim.py | jq > zaim.json

Elasticsearch

mapping

Sobald Sie Amazon ES starten, entspricht die Verwendung danach der normalen elastischen Suche. Führen Sie zunächst eine Zuordnung im Voraus gemäß dem Format der JSON-Datei durch, die mit der zaim-API abgerufen werden kann.

$ curl -XPUT "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog" -d '
{"mappings" : {
"zaim" : {
"properties" : {
"id" : { "type" : "integer"},
"user_id" : { "type" : "integer"},
"date" : { "type" : "date", "format" : "yyyy-MM-dd"},
"mode" : { "type" : "string" },
"category_id" : { "type" : "integer" },
"genre_id" : { "type" : "integer" },
"from_account_id" : {"type" : "integer"},
"to_account_id" : {"type" : "integer"},
"amount" : {"type" : "integer"},
"comment" : {"type" : "string"},
"active" : {"type" : "integer"},
"created" : {"type" : "date", "format" : "yyyy-MM-dd HH:mm:ss"},
"currency_code" : {"type" : "string"},
"name" : {"type" : "string"},
"receipt_id" : {"type" : "integer"},
"place_uid" : {"type" : "integer"},
"place" : {"type" : "string"},
"path" : {"type":"string"}
}
}
}
}'

Stellen Sie sicher, dass die Zuordnungseinstellungen korrekt sind.

$ curl -XGET "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog/_mapping" | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   733  100   733    0     0  58050      0 --:--:-- --:--:-- --:--:-- 61083
{
  "lifelog": {
    "mappings": {
      "zaim": {
        "properties": {
          "@timestamp": {
            "type": "date",
            "format": "dateOptionalTime"
...

logstash

Der wahrscheinlich beste Weg, um mehrere Jsons gleichzeitig in Elasticsearch einzufügen, ist die Verwendung der Bulk-API. Es ist üblich. Der von der Bulk-API eingegebene JSON muss jedoch den Datenteil und den Index abwechselnd beschreiben, sodass er nicht für den JSON geeignet ist, der wie diesmal von der API gemeinsam erfasst wird.

Als alternative Methode gibt es eine Methode, die fluentd usw. verwendet, aber ich habe versucht, ** logstash ** zu verwenden, da diese von Elastic bereitgestellt wird. logstash ist ein Tool zum Analysieren und Verarbeiten von Stream-Daten, die Fluentd ähneln. Es ist nicht speziell für die elastische Suche vorgesehen, und Daten können beispielsweise an Datadog oder influxDB gesendet werden.

Dieses Mal wurde json bereits in die Datei ausgegeben, also habe ich die Datei "cat" und sie mit dem Eingabe-Plugin von stdin eingefügt und Elasticsearch als Ausgabe-Plugin festgelegt. Sie können sich anhand des Dokuments ein Bild davon machen, welche Plugins für die Ein- und Ausgabe vorbereitet sind.

Installation

Installieren Sie mit yum gemäß der Beschreibung in Official.

$ sudo yum install logstash

Erstellen einer Conf-Datei

logstash verwendet die conf-Datei, die den Inhalt beschreibt, den Sie zur Verarbeitung der Daten verarbeiten möchten. Die Datei ist grob in drei Plugin-Einstellungen unterteilt: "Eingabe", die den Datenempfänger beschreibt, "Filter", der die Analysemethode beschreibt, und "Ausgabe", die das Ausgabeziel beschreibt. Die diesmal verwendete conf-Datei lautet wie folgt.

pipeline.conf


input {
  stdin {
    codec => json
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["http://xxx.ap-northeast-1.es.amazonaws.com/"]
    index => "lifelog"
    document_type => "zaim"
    document_id => "%{id}"
  }
  stdout {
    codec => rubydebug
  }
}

"Input" ist ein Standardeingang (stdin plugin) und wird als json interpretiert.

Da die Datei die Eingabequelle ist, ist es möglich, sie mit dem Datei-Plugin anzugeben. Zu diesem Zeitpunkt sollte jedoch beachtet werden, dass die Eingabe nach dem Starten von logstash das Ziel der Verarbeitung ist, dh, das Datei-Plugin wird als Eingabe verwendet. Wenn angegeben, ist der zusätzliche Teil das Verarbeitungsziel. Um den ursprünglich von Anfang an in die Datei geschriebenen Inhalt zu verarbeiten, muss "start_position => begin" angegeben werden. Sobald die Datei gelesen wurde, wird die Leseposition in "~ / .sincedb _..." aufgezeichnet. Wenn diese Datei nicht gelöscht wird, funktioniert "start_position => begin" nicht und wird von der Mitte aus gelesen.

In "filter" wird festgelegt, welcher Teil von json gelesen wird. Die Dateneingabe als "codec => json" wird von json nicht roh verarbeitet, sondern es werden Metaattribute usw. hinzugefügt. Wenn Sie also nur den Datenteil rein extrahieren, geben Sie das Feld "message" und unten explizit an. Es besteht die Notwendigkeit.

Geben Sie unter "Ausgabe" das Plugin für elastische Suche und das Plugin stdout an. Im ersten Fall kann die Dokument-ID angegeben werden, sodass das ID-Feld angegeben wird, das ursprünglich in Zaims JSON-Daten enthalten war. Das stdout-Plugin wurde für Debugging-Zwecke angegeben. Es scheint üblich zu sein, "Codec" auf "Rubydebug" zu setzen.

Lauf

Führen Sie nach dem Erstellen der Konfigurationsdatei tatsächlich logstash aus.

$ cat zaim.json | logstash -f pipeline.conf

Stellen Sie eine Anfrage an Elasticsearch und bestätigen Sie, dass Sie sich registriert haben.

$ curl -XGET "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog/zaim/_count" | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    62  100    62    0     0    384      0 --:--:-- --:--:-- --:--:--   385
{
  "count": 1976,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  }
}

Kibana

Wenn Sie sich erfolgreich bei Elasticsearch registriert haben, können Sie es bereits von Kibana aus anzeigen.

zaim.png

Recommended Posts

Geben Sie Zaim-Daten mit Logstash in den Amazon Elasticsearch Service ein
Speichern von CSV-Daten in Amazon Kinesis Streams mit Standardeingabe
Senden Sie eine Anfrage von AWS Lambda an Amazon Elasticsearch Service
Von der Installation von Elasticsearch bis zur Dateneingabe
Umgang mit unausgeglichenen Daten
Mit Amazon Transcribe erstellte Untertiteldaten
Datenintegration von der Python-App unter Linux zu Amazon Redshift mit ODBC
Datenintegration von der Python-App unter Windows zu Amazon Redshift mit ODBC
Kopieren Sie Daten von Amazon S3 mit Python (boto) in Google Cloud Storage.
Holen Sie sich Lebensmitteldaten mit Amazon API (Python)
Verwenden Sie den Amazon Simple Notification Service mit Python
Konvertieren Sie Excel-Daten mit Python in JSON
Versuchen Sie, mit Pandas in ordentliche Daten umzuwandeln
Wie man Problemdaten mit Paiza liest
Massenbereitstellung mit CFn, um manuelle Schnappschüsse des Elastic Search Service mit Lambda zu erstellen
Erstellen von CSV-Beispieldaten mit Hypothese
Versuchen Sie, Doujin-Musikdaten mit Pandas zu aggregieren
Konvertieren Sie Daten mit Form (Anzahl der Daten, 1) in (Anzahl der Daten,) mit numpy.
Hinweis zum Zeichnen der IP-Adresse mit Kibana + Elastic Search
Ich habe versucht, die Daten mit Zwietracht zu speichern
Ich möchte 100 Datenwissenschaften mit Colaboratory schlagen
Speichern Sie Daten zum Flashen mit STM32 Nucleo Board
So kratzen Sie Pferderenndaten mit Beautiful Soup
[Einführung zur Minimierung] Datenanalyse mit SEIR-Modell ♬
Ich habe versucht, Jupyter mit allen Amazon-Lichtern zu starten