(Zusatz) Es war nur kostenlos, daher werde ich es am 9. Tag von [AWS Lambda und Serverless # 2 Adventskalender 2019] veröffentlichen (https://qiita.com/advent-calendar/2019/lambda2).
Ich dachte plötzlich, dass ich die folgende LINE-Benachrichtigung über Qiitas Likes haben möchte:
Entdecken Sie verwandte Artikel! -> Wir haben eine sanfte Welt aufgebaut, die Sie benachrichtigt, wenn Sie es mit Qiita mögen
Die Struktur ist einfach (und der Artikel selbst) ist sehr hilfreich.
Dieses Mal wollte ich eine ereignisgesteuerte Anwendung mit AWS-Diensten wie Lambda erstellen, daher habe ich die Konfiguration ein wenig geändert und nachgeahmt.
Der ursprüngliche Artikel wird mit der folgenden einfachen Struktur implementiert.
Dieses Mal haben wir die Konfiguration wie folgt überarbeitet, damit der Lambda-Prozess präziser und schneller abgeschlossen werden kann.
--Scraping-> Mit der Qiita-API zur Aggregation wechseln
Da ich dieses Mal zum ersten Mal Lambda und Dynamo DB verwende, habe ich das starke Gefühl, dass ich verschiedene Funktionen ausprobieren möchte. Ich kann das Gefühl, es zu übertreiben, nicht leugnen. Ich denke, der Originalartikel ist schlauer.
Ich habe folgendes verwendet:
Qiita API v2 Die API ist für Qiita verfügbar (Dokumentation zur Qiita API v2). Sie können leicht verschiedene Informationen erhalten. Leider gibt es keine API, die Benachrichtigungen empfangen kann. Daher werden wir Benachrichtigungen realisieren, indem wir Folgendes kombinieren.
Funktion | Endpunkt | Vorteil | Nachteil |
---|---|---|---|
Holen Sie sich eine Liste von Likes | GET /api/v2/authenticated_user/items | Sie können die Gesamtzahl der Likes bis zu 100 Artikel mit einem Get erhalten | Speicher wird verbraucht, weil die Antwort den Körper enthält |
Holen Sie sich die Benutzer-ID, die Ihnen gefallen hat | GET /api/v2/items/:item_id/likes | Sie können Benutzer erhalten, denen jeder Artikel gefällt | Sie müssen so viele Artikel erhalten, wie Sie benötigen |
Lambda berechnet nur für die Zeit, in der es verwendet wird. Daher hat die Reduzierung der Verarbeitungszeit Vorrang vor strengen Benachrichtigungen. Ich denke, dass es am strengsten ist, alle Benutzer-IDs zu erhalten, die Sie regelmäßig mögen, und den Unterschied zu machen, aber da Qiita API v2 nur die Benutzer-IDs erhalten kann, die Sie für jeden Artikel mögen, API so viele wie die Anzahl der Artikel Sie müssen treffen. Qiita mag die folgenden Tendenzen. (Referenz: [7 "erstaunlich" in der Analyse von Qiita-Artikeln im letzten Jahr gefunden) (https://qiita.com/youwht/items/f3fa7e6cc2687509c06e))
- Erstaunlich ① Der Durchschnittswert von "Likes" beträgt 8,02. Mehr als die Hälfte ist 0-2
- Erstaunlich ② Die Verteilung von "Likes" ist so voreingenommen, dass sie nicht grafisch dargestellt werden kann.
Daher wird angenommen, dass das Update von Likes auch auf einen bestimmten Artikel ausgerichtet ist. Es scheint nicht wert zu sein, den Unterschied einzeln zu betrachten. Daher versuche ich, nur die Anzahl der Likes auf einmal zu erhalten, indem ich die Liste abrufe, nur die Artikel einzugrenzen, deren Anzahl der Likes sich geändert hat, und mehrmals auf die API zu klicken, um die Benutzer-ID zu erhalten, die mir gefallen hat.
Wir betrachten nur die Gesamtzahl der Likes pro Artikel. Wenn also ein Like storniert wird, ist dies weniger streng, aber auf Kosten davon.
Lambda + Dynamo DB Stream Diesmal reicht es im Grunde aus, den Prozess regelmäßig auszuführen (einmal alle 15 Minuten usw.). Mit einem normalen Webserver verschwenden Sie nur die meiste Zeit beim Booten. Wenn es sich um einen üblichen Pay-as-you-go-Service handelt, geht dieser verloren. Lambda berechnet jedoch nur die tatsächlich verwendete Rechenzeit und keine Gebühren, wenn der Code nicht ausgeführt wird.
Aufgrund der Art und Weise, wie Ressourcen so oft wie nötig verwendet werden, können Sie verschiedene Trigger für die Verarbeitung der Ausführung auswählen. Die folgenden Trigger sind für diese Anforderung geeignet.
--CloudWatch-Ereignisse: Regelmäßige Ausführung --Dynamo DB Stream: Wenn die DB geändert wird, empfängt sie die geänderten Daten und führt den Prozess aus.
LINE Notify
Sie können LINE einfach benachrichtigen, indem Sie das Zugriffstoken in den Header einfügen und die Nachricht POSTEN. Es ist auch sehr einfach, ein Zugriffstoken zu erhalten.
Das Verfahren zur Implementierung ist wie folgt. Wir werden auch das Blockdiagramm erneut drucken, um die Rolle jeder Implementierung zu verstehen.
Ich möchte einen Auszug des in Lambda verwendeten Codes vorstellen. Sie können den Code, den Sie tatsächlich verwenden, von [hier] aus sehen (https://github.com/tokusumi/qiita_notification_lambda_job).
Ich werde es weglassen, weil es vom Hauptthema abweicht.
Das Folgende ist sehr hilfreich und empfehlenswert. Der Inhalt dieses Artikels ist ausreichend, wenn Sie sich an den Abschnitt "Testen mit Lambda" halten können. (Referenz: Erste API-Entwicklung mit Lambda und DynamoDB)
In Python möchten Sie Requests verwenden, in Lambda können Sie die Pip-Installation jedoch nicht verwenden. Daher ist es mühsam, andere als die integrierten Funktionen zu verwenden. (Wenn Sie es weiterhin verwenden möchten, hier) Bereiten Sie also zunächst eine Funktion zum Abrufen und Veröffentlichen von Anforderungen mit urllib vor. Die Schnittstelle ist so nah wie möglich an Anfragen. Die Funktionen req_get und req_post verwenden dieselben Argumente wie die Funktionen request.get und request.post. Außerdem kann das Response-Objekt den Inhalt der json-Antwort mit `` `.body``` abrufen.
import json
from urllib.request import Request
from urllib import request, parse, error
from http.client import HTTPResponse
class Response():
"""Http Response Object"""
def __init__(self, res: HTTPResponse):
self.body = self._json(res)
self.status_code = self._status_code(res)
self.headers = self._headers(res)
def _json(self, res: HTTPResponse):
return json.loads(res.read())
def _status_code(self, res: HTTPResponse) -> int:
return res.status
def _headers(self, res: HTTPResponse) -> Dict[str, str]:
return dict(res.getheaders())
def req_get(url: str, headers=None, params=None) -> Response:
"""get request. simplified request function of Requests
:return: Response object
"""
if params:
url = '{}?{}'.format(url, parse.urlencode(params))
req = Request(url, headers=headers, method='GET')
with request.urlopen(req) as res:
response = Response(res)
return response
def req_post(url: str, data: Dict[str, Any], headers=None) -> Response:
"""post request. simplified request function of Requests
:return: Response object
"""
if headers.get('Content-Type') == 'application/x-www-form-urlencoded':
encoded_data = parse.urlencode(data).encode()
else:
encoded_data = json.dumps(data).encode()
req = Request(url, data=encoded_data, headers=headers, method='POST')
with request.urlopen(req) as res:
response = Response(res)
return response
Dokumentation und [Support](https://help.qiita.com/de/articles/ (Qiita-Suchoptionen) und klicken Sie auf `GET / api / v2 / authenticated_user / items```. Hier verwende ich die Funktion
`serialize_response```, die unnötige Werte verwirft (nur die ID und der Titel sowie die Anzahl der Likes sind erforderlich). Wenn Sie eine große Anzahl von Artikeln haben, benötigen Sie auch Seitennationen. Da daher die Gesamtzahl der Artikel des Benutzers in der Kopfzeile enthalten ist, wird die maximale Anzahl der Seitennationen durch das erste Abrufen berechnet und das Abrufen wiederholt.
def serialize_response(response: Response) -> List[Dict[str, Any]]:
"""serialize response of Qiita API v2"""
keys = ['id', 'title', 'likes_count']
return [
{f: resp.get(f) for f in keys} for resp in response.body
]
def get_item(url: str, headers: Dict[str, str], **param) -> List[Dict[str, Any]]:
"""get a item by Qiita API v2 and return the list of serialized response (dictionary)"""
response = req_get(url, headers=headers, params=param)
return serialize_response(response)
def get_items(token: str, per_page=1, url='https://qiita.com/api/v2/authenticated_user/items') -> List[Dict[str, Any]]:
"""Pagenate, um alle Artikel von authentifizierten Benutzern zu erhalten"""
headers = {'Authorization': 'Bearer {}'.format(token)}
response: Response = req_get(url, headers=headers, params={'page': 1, 'per_page': per_page})
items = serialize_response(response)
tot_count = int(response.headers['Total-Count'])
tot_pages = ceil(tot_count / per_page)
if tot_pages <= 1:
return items
for page in range(2, tot_pages + 1):
items += get_item(url, headers, page=page, per_page=per_page)
return items
Wenn Sie auf Dynamo DB-Tabellenübersicht / Stream-Details / Stream-Verwaltung klicken, wird Folgendes angezeigt. Wenn festgelegt, werden Stream-Daten (Daten vor und nach der Änderung) gestreamt, wenn Dynamo DB aktualisiert wird. (In 5. werden diese Stream-Daten als Auslöser verwendet, um Lambda laufen zu lassen.)
Aktualisieren Sie Dynamo DB mit der folgenden Funktion. Wenn sich die ID nicht in Dynamo DB befindet, wird sie neu erstellt. Wenn die ID vorhanden ist und die Anzahl der Likes (iine) geändert wird, wird sie aktualisiert, andernfalls erfolgt keine Änderung. Nur neu erstellte und aktualisierte Elemente werden als Daten übertragen.
import boto3
from botocore.exceptions import ClientError
def update_logs(items: List[Dict[str, Any]]):
"""Update the number of iine in Dynamo DB
If item ID do not exist in Dynamo DB, insert them in it
"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('iine_qiita_logs')
for item in items:
ids = item.get('id')
title = item.get('title')
iine = item.get('likes_count')
try:
response = table.update_item(
Key={'ids': ids},
UpdateExpression="set iine = :newiine, title = :title",
ConditionExpression="attribute_not_exists(ids) or iine <> :newiine",
ExpressionAttributeValues={
":newiine": iine,
":title": title
},
)
except ClientError as e:
if e.response['Error']['Code'] == "ConditionalCheckFailedException":
print(e.response['Error']['Message'])
else:
raise
Setze 2 bis 3 Codes auf Lambda zusammen. Dann lösen Sie "Cloud Watch Events" aus. (Obwohl es wegen Zako offensichtlich zu häufig ist: cry :) Von 9 bis 1 Uhr morgens erfolgt die Verarbeitung alle 15 Minuten.
Anschließend werden die folgenden Dynamo DB-Elemente regelmäßig aktualisiert und Daten werden gestreamt.
Erstellen Sie Lambda auf der Benachrichtigungsseite. Da die aktualisierten Stream-Daten bis zu 4 fließen, ist Lambda erforderlich, das die Stream-Daten empfängt und den Prozess ausführt. Stellen Sie den Trigger einfach wie unten gezeigt auf Dynamo DB.
Stream-Daten können aus dem ersten Argument des in Lambda angegebenen Handlers wie folgt abgerufen werden. (Referenz: Lambda ausführen, ausgelöst durch DynamoDB-Stream)
def serialize_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""serialize data of Dynamo DB Stream"""
if record.get('eventName') != 'MODIFY':
return {}
past = record.get('dynamodb', {}).get('OldImage')
past_iine = int(past.get('iine', {}).get('N', 0))
ids = past.get('ids', {}).get('S', '')
new = record.get('dynamodb', {}).get('NewImage')
title = new.get('title', {}).get('S', '')
new_iine = int(new.get('iine', {}).get('N', 0))
return {
'ids': ids,
'title': title,
'new_iine': new_iine,
'past_iine': past_iine
}
def lambda_handler(event, context):
"""main handler for Lambda"""
records = event.get('Records', [])
for record in records:
serialized_data = serialize_record(record)
...
Nachdem Sie die ID des Artikels haben, dessen Anzahl an Likes gestiegen ist, können Sie die gewünschte Benutzer-ID aus `` `GET / api / v2 / items /: item_id / lik``` der Qiita API v2 abrufen. Machen.
def serialize_response_name(response: Response, new_size: int, num: int, title: str) -> Dict[str, Any]:
"""serialize iine data of Qiita API v2"""
size = new_size - num
if size <= 0:
users: List[str] = []
else:
new_iine = response.body[:size]
users = [
resp.get('user', {}).get('id') for resp in new_iine
]
return {
'title': title,
'users': users
}
def get_new_iine(item: Dict[str, Any], token: str) -> Dict[str, Any]:
"""HTTP request to Qiita API v2"""
headers = {'Authorization': 'Bearer {}'.format(token)}
ids = item.get('ids', '')
past_iine = item.get('past_iine', 0)
new_iine = item.get('new_iine', 0)
url = f'https://qiita.com/api/v2/items/{ids}/likes'
response = req_get(url, headers=headers)
title: str = item.get('title', '')
resp = serialize_response_name(response, new_iine, past_iine, title)
return resp
Sie können ein Zugriffstoken erhalten, indem Sie sich anmelden, auf "Zugriffsgespräch von meiner Seite aus" drücken, auf "Benachrichtigungen von LINE-Benachrichtigung 1: 1 empfangen" und dann auf "Problem" klicken.
Alles was Sie tun müssen, ist es entsprechend zu formatieren und zu veröffentlichen.
def deserialize_response_name(response: Dict[str, Any], max_length=20) -> str:
"""deserialize text for LINE Notify
:param max_length: max sentence length
"""
names = ", ".join(response.get('users', []))
title = response.get('title', '')
title = f"{title}" if len(title) <= max_length else f"{title[:max_length]}..."
return f"\n{names}Aber"{title}Ich mochte es."
def send_notification(message: str, token: str):
"""send notification by LINE notify"""
url = 'https://notify-api.line.me/api/notify'
headers = {
'Authorization': 'Bearer {}'.format(token),
'Content-Type': 'application/x-www-form-urlencoded'
}
msg = {'message': message}
response = req_post(url, data=msg, headers=headers)
return response.body
Das ist alles für den Zweck dieses Artikels. Wenn Sie danach die folgende Funktion im Handler festlegen, wird die Benachrichtigung ausgeführt.
def lambda_handler(event, context):
"""main handler for Lambda"""
qiita_token = os.environ["QIITA_TOKEN"]
line_token = os.environ["LINE_TOKEN"]
records = event.get('Records', [])
for record in records:
serialized_data = serialize_record(record)
if not serialized_data:
continue
new_iines = get_new_iine(serialized_data, qiita_token)
if len(new_iines.get('users')) == 0:
continue
send_notification(deserialize_response_name(new_iines), line_token)
return {
'statusCode': 200,
}
Benachrichtigungsbeispiel:
Sie können jetzt LINE-Benachrichtigungen sicher empfangen. Ich fand es auch ein gutes Thema, um mit der ereignisgesteuerten Anwendungsentwicklung mit AWS zu beginnen. Ich bin dem Autor der Originalgeschichte als Referenz dankbar. .. ..
Danke, dass du bis zum Ende für mich gelesen hast! Ich hoffe es wird hilfreich für Sie sein! Refs