Zuvor schrieb ich einen Artikel API, die durch Konvertierung vom Open Data Catalog (CSV) der Präfektur Shizuoka in Daten (json) der COVID-19-Website für Gegenmaßnahmen erhalten wurde. Es war. Die hier erstellte API wird mit Lambda abgefragt. Wenn sich die Daten ändern, werden sie in S3 gespeichert, die Informationen werden auch in DynamoDB gespeichert und Slack wird über die Änderung informiert. Es war. Es ist ETL so zu tun. Neben Hamamatsu City wurden auch Daten aus Shizuoka City überwacht.
Der Funktionscode von Lambda (Python) ist in diesem Repository verfügbar. https://github.com/ww2or3ww/covid19_shizuoka-opendata_notifier
Fügen Sie einen Trigger hinzu. Stellen Sie den Auslöser ein. Wählen Sie EventBridge (ClowdWatch Event) und erstellen Sie eine Regel. Bitte gehen Sie durch, wie Sie die Zeitplanformel schreiben. Nur das.
Legen Sie beim Veröffentlichen des Python-Quellcodes in GitHub usw. die Informationen fest, die Sie nicht in der Umgebungsvariablen veröffentlichen möchten.
Wählen Sie unter Umgebungscode von Lambda die Option "Umgebungsvariablen" bearbeiten. Fügen Sie die Schlüssel und Werte der Umgebungsvariablen hinzu. Wird hinzugefügt werden.
Wenn Sie es aus Python-Code erhalten, sieht es folgendermaßen aus:
api_address = os.environ["API_ADDRESS_CSV2JSON"]
Rufen Sie diese API auf, um die JSON-Daten abzurufen. Der Code sieht so aus.
import json
import requests
class CityInfo:
def __init__(self, city, queryParam):
self.city = city
self.queryParam = queryParam
CityInfo(
"hamamatsu",
"main_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,patients_summary:5ab47071-3651-457c-ae2b-bfb8fdbe1af1,inspection_persons:d4827176-d887-412a-9344-f84f161786a2,contacts:1b57f2c0-081e-4664-ba28-9cce56d0b314"
)
apiResponse = requests.get("{0}?type={1}".format(API_ADDRESS_CSV2JSON, queryParam), auth=AUTH, headers={"x-api-key": API_KEY_CSV2JSON})
retJson = json.loads(apiResponse.text)
Die Tabellenspezifikationen sind wie folgt.
city(PK) | Stadttyp(hamamatsu/shizuoka-shi) |
type(SK) | Datentyp(Typen wie Anzahl der positiven Patienten und Anzahl der getesteten Personen) |
id | ID des offenen Datenkatalogs der Präfektur Shizuoka |
name | Modellname |
path | S3-Pfad, in dem die neuesten Daten gespeichert sind(Schlüssel) |
update | Datum und Uhrzeit der letzten Aktualisierung der Daten |
Da alle JSON-Daten ein letztes Aktualisierungsdatum haben, vergleichen Sie dieses Datum und diese Uhrzeit mit dem DynamoDB-Update und aktualisieren Sie den Datensatz, falls er aktualisiert wurde. (Für neue Daten einfügen.)
import boto3
from boto3.dynamodb.conditions import Key
DYNAMODB_NAME = os.environ["DYNAMODB_NAME"]
DYNAMO_TABLE = boto3.resource("dynamodb").Table(DYNAMODB_NAME)
def processType(cityInfo, retJson, typeId):
date = retJson[type]["date"]
listItem = typeId.split(":")
type = listItem[0]
id = listItem[1]
record = selectItem(cityInfo.city, type)
if(record["Count"] is 0):
path = uploadFile(cityInfo.city, type, id, date, retJson[type])
insertItem(cityInfo.city, type, id, date, TYPE_NAME[type], path)
elif record["Items"][0]["update"] != date:
path = uploadFile(cityInfo.city, type, id, date, retJson[type])
updateItem(cityInfo.city, type, id, date, path)
def selectItem(city, type):
return DYNAMO_TABLE.query(
KeyConditionExpression=Key("city").eq(city) & Key("type").eq(type)
)
def insertItem(city, type, id, date, name, path):
DYNAMO_TABLE.put_item(
Item = {
"city": city,
"type": type,
"id": id,
"update" : date,
"name" : name,
"path" : path
}
)
def updateItem(city, type, id, date, path):
DYNAMO_TABLE.update_item(
Key={
"city": city,
"type": type,
},
UpdateExpression="set #update = :update, #path = :path",
ExpressionAttributeNames={
"#update": "update",
"#path": "path"
},
ExpressionAttributeValues={
":update": date,
":path": path
}
)
Ich habe versucht, nicht nur die neuesten Informationen, sondern auch Verlaufsinformationen in einer separaten Tabelle zu speichern, aber das werde ich weglassen.
Die aktualisierten Daten werden in S3 hochgeladen.
import boto3
S3 = boto3.resource('s3')
S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
def uploadFile(city, type, id, date, jsonData):
dt = datetime.strptime(date, '%Y/%m/%d %H:%M')
path = "data/{0}/{1}/{2}/{3}/{4}/{5}-{6}".format(city, type, dt.year, str(dt.month).zfill(2), str(dt.day).zfill(2), str(dt.hour).zfill(2), str(dt.minute).zfill(2))
objJson = S3.Object(S3_BUCKET_NAME, "{0}.json".format(path))
objJson.put(Body = json.dumps(jsonData, ensure_ascii=False, indent=2))
return path
Nicht nur JSON, sondern auch CSV der Originaldaten wird erfasst und als Satz mit JSON gespeichert, aber ich werde es weglassen.
Es wird in S3 so akkumuliert. In Bezug auf die Partitionierung wäre es besser gewesen, wenn CSV und JSON nicht am selben Ort wären, und wenn ich es mir noch einmal anschaue, scheint es ein bisschen subtil zu sein.
Wir werden Slack über Updates informieren. Der Python-Code sieht folgendermaßen aus:
import slackweb
SLACK_WEBHOOK_HAMAMATSU = os.environ["SLACK_WEBHOOK_HAMAMATSU"]
notifyText = "【{0}】\n{1}".format(cityInfo.city, notifyText)
notifyToSlack(SLACK_WEBHOOK_HAMAMATSU, notifyText)
def notifyToSlack(url, text):
slack = slackweb.Slack(url=url)
slack.notify(text=text)
Wie kann ich die WebHook-URL an slackweb weiterleiten? Gehen Sie zu Eingehende WebHooks. Sie können die Webhook-URL abrufen, indem Sie den Kanal auswählen, den Sie benachrichtigen möchten, und auf die Schaltfläche "Eingehende WebHooks-Integration hinzufügen" klicken. Sie können hier auch das Symbol und den Namen ändern.
So wird Slack benachrichtigt.
Ich habe festgestellt, dass seit [Letzter Beitrag (15.4.)] Zwei Monate vergangen sind (https://qiita.com/w2or3w/items/fd1ecb116116aae051cc), daher schreibe ich in Eile einen Artikel. Es ist heute, am 14. Juni, kaum sicher. Dieses Gerät selbst wurde um den 20. April fertiggestellt und ich betreibe es seitdem persönlich.
Eigentlich habe ich angefangen, diesen Mechanismus zu entwickeln, in der Hoffnung, den Slack-Kanal über code4hamamatsu zu informieren und alle über die Datenaktualisierung zu informieren. Ab der ersten Aprilhälfte.
Ich habe jedoch eine Umgebung, in der GitHub Actions Änderungen abfragt und festschreibt. Dann wird der Netlify-Build ausgeführt und das Ergebnis wird dem Slack-Kanal mitgeteilt, sodass ich die Möglichkeit habe, es zu verwenden. Es gab keine.
Während ich es machte, war ich etwas nervös, weil ich sagte "Oh, ich brauche das nicht", aber ich habe es bis zu dem Punkt geschafft, an dem es funktioniert, während ich die Priorität senkte, und ich habe das Gefühl, dass ich es persönlich leite.
Vorbereitet, um Ende April LT-Material bei JAWS-UG Hamamatsu AWS-Studiengruppe 2020 # 4 zu erstellen, damit jeder zumindest seine Existenz kennt. Ich habe es auch auf die Tagesordnung gesetzt, aber ich habe es nicht in kürzester Zeit getan.
Ich dachte darüber nach, über Qiitas Artikel zu schreiben, aber es scheint, als wären seitdem zwei Monate vergangen. Ah! Armes Kind! !!