[PYTHON] Ein Mechanismus zum Abrufen, Konvertieren, Sammeln und Benachrichtigen offener Datenkataloge

Ein Mechanismus zum Abrufen offener Datenkataloge, um konvertierte Daten zu sammeln und zu benachrichtigen

Einführung

Zuvor schrieb ich einen Artikel API, die durch Konvertierung vom Open Data Catalog (CSV) der Präfektur Shizuoka in Daten (json) der COVID-19-Website für Gegenmaßnahmen erhalten wurde. Es war. Die hier erstellte API wird mit Lambda abgefragt. Wenn sich die Daten ändern, werden sie in S3 gespeichert, die Informationen werden auch in DynamoDB gespeichert und Slack wird über die Änderung informiert. Es war. Es ist ETL so zu tun. Neben Hamamatsu City wurden auch Daten aus Shizuoka City überwacht.

Der Funktionscode von Lambda (Python) ist in diesem Repository verfügbar. https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier

Laufen Sie regelmäßig auf Lambda

Fügen Sie einen Trigger hinzu. ink (16).png Stellen Sie den Auslöser ein. Wählen Sie EventBridge (ClowdWatch Event) und erstellen Sie eine Regel. Screenshot 2020-06-14 at 10.24.58.png Bitte gehen Sie durch, wie Sie die Zeitplanformel schreiben. Screenshot 2020-06-14 at 10.31.39.png Nur das. Screenshot 2020-06-14 at 10.33.34.png

Verwenden Sie Umgebungsvariablen

Legen Sie beim Veröffentlichen des Python-Quellcodes in GitHub usw. die Informationen fest, die Sie nicht in der Umgebungsvariablen veröffentlichen möchten.

Wählen Sie unter Umgebungscode von Lambda die Option "Umgebungsvariablen" bearbeiten. ink (17).png Fügen Sie die Schlüssel und Werte der Umgebungsvariablen hinzu. Screenshot 2020-06-14 at 10.45.22.png Wird hinzugefügt werden. Screenshot 2020-06-14 at 10.47.36.png

Wenn Sie es aus Python-Code erhalten, sieht es folgendermaßen aus:

api_address = os.environ["API_ADDRESS_CSV2JSON"]

Holen Sie sich JSON-Daten über die API

Rufen Sie diese API auf, um die JSON-Daten abzurufen. Der Code sieht so aus.

import json
import requests
class CityInfo:
    def __init__(self, city, queryParam):
        self.city = city
        self.queryParam = queryParam
CityInfo(
    "hamamatsu", 
    "main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)
apiResponse = requests.get("{0}?type={1}".format(API_ADDRESS_CSV2JSON, queryParam), auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
retJson = json.loads(apiResponse.text)

Verwalten Sie die zuletzt aktualisierten Informationen mit DynamoDB

Die Tabellenspezifikationen sind wie folgt. Screenshot 2020-06-14 at 12.31.13.png

city(PK) Stadttyp(hamamatsu/shizuoka-shi)
type(SK) Datentyp(Typen wie Anzahl der positiven Patienten und Anzahl der getesteten Personen)
id ID des offenen Datenkatalogs der Präfektur Shizuoka
name Modellname
path S3-Pfad, in dem die neuesten Daten gespeichert sind(Schlüssel)
update Datum und Uhrzeit der letzten Aktualisierung der Daten

Da alle JSON-Daten ein letztes Aktualisierungsdatum haben, vergleichen Sie dieses Datum und diese Uhrzeit mit dem DynamoDB-Update und aktualisieren Sie den Datensatz, falls er aktualisiert wurde. (Für neue Daten einfügen.)

import boto3
from boto3.dynamodb.conditions import Key
DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)

def processType(cityInfo, retJson, typeId):
    date = retJson[type]["date"]
    listItem = typeId.split(":")
    type = listItem[0]
    id = listItem[1]

    record = selectItem(cityInfo.city, type)
    if(record["Count"] is 0):
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
    elif record["Items"][0]["update"] != date:
        path = uploadFile(cityInfo.city, type, id, date, retJson[type])
        updateItem(cityInfo.city, type, id, date, path)

def selectItem(city, type):
    return DYNAMO_TABLE.query(
        KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
    )

def insertItem(city, type, id, date, name, path):
    DYNAMO_TABLE.put_item(
      Item = {
        "city": city, 
        "type": type, 
        "id": id, 
        "update" : date, 
        "name" : name, 
        "path" : path
      }
    )
    
def updateItem(city, type, id, date, path):
    DYNAMO_TABLE.update_item(
        Key={
            "city": city,
            "type": type,
        },
        UpdateExpression="set #update = :update, #path = :path",
        ExpressionAttributeNames={
            "#update": "update", 
            "#path": "path"
        },
        ExpressionAttributeValues={
            ":update": date, 
            ":path": path
        }
    )

Ich habe versucht, nicht nur die neuesten Informationen, sondern auch Verlaufsinformationen in einer separaten Tabelle zu speichern, aber das werde ich weglassen.

Auf S3 hochladen

Die aktualisierten Daten werden in S3 hochgeladen.

import boto3
S3 = boto3.resource('s3') 
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]

def uploadFile(city, type, id, date, jsonData):
    dt = datetime.strptime(date, '%Y/%m/%d %H:%M')
    path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
    objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
    objJson.put(Body = json.dumps(jsonData, ensure_ascii=False, indent=2))
    return path

Nicht nur JSON, sondern auch CSV der Originaldaten wird erfasst und als Satz mit JSON gespeichert, aber ich werde es weglassen.

Es wird in S3 so akkumuliert. Screenshot 2020-06-14 at 13.52.15.png In Bezug auf die Partitionierung wäre es besser gewesen, wenn CSV und JSON nicht am selben Ort wären, und wenn ich es mir noch einmal anschaue, scheint es ein bisschen subtil zu sein.

Benachrichtigen Sie Slack

Wir werden Slack über Updates informieren. Der Python-Code sieht folgendermaßen aus:

import slackweb
SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]

notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
def notifyToSlack(url, text):
    slack = slackweb.Slack(url=url)
    slack.notify(text=text)

Wie kann ich die WebHook-URL an slackweb weiterleiten? Gehen Sie zu Eingehende WebHooks. ink (19).png Sie können die Webhook-URL abrufen, indem Sie den Kanal auswählen, den Sie benachrichtigen möchten, und auf die Schaltfläche "Eingehende WebHooks-Integration hinzufügen" klicken. ink (20).png Sie können hier auch das Symbol und den Namen ändern.

So wird Slack benachrichtigt. Screenshot 2020-06-14 at 13.56.26.png

Nachwort

Ich habe festgestellt, dass seit [Letzter Beitrag (15.4.)] Zwei Monate vergangen sind (https://qiita.com/w2or3w/items/fd1ecb116116aae051cc), daher schreibe ich in Eile einen Artikel. Es ist heute, am 14. Juni, kaum sicher. Dieses Gerät selbst wurde um den 20. April fertiggestellt und ich betreibe es seitdem persönlich.

Eigentlich habe ich angefangen, diesen Mechanismus zu entwickeln, in der Hoffnung, den Slack-Kanal über code4hamamatsu zu informieren und alle über die Datenaktualisierung zu informieren. Ab der ersten Aprilhälfte.

Ich habe jedoch eine Umgebung, in der GitHub Actions Änderungen abfragt und festschreibt. Dann wird der Netlify-Build ausgeführt und das Ergebnis wird dem Slack-Kanal mitgeteilt, sodass ich die Möglichkeit habe, es zu verwenden. Es gab keine.

Während ich es machte, war ich etwas nervös, weil ich sagte "Oh, ich brauche das nicht", aber ich habe es bis zu dem Punkt geschafft, an dem es funktioniert, während ich die Priorität senkte, und ich habe das Gefühl, dass ich es persönlich leite.

Vorbereitet, um Ende April LT-Material bei JAWS-UG Hamamatsu AWS-Studiengruppe 2020 # 4 zu erstellen, damit jeder zumindest seine Existenz kennt. Ich habe es auch auf die Tagesordnung gesetzt, aber ich habe es nicht in kürzester Zeit getan. Screenshot 2020-06-14 at 09.51.49.png

Ich dachte darüber nach, über Qiitas Artikel zu schreiben, aber es scheint, als wären seitdem zwei Monate vergangen. Ah! Armes Kind! !!

Recommended Posts

Ein Mechanismus zum Abrufen, Konvertieren, Sammeln und Benachrichtigen offener Datenkataloge
Versuchen Sie es mit PHATE, einer Methode zur Reduzierung und Visualisierung biologischer Daten
Erstellen Sie schnell eine Python-Umgebung für Deep Learning / Data Science (Windows)