A mechanism for polling open data catalogs for conversion, storage, and notification


Introduction

Previously, I wrote the article "API to convert the Shizuoka open data catalog (CSV) into the JSON used by the COVID-19 countermeasure site". This time I built a mechanism that polls that API from Lambda, stores the data in S3 when something has changed, saves the latest-update information in DynamoDB, and notifies Slack that there was a change. It is a sort of makeshift ETL. Besides Hamamatsu City, it also monitors data from Shizuoka City.

The Lambda function code (Python) is available in this repository: https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier

Run regularly on Lambda

Add a trigger to the Lambda function. Select EventBridge (CloudWatch Events) and create a rule. For how to write the schedule expression, see the AWS documentation. That is all it takes.
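For reference, the same kind of rule can also be created from Python with boto3. This is only a minimal sketch under my own assumptions: the rule name and the one-hour rate are placeholders, and attaching the Lambda function as a target is omitted.

import boto3

events = boto3.client("events")

# Placeholder rule name and schedule; pick whatever polling interval you need.
events.put_rule(
    Name="poll-opendata-catalog",
    ScheduleExpression="rate(1 hour)",  # cron(...) expressions also work
    State="ENABLED",
)
# The function still has to be wired up as a target (put_targets plus
# lambda add_permission); the console does that part for you.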

Use environment variables

When publishing the Python source code on GitHub or elsewhere, put any information you do not want to expose into environment variables.

Select "Edit" under "Environment variables" below the Lambda function code, add the keys and values of the environment variables, and they will be registered.

Retrieving a value from Python code looks like this:

import os

api_address = os.environ["API_ADDRESS_CSV2JSON"]
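As an aside (my own habit, not something this project's code does), os.environ.get lets you fall back to a default when running outside Lambda:

# Placeholder default URL for local testing only.
api_address = os.environ.get("API_ADDRESS_CSV2JSON", "http://localhost:3000/csv2json")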

Get JSON data via API

Call this API to get the JSON data. The code looks like this.

import json
import os
import requests

API_ADDRESS_CSV2JSON = os.environ["API_ADDRESS_CSV2JSON"]
API_KEY_CSV2JSON = os.environ["API_KEY_CSV2JSON"]

class CityInfo:
    def __init__(self, city, queryParam):
        self.city = city
        self.queryParam = queryParam

cityInfo = CityInfo(
    "hamamatsu", 
    "main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)

apiResponse = requests.get(
    "{0}?type={1}".format(API_ADDRESS_CSV2JSON, cityInfo.queryParam),
    auth=AUTH,  # AUTH (e.g. basic auth credentials) is set up elsewhere in the repository
    headers={"x-api-key": API_KEY_CSV2JSON},
)
retJson = json.loads(apiResponse.text)
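Since Shizuoka City is monitored as well as Hamamatsu, the natural shape is a list of CityInfo objects to loop over. This is my sketch of that loop; the Shizuoka City type:id pairs are left as a placeholder because they are not listed in this article.

cityInfoList = [
    cityInfo,  # hamamatsu, defined above
    # CityInfo("shizuoka-shi", "..."),  # the type:id pairs for Shizuoka City go here
]

for info in cityInfoList:
    apiResponse = requests.get(
        "{0}?type={1}".format(API_ADDRESS_CSV2JSON, info.queryParam),
        auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
    retJson = json.loads(apiResponse.text)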

Manage last updated information with DynamoDB

The table specification is as follows (a boto3 sketch for creating a table with this key layout comes after the list).

city (PK)    City identifier (hamamatsu / shizuoka-shi)
type (SK)    Type of data (e.g. number of positive patients, number of people tested)
id           Shizuoka open data catalog ID
name         Display name of the type
path         S3 path (key) where the latest data is stored
update       Date and time the data was last updated
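A table with this key layout could be created with boto3 roughly as follows. This is only a sketch under my own assumptions; the table name and billing mode are placeholders, and the real table was presumably set up in the console.

import boto3

dynamodb = boto3.client("dynamodb")

dynamodb.create_table(
    TableName="covid19-opendata-notifier",  # placeholder name
    AttributeDefinitions=[
        {"AttributeName": "city", "AttributeType": "S"},
        {"AttributeName": "type", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "city", "KeyType": "HASH"},   # partition key (PK)
        {"AttributeName": "type", "KeyType": "RANGE"},  # sort key (SK)
    ],
    BillingMode="PAY_PER_REQUEST",  # assumption; provisioned capacity works too
)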

Each piece of JSON data carries its last-updated date, so compare that date and time with the update attribute in DynamoDB, and if the data is newer, update the record as well (or insert it for new data).

import os
import boto3
from boto3.dynamodb.conditions import Key

DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)

def processType(cityInfo, retJson, typeId):
    # typeId looks like "patients:<catalog id>"
    listItem = typeId.split(":")
    type = listItem[0]
    id = listItem[1]
    date = retJson[type]["date"]

    record = selectItem(cityInfo.city, type)
    if record["Count"] == 0:
        # First time this city/type pair appears: upload and insert.
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)  # TYPE_NAME maps type to its display name
    elif record["Items"][0]["update"] != date:
        # The catalog has newer data than DynamoDB: upload and update.
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        updateItem(cityInfo.city, type, id, date, path)

def selectItem(city, type):
    return DYNAMO_TABLE.query(
        KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
    )

def insertItem(city, type, id, date, name, path):
    DYNAMO_TABLE.put_item(
      Item = {
        "city": city, 
        "type": type, 
        "id": id, 
        "update" : date, 
        "name" : name, 
        "path" : path
      }
    )
    
def updateItem(city, type, id, date, path):
    DYNAMO_TABLE.update_item(
        Key={
            "city": city,
            "type": type,
        },
        UpdateExpression="set #update = :update, #path = :path",
        ExpressionAttributeNames={
            "#update": "update", 
            "#path": "path"
        },
        ExpressionAttributeValues={
            ":update": date, 
            ":path": path
        }
    )
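To tie this together, the driver presumably walks every type:id pair in the query parameter and calls processType for each. A minimal sketch (my guess at the glue code; see the repository for the actual structure):

for typeId in cityInfo.queryParam.split(","):
    processType(cityInfo, retJson, typeId)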

I also set things up to accumulate history information in a separate table, not just the latest data, but I will omit that here.

Upload to S3

The updated data will be uploaded to S3.

import json
import os
from datetime import datetime
import boto3

S3 = boto3.resource("s3")
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

def uploadFile(city, type, id, date, jsonData):
    # date arrives as a string like "2020/06/14 13:00"
    dt = datetime.strptime(date, "%Y/%m/%d %H:%M")
    path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
    objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
    objJson.put(Body=json.dumps(jsonData, ensure_ascii=False, indent=2))
    return path
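As a concrete example, if Hamamatsu's patients data was last updated at 2020/06/14 13:00, uploadFile stores it at data/hamamatsu/patients/2020/06/14/13-00.json and returns that path without the extension.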

Not only the JSON but also the original CSV is fetched and saved alongside it as a set, but I will omit that here.

The data accumulates in S3 like this. As for the partitioning, it would have been better not to put the CSV and JSON in the same place; looking at it again now, it seems a bit questionable.

Notify Slack

Notify Slack of any updates. The Python code looks like this:

import os
import slackweb

SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]

def notifyToSlack(url, text):
    slack = slackweb.Slack(url=url)
    slack.notify(text=text)

# notifyText has been built up beforehand with the list of updated items;
# here the city name is prefixed before sending.
notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
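slackweb is a thin wrapper around Slack's incoming-webhook API, so if you would rather not add the dependency, plain requests does the same job. A sketch (my alternative, not what this project uses):

import requests

def notifyToSlackRaw(url, text):
    # Incoming webhooks accept a JSON payload with a "text" field.
    requests.post(url, json={"text": text})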

So how do you get the webhook URL to pass to slackweb? Go to Slack's Incoming WebHooks page, select the channel you want to notify, and press the "Add Incoming WebHooks integration" button to get the webhook URL. You can also change the icon and name there.

This is what the Slack notification looks like.

Afterword

I noticed that almost two months had passed since my last post (April 15), so I am writing this article in a hurry; today, June 14, I am just barely within that window. The mechanism itself was finished around April 20, and I have been running it personally since then.

Actually, I started building this mechanism in the first half of April, hoping to post data-update notifications to the code4hamamatsu Slack channel so everyone would know when the data changed.

However, an environment was already in place where GitHub Actions polls for changes and commits them, a Netlify build then runs, and the result is posted to the Slack channel, so there was never an opportunity to use mine.

Partway through building it, I realized "oh, this isn't needed," which took the wind out of my sails, but I lowered its priority, got it working anyway, and have been running it personally.

At the JAWS-UG Hamamatsu AWS Study Group 2020 #4 at the end of April, I prepared it as material for a lightning talk so that people would at least know it existed, and even put it on the agenda, but time ran out and I never gave the talk.

I kept meaning to at least write it up as a Qiita article, but somehow two months slipped by, and here we are today. Ah, this poor neglected child!
