[PYTHON] Un mécanisme pour interroger, convertir, accumuler et notifier les catalogues de données ouvertes

Un mécanisme d'interrogation des catalogues de données ouvertes pour accumuler et notifier les données converties

introduction

Auparavant, j'ai écrit un article API obtenue en convertissant le catalogue de données ouvertes de la préfecture de Shizuoka (csv) en données (json) du site de contre-mesures COVID-19. C'était. L'API créée ici est interrogée avec Lambda et, en cas de modification des données, elle est stockée dans S3, les informations sont également enregistrées dans DynamoDB et Slack est informé de la modification. C'était. C'est la prétention ETL. En plus de la ville de Hamamatsu, les données de la ville de Shizuoka ont également été surveillées.

Le code de fonction de Lambda (Python) est disponible dans ce référentiel. https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier

Exécuter régulièrement sur Lambda

Ajoutez un déclencheur. ink (16).png Réglez la gâchette. Sélectionnez EventBridge (ClowdWatch Event) et créez une règle. Screenshot 2020-06-14 at 10.24.58.png Veuillez expliquer comment rédiger la formule de planification. Screenshot 2020-06-14 at 10.31.39.png Seulement ça. Screenshot 2020-06-14 at 10.33.34.png

Utiliser des variables d'environnement

Lors de la publication du code source Python sur GitHub, etc., définissez les informations que vous ne souhaitez pas publier dans la variable d'environnement.

Sélectionnez Modifier "Variables d'environnement" sous le code de fonction de Lambda. ink (17).png Ajoutez les clés et les valeurs des variables d'environnement. Screenshot 2020-06-14 at 10.45.22.png Sera ajouté. Screenshot 2020-06-14 at 10.47.36.png

Lorsque vous l'obtenez à partir du code Python, cela ressemble à ceci:

api_address = os.environ["API_ADDRESS_CSV2JSON"]

Obtenez des données JSON via l'API

Appelez cette API pour obtenir les données JSON. Le code ressemble à ceci.

import json
import requests
class CityInfo:
    def __init__(self, city, queryParam):
        self.city = city
        self.queryParam = queryParam
CityInfo(
    "hamamatsu", 
    "main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)
apiResponse = requests.get("{0}?type={1}".format(API_ADDRESS_CSV2JSON, queryParam), auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
retJson = json.loads(apiResponse.text)

Gérez les dernières informations mises à jour avec DynamoDB

Les spécifications du tableau sont comme ça. Screenshot 2020-06-14 at 12.31.13.png

city(PK) Type de ville(hamamatsu/shizuoka-shi)
type(SK) type de données(Types tels que le nombre de patients positifs et le nombre de personnes testées)
id ID du catalogue de données ouvertes de la préfecture de Shizuoka
name nom du type
path Chemin S3 où les dernières données sont stockées(Clé)
update La date et l'heure de la dernière mise à jour des données

Étant donné que chaque donnée JSON a une date de dernière mise à jour, comparez cette date et cette heure avec la mise à jour DynamoDB et mettez à jour l'enregistrement s'il a été mis à jour. (Insérer pour les nouvelles données.)

import boto3
from boto3.dynamodb.conditions import Key
DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)

def processType(cityInfo, retJson, typeId):
    date = retJson[type]["date"]
    listItem = typeId.split(":")
    type = listItem[0]
    id = listItem[1]

    record = selectItem(cityInfo.city, type)
    if(record["Count"] is 0):
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
    elif record["Items"][0]["update"] != date:
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        updateItem(cityInfo.city, type, id, date, path)

def selectItem(city, type):
    return DYNAMO_TABLE.query(
        KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
    )

def insertItem(city, type, id, date, name, path):
    DYNAMO_TABLE.put_item(
      Item = {
        "city": city, 
        "type": type, 
        "id": id, 
        "update" : date, 
        "name" : name, 
        "path" : path
      }
    )
    
def updateItem(city, type, id, date, path):
    DYNAMO_TABLE.update_item(
        Key={
            "city": city,
            "type": type,
        },
        UpdateExpression="set #update = :update, #path = :path",
        ExpressionAttributeNames={
            "#update": "update", 
            "#path": "path"
        },
        ExpressionAttributeValues={
            ":update": date, 
            ":path": path
        }
    )

J'ai essayé de stocker non seulement les dernières informations, mais aussi les informations d'historique dans un tableau séparé, mais je vais l'omettre.

Télécharger vers S3

Les données mises à jour seront téléchargées vers S3.

import boto3
S3 = boto3.resource('s3') 
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

def uploadFile(city, type, id, date, jsonData):
    dt = datetime.strptime(date, '%Y/%m/%d %H:%M')
    path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
    objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
    objJson.put(Body = json.dumps(jsonData, ensure_ascii=False, indent=2))
    return path

Non seulement JSON mais aussi CSV des données d'origine sont acquis et enregistrés en tant qu'ensemble avec JSON, mais je vais l'omettre.

Il est accumulé dans S3 comme ça. Screenshot 2020-06-14 at 13.52.15.png En ce qui concerne le partitionnement, il aurait été préférable que CSV et JSON ne soient pas au même endroit, et quand je le regarde à nouveau, cela semble un peu subtil.

Notifier Slack

Nous informerons Slack de toute mise à jour. Le code Python ressemble à ceci:

import slackweb
SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]

notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
def notifyToSlack(url, text):
    slack = slackweb.Slack(url=url)
    slack.notify(text=text)

Comment faire passer l'URL du WebHook à slackweb? Accédez à Incoming WebHooks. ink (19).png Vous pouvez obtenir l'URL du Webhook en sélectionnant le canal que vous souhaitez notifier et en cliquant sur le bouton "Ajouter l'intégration des WebHooks entrants". ink (20).png Vous pouvez également modifier l'icône et le nom ici.

Voici comment Slack est notifié. Screenshot 2020-06-14 at 13.56.26.png

Épilogue

J'ai remarqué que deux mois se sont écoulés depuis le Dernier message (15/04), donc j'écris un article à la hâte. C'est à peine sûr aujourd'hui le 14/06. Cet appareil lui-même a été achevé vers 4/20, et je l'utilise personnellement depuis.

En fait, j'ai commencé à créer ce mécanisme, dans l'espoir de notifier le canal Slack de code4hamamatsu et d'informer tout le monde de la mise à jour des données. À partir de la première quinzaine d'avril.

Cependant, j'ai un environnement dans lequel GitHub Actions interroge et valide toutes les modifications, puis la compilation Netlify s'exécute et le résultat est notifié au canal Slack, j'ai donc la possibilité de l'utiliser. Il n'y avait pas.

Pendant que je le faisais, j'étais un peu nerveux parce que j'ai dit: «Oh, je n'ai pas besoin de ça.

Préparé pour faire du matériel LT au JAWS-UG Hamamatsu AWS Study Group 2020 # 4 à la fin du mois d'avril afin que tout le monde sache au moins son existence. Je l'ai également mis à l'ordre du jour, mais je ne l'ai pas fait en un rien de temps. Screenshot 2020-06-14 at 09.51.49.png

Je pensais essayer d'écrire sur l'article de Qiita, mais il semble que cela fait deux mois depuis. Ah! Pauvre enfant! !!

Recommended Posts

Un mécanisme pour interroger, convertir, accumuler et notifier les catalogues de données ouvertes
Essayez d'utiliser PHATE, une méthode de réduction et de visualisation des données biologiques
Créez rapidement un environnement python pour le Deep Learning / Data Science (Windows)