[PYTHON] Exemple d'implémentation du serveur LINE BOT pour un fonctionnement réel

Aperçu

Considérant un serveur LINE BOT qui peut être réellement exploité, un traitement asynchrone est requis comme écrit par yoichiro6642 à l'URL de référence suivante. URL de référence: Architecture de serveur LINE BOT qui est sûre même si un grand nombre de messages arrivent

Sequence Diagram.png

J'ai écrit un serveur LINE BOT (squelette) selon ce qui précède dans le but de pouvoir supporter un grand nombre de messages dans un environnement de petite à moyenne échelle. Le dernier "Réduire le nombre d'appels API" (spécifier plusieurs MID dans la transmission des messages pour réduire le nombre d'appels PI) n'est pas implémenté. L'environnement utilisé est le suivant.

J'avais l'option Amazon API Gateway + Lambda + DynamoDB, mais je me demandais si Node.js + MongoDB + Python pouvait implémenter un Dispatcher & jobWorker léger avec moins de frais généraux.

Pour Queue, RabbitMQ, memcached, Redis, etc. ont été pris en compte, mais j'ai utilisé MongoDB pour les raisons suivantes.

Connaissances préalables

Exemple d'implémentation

Préparation de MongoDB

J'ai spécifié la réplication et oplogSizeMB.

text:mongod.line_bot.conf


systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.line_bot.log
storage:
  dbPath: /var/lib/mongo/line_bot
  journal:
    enabled: true
processManagement:
  fork: false  # fork and run in background
  pidFilePath: /var/run/mongodb/mongod.line_bot.pid  # location of pidfile
net:
  port: 27017
  bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
replication:
  oplogSizeMB: 3072

Lancez Mongod

Démarrez en mode maître.

$ mongod --master -f mongod.line_bot.conf

Créer une collection

La collection est une collection plafonnée afin que vous n'ayez pas à vous soucier de la croissance de la capacité.

create_collection


#!/bin/bach -v
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
  capped: true,
  size: 1048576000 // 1GB
});
EOF

BOT Server(Node.js) frontDesk.js reçoit un message de LINE Server et renvoie une réponse immédiate.

frontDesk.js


// Settings of the this program
var httpsPort = 443;
var allowPath = "/callback";
var httpsOpt  = {
    "caKey"  : "/etc/letsencrypt/live/xxx/privkey.pem",
    "caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
    "caCa"   : "/etc/letsencrypt/live/xxx/chain.pem"
};
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";

// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb   = "line_bot";
var mongoCol  = "recvq";

var express= require('express'),
bodyParser = require('body-parser'),
log4js     = require('log4js'),
https      = require('https'),
fs         = require('fs'),
mongo      = require('mongodb'),
path       = require('path');

var accept = require(__dirname+'/accept');
var app    = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
    if (!rc) {
        console.log("set_col.rc:"+rc);
        local.db.close();
        process.exit(1);
    }
    console.log("Connected succesfully to "+mongoUrl);
});

// handle a request
app.post(allowPath, function(req, res, next) {
    local['acceptTime'] = new Date().getTime();  // record accept time(ms)

    // response ASAP
    res.status(200).send("OK");
    res.end();

    accept.post_callback(req, res, next);  // Handle the request
});

// server certificate authority
var httpsOpt = {
    key:  fs.readFileSync(httpsOpt.caKey),
    cert: fs.readFileSync(httpsOpt.caCert),
    ca:   fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
    console.log('Listening on port '+httpsPort+'...'); 
}).on('error', function(err) {
    if (err.errno === 'EADDRINUSE') {
        console.log('This program is already running.');
    } else {
        console.log(err);
    }
    process.exit(1);
});

function set_col(local, url, callback) {
    // Use connect method to connect to the MongoServer
    MongoClient.connect(url, function(err, db) {
        if (err) {
            console.log("MongoDB connection error."); console.log(err);
            process.exit(1);
        }
        local['db'] = db;

        local.db.collection(mongoCol, function(err, collection) {
            if (err) {
                console.log("MongoDB collection error."); console.log(err);
                process.exit(1);
            }
            local.db['collection'] = collection;
            callback(true, local, url);
        });
    });
}

Après cela, utilisez accept.js pour vérifier la signature et l'enregistrer dans MongoDB.

accept.js


var crypto = require('crypto');
var assert = require('assert');

exports.post_callback = function(req, res) {
    //Vérifier la signature
    if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
        console.log("400. Bad Request. The request does not have a x-line-channelsignature");
        return;
    }

    //Vérifiez le résultat de la demande
    if ((! req.body) ||
        (! req.body['result'])) {
        console.log("400. Bad Request. The request does not have result");
        return;
    }
    var result_num = req.body.result.length;

    //Chiffrement Sha256 du corps HTTP avec channelSecret,Demandez un condensé base64.
    var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
    computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");

    //Comparez les signatures et confirmez la validité
    if (req.headers["x-line-channelsignature"] != computedSignature) {
        console.log("400. Bad Request. The x-line-channelsignature is wrong.");
        return;
    }

    //Entrez l'heure de réception
    for (var i=0; i<Object.keys(req.body.result).length; i++) {
        req.body.result[i]['acceptTime'] = local['acceptTime'];
    }

    //Enregistrer le message dans MongoDB
    local.db.collection.insertMany(req.body.result, function(err, r) {
        assert.equal(null, err);
        assert.equal(result_num, r.insertedCount);

        toQueueTime = new Date().getTime() - local['acceptTime'];
        console.log("necessary time to store to queue: "+toQueueTime+" ms");

        return;
    });

}

Dispatcher & jobWorker Implémenté en multithreading Python. Le thread jobWorker attend threading.Event () avec wait () lors de sa création. Le thread de déclenchement surveille oplog avec ts et commence le traitement lorsqu'il est ajouté à la file d'attente. Attribuez le contenu de la file d'attente de lecture à un thread jobWorker gratuit, définissez Event et laissez jobWorker démarrer le traitement.

Je suis conscient des références de thread et des mises à jour des listes et des variables, donc je ne les verrouille pas ... J'avais l'intention de le faire, mais lorsque j'ai accédé au serveur LINE API dans plusieurs threads, une erreur concernant le nombre de connexions simultanées s'est produite. Par conséquent, le verrou exclusif est utilisé dans acquiert () pour accéder au serveur API LINE à partir de jobWorker. Puisque le document ne décrit pas cette zone, je l'ai mis à 1 multiplex et intervalle d'accès de 100 ms. Je suis nouveau dans le multithreading Python, veuillez donc signaler toute erreur.

dispatcher.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Settings of the this program
NumOfThread       = 20
searchInterval    = 100000  # uSec
mainSleepInterval = 60      # Sec

# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb   = "line_bot";
mongoCol  = "recvq";

import os,os.path
import sys
import threading
import time
import json
import pymongo
from   pymongo.cursor import CursorType
from   datetime import datetime
import datetime
import jobWorker

usleep = lambda x: time.sleep(x/1000000.0)  #Micro secondes de sommeil


#####fil de travail
def workerThread(tt):
    tno = str(tt[0])
    while True:
        tt[2].clear()  #Effacer l'événement et attendre qu'Evant se produise
        tt[3] = 'w'
        tt[2].wait()
        if verbose:  #L'attente se termine. Commencer le traitement
            print '\nworker['+tno+']: wake up'
        
        #Appelez la fonction de traitement actuelle ici
        jobWorker.jobWorker(verbose, tno, tt[4]['o'])


#####Thread de déclenchement MongoDB
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
    dbCol = db + '.' + col
    c = pymongo.MongoClient(host, port)
    # Uncomment this for master/slave.
    oplog = c.local.oplog['$main']
    # Uncomment this for replica sets.
    #oplog = c.local.oplog.rs
    
    first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
    ts = first['ts']
    
    while True:
        cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        while cursor.alive:
            for doc in cursor:
                #Régulièrement{h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'}Retour
                #Est op:'n'C'est juste une information. ignorer.
                if doc['ns']==dbCol and doc['op']!='n':
                    #Trouver des fils gratuits
                    i = tchain[last]
                    while t[i][3] != 'w':
                        i = tchain[i]
                        if i == tchain[last]:  #Si vous cherchez un tour
                            usleep(searchInterval)
                    
                    t[i][4] = doc  #Fil gratuit t[n][4]Stocker les données dans
                    t[i][3] = 'r'
                    t[i][2].set()  # t[n]Traitement de l'instruction de démarrage
                    last = i
                # Work with doc here
                ts = doc['ts']
        print "got out of a while corsor.alive loop"


#######################################################################

# Check of the parameter
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
    verbose = True
elif len(sys.argv)!=1:
    print "Usage: %s [-v]" % (sys.argv[0],)
    quit()

#création de données de gestion des threads de travail&création de thread de travail
# [ThreadNo, ThreadObj ,EvantObj, status,Données à transmettre au thread]
#   (status ' ':en préparation, 'w':En attente / libre, 'r':Fonctionnement)
#  :
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
    t[i][0] = i                  # Thread No.
    t[i][2] = threading.Event()  #Génération d'objets Evant
    t[i][3] = ' '                # is_running
    #création de thread de travail
    t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
                               args=(t[i],))
    t[i][1].setDaemon(True)

# Thread list of circulation
tc = [0 for i in range(NumOfThread)]  #La valeur est le fil suivant Non.
for i in range(NumOfThread):
    tc[i] = i+1
tc[i] = 0  # make a list of circulation

lastThread = i  #Dernier fil utilisé.Vient ensuite tc[lastThread]Utilisez le deuxième fil.

#début du thread de travail
for i in range(NumOfThread):
    t[i][1].start()

#Attendre l'état d'attente après le démarrage du thread de travail
yetAllThread = True
while yetAllThread:
    for i in range(NumOfThread):
        if t[i][3] == ' ':
            break
        else:
            usleep(100)  #L'intervalle de surveillance est de 0.1 milliseconde
    if i == NumOfThread-1:
        yetAllThread = False
    else:
        usleep(100)  #L'intervalle de surveillance est de 0.1 milliseconde

#Génération de threads de déclenchement MongoDB
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start()  #début

#fil conducteur
while True:
    time.sleep(mainSleepInterval)

jobWorker.py est le thread qui effectue le traitement réel. Il s'agit d'un échantillon qui ne renvoie les perroquets qu'en fonction du type de contenu à envoyer. Veuillez noter que la méthode de prise de MID (de) diffère selon l'opType.

jobWorker.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note of caution:
# - This program is one of the threads.
# - Must not do exit(),quit()
# - Please use only return()

# Settings of the LINE API Server
lineApiHost      = "trialbot-api_line_me"
accessIntervalMS = 100  # ms
getProfilesUrl   = "https://trialbot-api.line.me/v1/profiles"
postUrl          = "https://trialbot-api.line.me/v1/events"
getContentUrl    = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header =  {
    "Content-Type"                : "application/json; charser=UTF-8",
    "X-Line-ChannelID"            : "xxxxxxxxxx",
    "X-Line-ChannelSecret"        : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
postBodyTemplate = {
    "to"          : [],
    "toChannel"   : 1383378250,
    "eventType"   : "138311608800106203",
    "content"     : {}
}

import threading
import time
import datetime
import json

usleep = lambda x: time.sleep(x/1000000.0)  #Micro secondes de sommeil

#Verrou lié pour éviter l'accès simultané au serveur API LINE à partir de plusieurs threads de travail
globalLock = {}            #Verrouiller pour chaque destination de connexion
globalLastAccessTime = {}  #Heure du dernier accès pour chaque destination de connexion
loadTime = int(time.time()*1000)

def jobWorker(verbose, mynum, recvBody):
    global globalLock
    global globalLastAccessTime
    
    #Paramètres de verrouillage initiaux pour chaque destination de connexion
    if not globalLock.has_key(lineApiHost):
        globalLock[lineApiHost] = threading.Lock()
    #Réglage initial de l'heure du dernier accès pour chaque destination de connexion
    if not globalLastAccessTime.has_key(lineApiHost):
        globalLastAccessTime[lineApiHost] = loadTime
    
    if verbose:
        recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
        print 'worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'
        print recvBody
    
    opType = recvBody['content'].get('opType')
    
    # blocked from user
    if opType == 8:
        #MID du bloc utilisateur de la gestion des utilisateurs(recvBody['content']['params'][0])Effacer
        print 'please delete user "'+recvBody['content']['params'][0]+'" from management data.'
        return
    
    #Copier la partie corps de POST
    postBody = {}
    postBody['to'] = ''
    postBody['toChannel'] = postBodyTemplate['toChannel']
    postBody['eventType'] = postBodyTemplate['eventType']
    postBody['content'] = {}
    
    #Destination de la réponse au message
    if opType==4:  # New user
        postBody['to'] = [ recvBody['content']['params'][0] ]
    else:
        postBody['to'] = [ recvBody['content']['from'] ]
    
    # New user
    if opType==4:
        #Obtenir le profil utilisateur
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = 'Bienvenue!'
        #Le profil doit être ajouté à la base de données de gestion des utilisateurs avec le MID de l'utilisateur
        print 'please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'
        print json.dumps(userProfile, sort_keys = True, indent=4)
    
    #Traitement selon le message
    contentType = recvBody['content'].get('contentType')
    resText = ''
    if contentType == 1:  # Text
        resText = u'Oui,'+recvBody['content']['text']+u',n'est-ce pas.'
    elif contentType == 2:  # Image
        resText = u'C'est une photo...'
    elif contentType == 3:  # Video
        resText = u'C'est une vidéo...'
    elif contentType == 4:  # Audio
        resText = u'C'est un message vocal...'
    elif contentType == 7:  # Location
        resText = u'C'est des informations de localisation...'
        if verbose:
            print recvBody['content']['text'].encode('utf-8')
            print recvBody['content']['location']['title'].encode('utf-8')
            print recvBody['content']['location']['address'].encode('utf-8')
    elif contentType == 8:  # Sticker
        resText = u'C'est un timbre'
        if verbose:
            print recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')
    elif contentType == 10: # Contact
        # Contact(contentType==10)Obtenez le profil intermédiaire de contentMetadata
        resText = recvBody['content']['contentMetadata']['displayName']+u'Ce sont vos coordonnées'
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
        contactProfile = json.loads(result.text)
        if verbose:
            print '\ncontactProfile: ' + str(contactProfile)
    
    #Envoyer un message de réponse
    if resText:
        #Obtenir le profil utilisateur(Enregistré à l'origine dans la base de données au moment de l'enregistrement de l'utilisateur, acquis au besoin)
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = userProfile['contacts'][0]['displayName'] + u'M.' + resText
        if verbose:
            print '\nprofile: ' + str(userProfile)
        
        #Le message sortant est un texte(ContentType=1).En outre, Image,Video,Audio,Location,Sticker,multiple messages,Vous pouvez envoyer des messages riches
        postBody['content'] = {
            'contentType': 1,
            'toType'     : 1,
            'text'       : resText
        }
        if verbose:
            print '\nworker['+mynum+'] ' + postUrl
            print 'worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4)
            print 'worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4)
        
        #Envoyer un message
        r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)
    
    return


#Accès au serveur de l'API LINE
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
    import requests
    global globalLock
    global globalLastAccessTime
    
    globalLock[host].acquire()  # Lock
    
    #Combien de temps dois-je attendre si j'ai un certain temps pour accéder au serveur de l'API LINE?
    currentTime = int(time.time()*1000)
    remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])
    
    if verbose:
        print 'worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host])
        print 'worker['+mynum+'] remain='+str(remain)+' ms'
    
    # wait accessIntervalMS from last access
    if remain > 0:
        usleep(remain*1000)
    
    if method=='get':
        if body:
            payload = { 'mids': body }
            r = requests.get(url, params=payload, headers=header)
        else:
            if verbose:
                print url, header
            r = requests.get(url, headers=header)
    else:
        r = requests.post(url, data=json.dumps(body), headers=header)
    
    if verbose and r.status_code!=200:
        print 'worker['+mynum+'] HTTP status code: ' + str(r.status_code)
        print 'worker['+mynum+'] response: ' + r.text
    
    globalLastAccessTime[host] = int(time.time()*1000)
    
    globalLock[host].release()  # release
    return r

Résumé

Je pense que nous avons pu implémenter le squelette de Dispatcher & jowWorker, un mécanisme de file d'attente léger qui peut être utilisé même lorsqu'un grand nombre de messages arrivent. Dans l'état initial de 64 bits CentOS 7, la limite supérieure du nombre de threads dans tout le système est 30118, mais s'il s'agit de ..5000 threads, la génération échouera. (... je n'ai pas besoin de tant que ça) Un tel mécanisme est nécessaire non seulement pour le serveur BOT, mais également pour livrer efficacement une grande quantité de courrier en utilisant plusieurs serveurs SMTP.

Si vous souhaitez faire du côté jobWorker une application de langue différente, vous pouvez l'utiliser en en faisant un micro service ou en le modifiant pour qu'il communique avec pipe dans un autre processus. Si vous voulez distribuer la charge avec ce mécanisme, faites de MongoDB un autre serveur ou Sharding, ou apportez "4. message récurrent" ou plus tard sur une autre machine. Si vous souhaitez répartir les travailleurs plus que cela, il est préférable d'en faire un micro-service ou d'utiliser un autre mécanisme de courtier de demandes.

Recommended Posts

Exemple d'implémentation du serveur LINE BOT pour un fonctionnement réel
Construisez un serveur API pour vérifier le fonctionnement de l'implémentation frontale avec python3 et Flask
Implémentation de Scale-Space pour SIFT
Notifier LINE des informations sur l'exploitation du train
Exemple d'implémentation d'un réseau de génération hostile (GAN) par Keras
Créer un LINE BOT avec Minette pour Python
Formatez une ligne de json pour une visualisation facile
Code pour vérifier le fonctionnement de Python Matplot lib
Implémentation du modèle Deep Learning pour la reconnaissance d'images
Exemple d'implémentation simple d'un type d'augmentation de données