[PYTHON] Entrez les données Zaim dans Amazon Elasticsearch Service avec Logstash

Je me suis mis au défi en pensant que je pourrais facilement représenter graphiquement l'état des actifs. Parmi les services de livre de comptes des ménages, Zaim a une API ouverte, et vous pouvez obtenir des données avec json. J'ai acquis cela en utilisant Python et j'ai essayé de réaliser des graphiques en les mettant dans Amazon Elasticsearch Service pour les étudier. De plus, j'essaie de logstash créé par Elastic car c'est un gros problème de mettre une grande quantité de json.

Amazon Elasticsearch Service

Service géré Elasticsearch fourni par AWS. Le goulot d'étranglement est que la version est légèrement plus ancienne que la dernière version, mais c'est l'un des moyens les plus simples et les plus rapides d'utiliser Elasticsearch. Kibana est également fourni par défaut.

Il n'y a rien de particulièrement difficile à configurer, alors créez-le rapidement à partir de la console Web. La politique de contrôle d'accès est désormais autorisée par IP. Comme il ne s'agit pas seulement de la recherche élastique, mais également du contrôle d'accès de Kibana, par exemple, si les données sont entrées depuis un serveur sur le cloud et que Kibana est confirmée sur un PC local, il est nécessaire de spécifier l'IP pour chacun.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-northeast-1:xxxxx:domain/xxxxx/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "xx.xx.xx.xx"
          ]
        }
      }
    }
  ]
}

Zaim API via Python

J'ai utilisé des requêtes Python pour accéder à l'API Zaim. La méthode d'écriture est basée sur le document de requêtes.

get_zaim.py


# coding: utf-8
import requests
from requests_oauthlib import OAuth1Session
from requests_oauthlib import OAuth1

consumer_key = u"XXXXX"
consumer_secret = u"XXXXX"

request_token_url = u"https://api.zaim.net/v2/auth/request"
authorize_url = u"https://www.zaim.net/users/auth"
access_token_url = u"https://api.zaim.net/v2/auth/access"
callback_uri = u"http://chroju.net/"
get_money_url = u"https://api.zaim.net/v2/home/money"

def oauth_requests():
    auth = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri=callback_uri)
    r = auth.fetch_request_token(request_token_url)
    resource_owner_key = r.get('oauth_token')
    resource_owner_secret = r.get('oauth_token_secret')

    authorization_url = auth.authorization_url(authorize_url)
    print 'Please go here and authorize,', authorization_url
    redirect_response = raw_input('Paste the full redirect URL here:')
    oauth_response = auth.parse_authorization_response(redirect_response)
    verifier = oauth_response.get('oauth_verifier')

    auth = OAuth1Session(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=resource_owner_key, resource_owner_secret=resource_owner_secret, verifier=verifier)
    oauth_token = auth.fetch_access_token(access_token_url)

    resource_owner_key = oauth_token.get('oauth_token')
    resource_owner_secret = oauth_token.get('oauth_token_secret')

    get_json(resource_owner_key, resource_owner_secret)

def get_json(resource_owner_key, resource_owner_secret):
    headeroauth = OAuth1(consumer_key, consumer_secret, resource_owner_key, resource_owner_secret, signature_type='auth_header')
    r = requests.get(get_money_url, auth=headeroauth)
    print r.content

if __name__ == "__main__":
  oauth_requests()

Lorsque cela est exécuté, les données du livre de comptes du ménage seront affichées honnêtement avec json, donc formatez-les via jq et enregistrez-les dans un fichier.

$ python27 get_zaim.py | jq > zaim.json

Elasticsearch

mapping

Une fois que vous lancez Amazon ES, l'utilisation est la même que celle de la recherche Elastic normale. Commencez par effectuer le mappage à l'avance en fonction du format du fichier json qui peut être obtenu avec l'API zaim.

$ curl -XPUT "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog" -d '
{"mappings" : {
"zaim" : {
"properties" : {
"id" : { "type" : "integer"},
"user_id" : { "type" : "integer"},
"date" : { "type" : "date", "format" : "yyyy-MM-dd"},
"mode" : { "type" : "string" },
"category_id" : { "type" : "integer" },
"genre_id" : { "type" : "integer" },
"from_account_id" : {"type" : "integer"},
"to_account_id" : {"type" : "integer"},
"amount" : {"type" : "integer"},
"comment" : {"type" : "string"},
"active" : {"type" : "integer"},
"created" : {"type" : "date", "format" : "yyyy-MM-dd HH:mm:ss"},
"currency_code" : {"type" : "string"},
"name" : {"type" : "string"},
"receipt_id" : {"type" : "integer"},
"place_uid" : {"type" : "integer"},
"place" : {"type" : "string"},
"path" : {"type":"string"}
}
}
}
}'

Vérifiez que les paramètres de mappage sont corrects.

$ curl -XGET "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog/_mapping" | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   733  100   733    0     0  58050      0 --:--:-- --:--:-- --:--:-- 61083
{
  "lifelog": {
    "mappings": {
      "zaim": {
        "properties": {
          "@timestamp": {
            "type": "date",
            "format": "dateOptionalTime"
...

logstash

La meilleure façon de mettre plusieurs jsons dans Elasticsearch en même temps est probablement d'utiliser l 'API en masse. Il est courant. Cependant, le json qui est entré par l'API en bloc doit décrire la partie données et l'index en alternance, il n'est donc pas approprié pour le json qui est collectivement acquis par l'API comme cette fois.

Comme méthode alternative, il existe une méthode utilisant fluentd etc., mais j'ai essayé d'utiliser ** logstash ** car il est fourni par Elastic. logstash est un outil d'analyse et de traitement des données de flux similaire à fluentd. Ce n'est pas spécifiquement pour la recherche Elastic, et les données peuvent être envoyées à Datadog ou influxDB, par exemple.

Cette fois, json a déjà été sorti dans le fichier, donc j'ai `catʻed le fichier et l'ai mis avec le plugin d'entrée de stdin, et ai défini Elasticsearch comme plugin de sortie. Vous pouvez avoir une idée du type de plugins préparés pour l'entrée et la sortie en regardant le document.

Installation

Installez avec yum selon la description de Officiel.

$ sudo yum install logstash

Créer un fichier conf

logstash utilise le fichier conf qui décrit le contenu que vous souhaitez traiter pour traiter les données. Le fichier est à peu près divisé en trois paramètres de plug-in: «entrée» qui décrit le récepteur de données, «filtre» qui décrit la méthode d'analyse et «sortie» qui décrit la destination de sortie. Le fichier conf utilisé cette fois est le suivant.

pipeline.conf


input {
  stdin {
    codec => json
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["http://xxx.ap-northeast-1.es.amazonaws.com/"]
    index => "lifelog"
    document_type => "zaim"
    document_id => "%{id}"
  }
  stdout {
    codec => rubydebug
  }
}

«Input» est une entrée standard (plugin stdin) et est interprétée comme json.

Étant donné que le fichier est la source d'entrée, il est possible de le spécifier avec le plugin de fichier, mais il convient de noter à ce stade que l'entrée après le démarrage de logstash est la cible du traitement, c'est-à-dire que le plugin de fichier est utilisé comme entrée Si spécifié, la partie supplémentaire est la cible de traitement. Il est nécessaire de spécifier start_position => begin afin de traiter le contenu initialement écrit dans le fichier depuis le début. De plus, une fois le fichier lu, la position de lecture est enregistrée dans ~ / .sincedb _..., donc à moins que ce fichier ne soit supprimé, start_position => début ne fonctionnera pas et il sera lu à partir du milieu.

Dans "filter", quelle partie de json est lue est définie. L'entrée de données en tant que codec => json n'est pas traitée brute par json, mais des méta-attributs, etc. sont ajoutés, donc lors de l'extraction uniquement de la partie de données, spécifiez le champ message et ci-dessous explicitement. Il y a un besoin de.

Dans "output", spécifiez le plugin de recherche élastique et le plugin stdout. Dans le premier cas, l'ID du document peut être spécifié, de sorte que le champ id initialement inclus dans les données json de Zaim est spécifié. Le plugin stdout a été spécifié à des fins de débogage. Il semble qu'il soit habituel de régler «codec» sur «rubydebug».

Courir

Après avoir créé le fichier de configuration, exécutez en fait logstash.

$ cat zaim.json | logstash -f pipeline.conf

Faites une demande à Elasticsearch et confirmez que vous vous êtes inscrit.

$ curl -XGET "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog/zaim/_count" | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    62  100    62    0     0    384      0 --:--:-- --:--:-- --:--:--   385
{
  "count": 1976,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  }
}

Kibana

Si vous vous êtes inscrit avec succès auprès d'Elasticsearch, vous pouvez déjà le consulter depuis Kibana.

zaim.png

Recommended Posts

Entrez les données Zaim dans Amazon Elasticsearch Service avec Logstash
Comment stocker des données CSV dans Amazon Kinesis Streams avec une entrée standard
Envoyer une demande d'AWS Lambda à Amazon Elasticsearch Service
De l'installation d'Elasticsearch à la saisie des données
Comment gérer les données déséquilibrées
Données de sous-titres créées avec Amazon Transcribe
Intégration de données depuis l'application Python sur Linux vers Amazon Redshift avec ODBC
Intégration de données depuis l'application Python sur Windows vers Amazon Redshift avec ODBC
Copier des données d'Amazon S3 vers Google Cloud Storage avec Python (boto)
Obtenez des données alimentaires avec l'API Amazon (Python)
Utiliser Amazon Simple Notification Service avec Python
Convertir des données Excel en JSON avec python
Convertissez des données FX 1 minute en données 5 minutes avec Python
Essayez de convertir en données ordonnées avec les pandas
Comment lire les données de problème avec Paiza
Déploiement groupé avec CFn pour prendre des instantanés manuels d'Elastic Search Service avec Lambda
Comment créer des exemples de données CSV avec hypothèse
Essayez d'agréger les données de musique doujin avec des pandas
Convertissez les données avec la forme (nombre de données, 1) en (nombre de données,) avec numpy.
Remarque pour tracer l'adresse IP avec Kibana + Elastic Search
J'ai essayé de sauvegarder les données avec discorde
Je veux frapper 100 sciences des données avec Colaboratory
Enregistrez les données pour flasher avec la carte Nucleo STM32
Comment récupérer des données de courses de chevaux avec Beautiful Soup
[Introduction à minimiser] Analyse des données avec le modèle SEIR ♬
J'ai essayé de démarrer Jupyter avec toutes les lumières d'Amazon