[PYTHON] I built an application with Lambda that notifies LINE of "likes" using the Qiita API

(Addition) A slot happened to be free, so I am also posting this as day 9 of the AWS Lambda and Serverless #2 Advent Calendar 2019.

Overview

I suddenly wanted LINE notifications of Qiita "likes" like the one below (line.jpg).

Searching around, I found a related article: "We built a gentle world that will notify you LINE when you like Qiita".

Its structure is simple, and the article itself is very helpful.

This time, I wanted to create an event-driven application using AWS services such as Lambda, so I changed the configuration a little and tried to imitate it.

Flow of this article

  1. Configuration overview
  2. Introduction of the technical elements used
  3. Implementation description

1. Configuration overview

Diagram

Diagram.jpg

Changes

The original article uses the following simple structure:

- Scrape Qiita's notification page
- Take the difference against the notification log saved in a DB
- Send the difference to IFTTT's Webhooks (IFTTT then notifies LINE)
- Run this series of steps periodically on AWS Lambda

This time, I reworked the configuration as follows so that the Lambda processing is simpler and faster.

- Scraping → aggregation via the Qiita API
- Eliminate sleeps to shorten processing time
- Easier environment setup
- One Lambda for everything → split into two Lambdas (a data-collection function and a notification function)
- Take the difference from past logs with DynamoDB Streams (no diff script needed)

Since this is my first time using Lambda and DynamoDB, I was keen to try out as many features as possible, so I cannot deny that the setup is somewhat over-engineered. The original article's approach is arguably smarter.

2. Introduction of the technical elements used

I used the following:

Qiita API v2

Qiita provides an API (see the Qiita API v2 documentation) that makes it easy to fetch various information. Unfortunately, there is no endpoint for retrieving notifications, so I implement the notification by combining the following two endpoints (a quick example follows the table).

Function | Endpoint | Advantage | Disadvantage
Get the list of likes | GET /api/v2/authenticated_user/items | One request returns the like counts of up to 100 articles | The response contains the article bodies, so it consumes memory
Get the user IDs of likers | GET /api/v2/items/:item_id/likes | Returns the users who liked a given article | Requires as many requests as there are articles
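
For a quick local check of these two endpoints (outside Lambda), something like the following works; requests is used here only for the local test, QIITA_TOKEN is assumed to hold a personal access token, and the article inspected is simply the first one returned:

import os
import requests  # convenient for a local check; the Lambda code later uses urllib

headers = {'Authorization': 'Bearer {}'.format(os.environ['QIITA_TOKEN'])}

# List my articles (each entry includes likes_count; up to 100 per page)
items = requests.get(
    'https://qiita.com/api/v2/authenticated_user/items',
    headers=headers, params={'page': 1, 'per_page': 100},
).json()

# Users who liked one specific article
likes = requests.get(
    'https://qiita.com/api/v2/items/{}/likes'.format(items[0]['id']),
    headers=headers,
).json()
print(items[0]['likes_count'], [like['user']['id'] for like in likes])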

Remarks

Lambda charges only for the time you actually use, so I prioritized shorter processing time over perfectly rigorous notifications. The most rigorous approach would be to regularly fetch every user ID that has liked my articles and take the difference, but since Qiita API v2 can only return the likers of one article at a time, that requires as many API calls as there are articles. Qiita likes also show the following tendencies (reference: 7 "surprises" found in an analysis of the past year's Qiita articles):

- Surprise ①: the average number of "likes" is 8.02, and more than half of articles have 0-2
- Surprise ②: the distribution of "likes" is so skewed that it is hard to graph

Like updates are therefore probably concentrated on a few specific articles, and taking the difference article by article is not worth it. Instead, I fetch only the like counts in bulk via the list endpoint, narrow down to the articles whose like count has changed, and call the per-article likes API only for those (a conceptual sketch follows the next paragraph).

Because I only look at the total like count per article, accuracy suffers when a like is cancelled, but that is a trade-off I accept.
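
Conceptually, the narrowing-down step looks like the sketch below (the helper name is mine; in the actual implementation this comparison is delegated to a DynamoDB conditional update, described in section 3):

def changed_items(current_items, past_counts):
    """Keep only the articles whose like count differs from the stored count.

    current_items: list of dicts with 'id' and 'likes_count' (from the list endpoint)
    past_counts:   dict mapping article id -> previously stored like count
    """
    return [
        item for item in current_items
        if item['likes_count'] != past_counts.get(item['id'], 0)
    ]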

Lambda + DynamoDB Streams

For this use case it is enough to run the process periodically (say, once every 15 minutes). A regular web server would spend most of its time idle, which is a waste on an ordinary pay-as-you-go service. Lambda, however, charges only for the compute time you actually use; there is no charge while your code is not running.

Because resources are used only as needed, you can choose from a variety of triggers to start processing. The following two are a good fit for this requirement (a minimal sketch follows the list):

- CloudWatch Events: periodic execution
- DynamoDB Streams: when the table is changed, the changed data is delivered and the process runs
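
As a minimal sketch of how these two triggers reach a handler (event shapes assumed from the AWS documentation: a scheduled CloudWatch Events invocation carries "source": "aws.events", while a DynamoDB Streams invocation carries a "Records" list):

def lambda_handler(event, context):
    """Toy handler that only shows the two trigger payload shapes."""
    if event.get('source') == 'aws.events':
        print('invoked by a CloudWatch Events schedule')
    for record in event.get('Records', []):
        print('DynamoDB Stream record:', record.get('eventName'))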

LINE Notify

You can send a LINE notification just by putting the access token in the Authorization header and POSTing a message (a quick example follows). Getting an access token is also very easy.
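
For a quick local check (the Lambda implementation later goes through the urllib helpers), LINE Notify can be called like this, assuming LINE_TOKEN holds the access token:

import os
import requests

resp = requests.post(
    'https://notify-api.line.me/api/notify',
    headers={'Authorization': 'Bearer {}'.format(os.environ['LINE_TOKEN'])},
    data={'message': 'hello from LINE Notify'},  # sent as application/x-www-form-urlencoded
)
print(resp.status_code)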

3. Implementation description

The implementation steps are as follows. The block diagram is reprinted so the role of each part is clear.

  1. Building an environment for Lambda and Dynamo DB Stream
  2. Get a list of likes (②)
  3. Save the difference in Dynamo DB and stream it (③, ④)
  4. Regular execution with Lambda (①)
  5. Receive the stream and get the users who liked it (⑤)
  6. LINE notification (⑥, ⑦)
Diagram.jpg

Below are excerpts of the code used in Lambda. The code actually in use can be seen here.

1. Building an environment for Lambda and Dynamo DB Stream

I omit this because it is outside the scope of this article.

The following reference is very helpful and recommended; for the purposes of this article, it is enough to follow it up to the section "Testing on Lambda". (Reference: First API development using Lambda and DynamoDB)

2. Get a list of likes (②)

HTTP request

In Python you would normally use Requests, but on Lambda you cannot simply pip install packages, so using anything other than the standard library is a hassle. (If you still want to use it, see here.) So I first prepare GET and POST helper functions built on urllib, with an interface as close to Requests as possible: req_get and req_post take the same arguments as requests.get and requests.post, and the Response object exposes the parsed JSON body via .body.

import json
from typing import Any, Dict
from urllib.request import Request
from urllib import request, parse
from http.client import HTTPResponse


class Response():
    """Http Response Object"""

    def __init__(self, res: HTTPResponse):
        self.body = self._json(res)
        self.status_code = self._status_code(res)
        self.headers = self._headers(res)

    def _json(self, res: HTTPResponse):
        return json.loads(res.read())

    def _status_code(self, res: HTTPResponse) -> int:
        return res.status

    def _headers(self, res: HTTPResponse) -> Dict[str, str]:
        return dict(res.getheaders())


def req_get(url: str, headers=None, params=None) -> Response:
    """get request. simplified request function of Requests
    :return: Response object
    """
    if params:
        url = '{}?{}'.format(url, parse.urlencode(params))

    req = Request(url, headers=headers, method='GET')

    with request.urlopen(req) as res:
        response = Response(res)
    return response


def req_post(url: str, data: Dict[str, Any], headers=None) -> Response:
    """post request. simplified request function of Requests
    :return: Response object
    """
    if headers.get('Content-Type') == 'application/x-www-form-urlencoded':
        encoded_data = parse.urlencode(data).encode()
    else:
        encoded_data = json.dumps(data).encode()

    req = Request(url, data=encoded_data, headers=headers, method='POST')

    with request.urlopen(req) as res:
        response = Response(res)
    return response
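
As a quick usage example of these helpers (QIITA_TOKEN is assumed to be set in the environment):

import os

res = req_get(
    'https://qiita.com/api/v2/authenticated_user/items',
    headers={'Authorization': 'Bearer {}'.format(os.environ['QIITA_TOKEN'])},
    params={'page': 1, 'per_page': 20},
)
print(res.status_code, len(res.body))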

Get a list of likes with Qiita API v2

Following the documentation and the support page (https://help.qiita.com/en/articles/qiita-search-options), I hit GET /api/v2/authenticated_user/items. The serialize_response function below discards values that are not needed (only the ID, title, and like count are required). Pagination is also needed when you have many articles: the Total-Count response header holds the user's total number of articles, so the first GET is used to compute the number of pages and the GET is then repeated.

from math import ceil
from typing import Any, Dict, List


def serialize_response(response: Response) -> List[Dict[str, Any]]:
    """serialize response of Qiita API v2"""
    keys = ['id', 'title', 'likes_count']
    return [
        {f: resp.get(f) for f in keys} for resp in response.body
    ]

def get_item(url: str, headers: Dict[str, str], **param) -> List[Dict[str, Any]]:
    """get a item by Qiita API v2 and return the list of serialized response (dictionary)"""
    response = req_get(url, headers=headers, params=param)
    return serialize_response(response)

def get_items(token: str, per_page=1, url='https://qiita.com/api/v2/authenticated_user/items') -> List[Dict[str, Any]]:
    """Pagination to get all articles of authenticated users"""
    headers = {'Authorization': 'Bearer {}'.format(token)}

    response: Response = req_get(url, headers=headers, params={'page': 1, 'per_page': per_page})
    items = serialize_response(response)
    tot_count = int(response.headers['Total-Count'])
    tot_pages = ceil(tot_count / per_page)
    if tot_pages <= 1:
        return items

    for page in range(2, tot_pages + 1):
        items += get_item(url, headers, page=page, per_page=per_page)
    return items

3. Save the difference in Dynamo DB and stream it (③, ④)

In the DynamoDB console, Table Overview > Stream details > Manage Stream shows the screen below (stream.png). Once the stream is enabled, stream records (the data before and after each change) are emitted whenever the table is updated. (In step 5, this stream data is used to trigger the notification Lambda.)

The table is updated with the following function. If the ID does not exist in DynamoDB, the item is newly created; if the ID exists and the like count (iine) has changed, it is updated; otherwise nothing happens. Only created and updated items appear as stream records.

from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError

def update_logs(items: List[Dict[str, Any]]):
    """Update the number of iine in Dynamo DB
    If item ID do not exist in Dynamo DB, insert them in it
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('iine_qiita_logs')

    for item in items:
        ids = item.get('id')
        title = item.get('title')
        iine = item.get('likes_count')

        try:
            response = table.update_item(
                Key={'ids': ids},
                UpdateExpression="set iine = :newiine, title = :title",
                ConditionExpression="attribute_not_exists(ids) or iine <> :newiine",
                ExpressionAttributeValues={
                    ":newiine": iine,
                    ":title": title
                },
            )
        except ClientError as e:
            if e.response['Error']['Code'] == "ConditionalCheckFailedException":
                print(e.response['Error']['Message'])
            else:
                raise

4. Regular execution with Lambda (①)

Put the code from steps 2 and 3 together in one Lambda, then set "CloudWatch Events" as the trigger. (Given how few likes I get, this is obviously too frequent :cry:) Processing runs every 15 minutes from 9 AM to 1 AM (lambda1.png). A rough sketch of the collection-side handler follows.
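
For reference, the collection-side handler is essentially a composition of the functions from steps 2 and 3; a rough sketch (the actual handler in the linked repository may differ):

import os


def lambda_handler(event, context):
    """Collection side: fetch like counts via the Qiita API and upsert them into DynamoDB."""
    token = os.environ['QIITA_TOKEN']
    items = get_items(token, per_page=100)  # from step 2
    update_logs(items)                      # from step 3
    return {'statusCode': 200}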

The DynamoDB items below are then updated periodically and stream records are emitted (dynamodb.png).

5. Receive the stream and get the users who liked it (⑤)

Build the Lambda on the notification side. Since step 4 now produces a stream of update records, we need a Lambda that receives those records and runs the notification process. Just set DynamoDB as the trigger, as shown below (lambda2.png).

Stream data can be obtained from the first argument of the handler specified in Lambda as follows. (Reference: Run Lambda triggered by DynamoDB Stream)

from typing import Any, Dict


def serialize_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """serialize data of Dynamo DB Stream"""
    if record.get('eventName') != 'MODIFY':
        return {}

    past = record.get('dynamodb', {}).get('OldImage')
    past_iine = int(past.get('iine', {}).get('N', 0))
    ids = past.get('ids', {}).get('S', '')

    new = record.get('dynamodb', {}).get('NewImage')
    title = new.get('title', {}).get('S', '')
    new_iine = int(new.get('iine', {}).get('N', 0))

    return {
        'ids': ids,
        'title': title,
        'new_iine': new_iine,
        'past_iine': past_iine
    }

def lambda_handler(event, context):
    """main handler for Lambda"""
    records = event.get('Records', [])
    for record in records:
        serialized_data = serialize_record(record)
        ...

At this point we have the IDs of the articles whose like count has increased, so we fetch the IDs of the users who liked them from GET /api/v2/items/:item_id/likes of Qiita API v2.

from typing import Any, Dict, List


def serialize_response_name(response: Response, new_size: int, num: int, title: str) -> Dict[str, Any]:
    """serialize iine data of Qiita API v2"""
    size = new_size - num
    if size <= 0:
        users: List[str] = []
    else:
        new_iine = response.body[:size]
        users = [
            resp.get('user', {}).get('id') for resp in new_iine
        ]
    return {
        'title': title,
        'users': users
    }

def get_new_iine(item: Dict[str, Any], token: str) -> Dict[str, Any]:
    """HTTP request to Qiita API v2"""
    headers = {'Authorization': 'Bearer {}'.format(token)}
    ids = item.get('ids', '')
    past_iine = item.get('past_iine', 0)
    new_iine = item.get('new_iine', 0)
    url = f'https://qiita.com/api/v2/items/{ids}/likes'

    response = req_get(url, headers=headers)
    title: str = item.get('title', '')
    resp = serialize_response_name(response, new_iine, past_iine, title)
    return resp

6. LINE notification (⑥, ⑦)

You can get an access token by logging in to LINE Notify, pressing "Issue access token" on My Page, selecting "Receive notifications 1-on-1 from LINE Notify", and then pressing "Issue" (line_access_token.png).

All you have to do is format it appropriately and post it.

from typing import Any, Dict


def deserialize_response_name(response: Dict[str, Any], max_length=20) -> str:
    """deserialize text for LINE Notify
    :param max_length: max sentence length
    """
    names = ", ".join(response.get('users', []))
    title = response.get('title', '')
    title = f"{title}" if len(title) <= max_length else f"{title[:max_length]}..."
    return f"\n{names}But"{title}I liked it."

def send_notification(message: str, token: str):
    """send notification by LINE notify"""
    url = 'https://notify-api.line.me/api/notify'
    headers = {
        'Authorization': 'Bearer {}'.format(token),
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    msg = {'message': message}
    response = req_post(url, data=msg, headers=headers)
    return response.body

That covers the main content of this article. Finally, set the following function as the handler and the notification will run.

import os


def lambda_handler(event, context):
    """main handler for Lambda"""
    qiita_token = os.environ["QIITA_TOKEN"]
    line_token = os.environ["LINE_TOKEN"]

    records = event.get('Records', [])
    for record in records:
        serialized_data = serialize_record(record)
        if not serialized_data:
            continue
        new_iines = get_new_iine(serialized_data, qiita_token)
        if len(new_iines.get('users')) == 0:
            continue
        send_notification(deserialize_response_name(new_iines), line_token)

    return {
        'statusCode': 200,
    }

Notification example: line.jpg

Summary

I can now reliably receive LINE notifications. This also turned out to be a good starter theme for event-driven application development on AWS. I am grateful to the author of the original article for the reference.

Thank you for reading to the end! I hope this is helpful for you!

Refs
- We built a gentle world that will notify you LINE when you like Qiita
- Qiita API v2 documentation
