[PYTHON] Input Zaim data to Amazon Elasticsearch Service with Logstash

I figured I could graph my asset status fairly easily. Among household account book services, Zaim exposes a public API from which you can get your data as JSON. I fetched the data with Python and, as a study exercise, loaded it into Amazon Elasticsearch Service to graph it. Since loading a large amount of JSON by hand is a chore, I also took the chance to try Elastic's Logstash.

Amazon Elasticsearch Service

A managed Elasticsearch service provided by AWS. Its drawback is that the available version lags slightly behind the latest release, but it is one of the easiest and fastest ways to start using Elasticsearch. Kibana is also provided by default.

There is nothing particularly difficult about the setup, so just create a domain from the web console. I restricted the access policy by IP address. Note that this policy controls access to both Elasticsearch and Kibana, so if you want to input data from a server in the cloud and check Kibana from a local PC, you need to allow the IP address of each.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-northeast-1:xxxxx:domain/xxxxx/*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "xx.xx.xx.xx"
          ]
        }
      }
    }
  ]
}

Zaim API via Python

I used Python's requests library, via requests_oauthlib, to call the Zaim API. The code follows the requests_oauthlib documentation.

get_zaim.py


# coding: utf-8
import requests
from requests_oauthlib import OAuth1Session
from requests_oauthlib import OAuth1

consumer_key = u"XXXXX"
consumer_secret = u"XXXXX"

request_token_url = u"https://api.zaim.net/v2/auth/request"
authorize_url = u"https://www.zaim.net/users/auth"
access_token_url = u"https://api.zaim.net/v2/auth/access"
callback_uri = u"http://chroju.net/"
get_money_url = u"https://api.zaim.net/v2/home/money"

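# Three-legged OAuth 1.0a flow: fetch a request token, have the user authorize it in the browser, then exchange the verifier for an access token.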
def oauth_requests():
    auth = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri=callback_uri)
    r = auth.fetch_request_token(request_token_url)
    resource_owner_key = r.get('oauth_token')
    resource_owner_secret = r.get('oauth_token_secret')

    authorization_url = auth.authorization_url(authorize_url)
    print 'Please go here and authorize,', authorization_url
    redirect_response = raw_input('Paste the full redirect URL here:')
    oauth_response = auth.parse_authorization_response(redirect_response)
    verifier = oauth_response.get('oauth_verifier')

    auth = OAuth1Session(client_key=consumer_key, client_secret=consumer_secret, resource_owner_key=resource_owner_key, resource_owner_secret=resource_owner_secret, verifier=verifier)
    oauth_token = auth.fetch_access_token(access_token_url)

    resource_owner_key = oauth_token.get('oauth_token')
    resource_owner_secret = oauth_token.get('oauth_token_secret')

    get_json(resource_owner_key, resource_owner_secret)

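# Fetch household account book data from the Zaim money endpoint with the access token and print the raw JSON.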
def get_json(resource_owner_key, resource_owner_secret):
    headeroauth = OAuth1(consumer_key, consumer_secret, resource_owner_key, resource_owner_secret, signature_type='auth_header')
    r = requests.get(get_money_url, auth=headeroauth)
    print r.content

if __name__ == "__main__":
    oauth_requests()

When this is executed, the household account book data is printed as raw JSON, so format it with jq and save it to a file.

$ python27 get_zaim.py | jq . > zaim.json

Elasticsearch

mapping

Once the Amazon ES domain is up, it is used just like regular Elasticsearch. First, define a mapping in advance that matches the format of the JSON returned by the Zaim API.

$ curl -XPUT "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog" -d '
{
  "mappings": {
    "zaim": {
      "properties": {
        "id": { "type": "integer" },
        "user_id": { "type": "integer" },
        "date": { "type": "date", "format": "yyyy-MM-dd" },
        "mode": { "type": "string" },
        "category_id": { "type": "integer" },
        "genre_id": { "type": "integer" },
        "from_account_id": { "type": "integer" },
        "to_account_id": { "type": "integer" },
        "amount": { "type": "integer" },
        "comment": { "type": "string" },
        "active": { "type": "integer" },
        "created": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
        "currency_code": { "type": "string" },
        "name": { "type": "string" },
        "receipt_id": { "type": "integer" },
        "place_uid": { "type": "integer" },
        "place": { "type": "string" },
        "path": { "type": "string" }
      }
    }
  }
}'

Confirm that the mapping settings are correct.

$ curl -XGET "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog/_mapping" | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   733  100   733    0     0  58050      0 --:--:-- --:--:-- --:--:-- 61083
{
  "lifelog": {
    "mappings": {
      "zaim": {
        "properties": {
          "@timestamp": {
            "type": "date",
            "format": "dateOptionalTime"
...

logstash

Probably the standard way to load many JSON documents into Elasticsearch at once is the bulk API. However, the request body for the bulk API must alternate between an action/metadata line and a data line, so it is not well suited to JSON retrieved in one batch from an API, as in this case (see the sketch below).
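
For reference, here is a minimal sketch of what building a bulk request body would involve. The index name, type, and id field follow the mapping used above; the helper itself is illustrative, not something I actually ran.

# Illustrative only: the bulk API body must alternate action/metadata lines and document lines.
import json

def to_bulk_body(records, index="lifelog", doc_type="zaim"):
    lines = []
    for record in records:
        # action/metadata line telling Elasticsearch where to index the document
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type, "_id": record["id"]}}))
        # the document itself
        lines.append(json.dumps(record))
    return "\n".join(lines) + "\n"  # the bulk body must end with a newline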

As an alternative, there are tools such as fluentd, but since it is provided by Elastic itself, I tried **logstash**. Logstash, like fluentd, is a tool for parsing and processing streaming data. It is not specific to Elasticsearch; it can also send data to Datadog, InfluxDB, and so on.

Since the JSON has already been written out to a file, I cat the file into Logstash using the stdin input plugin and set Elasticsearch as the output plugin. The documentation gives a good overview of which input and output plugins are available.

Installation

Install with yum as described in the official documentation.

$ sudo yum install logstash

Creating a conf file

Logstash processes data according to a conf file that describes what you want done. The file is divided into three plugin sections: "input", which describes where data comes from; "filter", which describes how to parse it; and "output", which describes where to send it. The conf file used this time is as follows.

pipeline.conf


input {
  stdin {
    codec => json
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => ["http://xxx.ap-northeast-1.es.amazonaws.com/"]
    index => "lifelog"
    document_type => "zaim"
    document_id => "%{id}"
  }
  stdout {
    codec => rubydebug
  }
}

"Input" is standard input (stdin plugin) and interpreted as json.

Since the input source is a file, it would also be possible to specify it with the file plugin, but note that the file plugin by default only processes lines appended after Logstash starts. To process the content already in the file from the beginning, you need to specify start_position => beginning. Also, once a file has been read, the read position is recorded in ~/.sincedb_..., so unless that file is deleted, start_position => beginning has no effect and reading resumes from the recorded position.
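
As a reference, here is a minimal sketch of such a file-based input, in the same format as pipeline.conf. The path is hypothetical, and sincedb_path => "/dev/null" is a common way to avoid persisting the read position while testing.

input {
  file {
    path => "/path/to/zaim.json"      # hypothetical path to the exported file
    start_position => "beginning"     # read existing content from the start
    sincedb_path => "/dev/null"       # do not remember the read position
    codec => json
  }
}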

In "filter", which part of json is read is set. Data input as codec => json is not json processed raw, meta attributes etc. are added, so when extracting only the data part purely, specify the message field and below explicitly There is a need to.

In "output", specify elasticsearch plugin and stdout plugin. In the former, you can specify the document ID, so it is specified to use the id field originally included in Zaim's json data. The stdout plugin was specified for debugging purposes. It seems that it is customary to set codec to ruby debug.

Run

After creating the configuration file, actually run logstash.

$ cat zaim.json | logstash -f pipeline.conf

Make a request to Elasticsearch and confirm that the documents have been registered.

$ curl -XGET "http://xxx.ap-northeast-1.es.amazonaws.com/lifelog/zaim/_count" | jq .
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    62  100    62    0     0    384      0 --:--:-- --:--:-- --:--:--   385
{
  "count": 1976,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  }
}

Kibana

If the data has been registered in Elasticsearch successfully, you can view it from Kibana right away.

zaim.png
