[PYTHON] Implementierungsbeispiel des LINE BOT-Servers für den tatsächlichen Betrieb

Überblick

In Anbetracht eines LINE BOT-Servers, der tatsächlich betrieben werden kann, ist eine asynchrone Verarbeitung erforderlich, wie von yoichiro6642 unter der folgenden Referenz-URL geschrieben. Referenz-URL: LINE BOT-Serverarchitektur, die auch dann sicher ist, wenn eine große Anzahl von Nachrichten eingeht

Sequence Diagram.png

Ich habe einen LINE BOT-Server (Skelett) wie oben beschrieben geschrieben, um einer großen Anzahl von Nachrichten in einer kleinen bis mittleren Umgebung standhalten zu können. Das letzte "Reduzieren der Anzahl von API-Aufrufen" (Geben Sie mehrere MIDs bei der Nachrichtenübertragung an, um die Anzahl von PI-Aufrufen zu reduzieren) ist nicht implementiert. Die verwendete Umgebung ist wie folgt.

Ich hatte die Option von Amazon API Gateway + Lambda + DynamoDB, aber ich fragte mich, ob Node.js + MongoDB + Python einen leichten Dispatcher & JobWorker mit weniger Overhead implementieren könnte.

Für Queue wurden RabbitMQ, memcached, Redis usw. berücksichtigt, aber ich habe MongoDB aus den folgenden Gründen verwendet.

Vorausgesetztes Wissen

Implementierungsbeispiel

MongoDB vorbereiten

Ich habe über Replikation und oplogSizeMB angegeben.

text:mongod.line_bot.conf


systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.line_bot.log
storage:
  dbPath: /var/lib/mongo/line_bot
  journal:
    enabled: true
processManagement:
  fork: false  # fork and run in background
  pidFilePath: /var/run/mongodb/mongod.line_bot.pid  # location of pidfile
net:
  port: 27017
  bindIp: 127.0.0.1  # Listen to local interface only, comment to listen on all interfaces.
replication:
  oplogSizeMB: 3072

Starten Sie Mongod

Starten Sie im Master-Modus.

$ mongod --master -f mongod.line_bot.conf

Eine Sammlung erstellen

Die Sammlung ist eine begrenzte Sammlung, sodass Sie sich keine Gedanken über das Kapazitätswachstum machen müssen.

create_collection


#!/bin/bach -v
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
  capped: true,
  size: 1048576000 // 1GB
});
EOF

BOT Server(Node.js) frontDesk.js empfängt eine Nachricht von LINE Server und gibt eine sofortige Antwort zurück.

frontDesk.js


// Settings of the this program
var httpsPort = 443;
var allowPath = "/callback";
var httpsOpt  = {
    "caKey"  : "/etc/letsencrypt/live/xxx/privkey.pem",
    "caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
    "caCa"   : "/etc/letsencrypt/live/xxx/chain.pem"
};
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";

// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb   = "line_bot";
var mongoCol  = "recvq";

var express= require('express'),
bodyParser = require('body-parser'),
log4js     = require('log4js'),
https      = require('https'),
fs         = require('fs'),
mongo      = require('mongodb'),
path       = require('path');

var accept = require(__dirname+'/accept');
var app    = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));

// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
    if (!rc) {
        console.log("set_col.rc:"+rc);
        local.db.close();
        process.exit(1);
    }
    console.log("Connected succesfully to "+mongoUrl);
});

// handle a request
app.post(allowPath, function(req, res, next) {
    local['acceptTime'] = new Date().getTime();  // record accept time(ms)

    // response ASAP
    res.status(200).send("OK");
    res.end();

    accept.post_callback(req, res, next);  // Handle the request
});

// server certificate authority
var httpsOpt = {
    key:  fs.readFileSync(httpsOpt.caKey),
    cert: fs.readFileSync(httpsOpt.caCert),
    ca:   fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
    console.log('Listening on port '+httpsPort+'...'); 
}).on('error', function(err) {
    if (err.errno === 'EADDRINUSE') {
        console.log('This program is already running.');
    } else {
        console.log(err);
    }
    process.exit(1);
});

function set_col(local, url, callback) {
    // Use connect method to connect to the MongoServer
    MongoClient.connect(url, function(err, db) {
        if (err) {
            console.log("MongoDB connection error."); console.log(err);
            process.exit(1);
        }
        local['db'] = db;

        local.db.collection(mongoCol, function(err, collection) {
            if (err) {
                console.log("MongoDB collection error."); console.log(err);
                process.exit(1);
            }
            local.db['collection'] = collection;
            callback(true, local, url);
        });
    });
}

Verwenden Sie danach accept.js, um die Signatur zu überprüfen und in MongoDB zu registrieren.

accept.js


var crypto = require('crypto');
var assert = require('assert');

exports.post_callback = function(req, res) {
    //Auf Unterschrift prüfen
    if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
        console.log("400. Bad Request. The request does not have a x-line-channelsignature");
        return;
    }

    //Überprüfen Sie das Ergebnis der Anfrage
    if ((! req.body) ||
        (! req.body['result'])) {
        console.log("400. Bad Request. The request does not have result");
        return;
    }
    var result_num = req.body.result.length;

    //Sha256-Verschlüsselung des HTTP-Körpers mit channelSecret,Fragen Sie nach einem Base64-Digest.
    var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
    computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");

    //Vergleichen Sie die Signaturen und bestätigen Sie die Gültigkeit
    if (req.headers["x-line-channelsignature"] != computedSignature) {
        console.log("400. Bad Request. The x-line-channelsignature is wrong.");
        return;
    }

    //Geben Sie die empfangene Zeit ein
    for (var i=0; i<Object.keys(req.body.result).length; i++) {
        req.body.result[i]['acceptTime'] = local['acceptTime'];
    }

    //Nachricht in MongoDB registrieren
    local.db.collection.insertMany(req.body.result, function(err, r) {
        assert.equal(null, err);
        assert.equal(result_num, r.insertedCount);

        toQueueTime = new Date().getTime() - local['acceptTime'];
        console.log("necessary time to store to queue: "+toQueueTime+" ms");

        return;
    });

}

Dispatcher & jobWorker Implementiert in Python Multithreading. Der jobWorker-Thread wartet beim Erstellen auf threading.Event () mit wait (). Der Trigger-Thread überwacht oplog mit ts und beginnt mit der Verarbeitung, wenn es der Warteschlange hinzugefügt wird. Ordnen Sie den Inhalt der Lesewarteschlange einem freien jobWorker-Thread zu, setzen Sie Event und lassen Sie jobWorker mit der Verarbeitung beginnen.

Mir sind Thread-Referenzen und Aktualisierungen von Listen und Variablen bekannt, daher sperre ich sie nicht ... Ich hatte vor, dies zu tun, aber als ich in mehreren Threads auf den LINE-API-Server zugegriffen habe, ist ein Fehler bezüglich der Anzahl gleichzeitiger Verbindungen aufgetreten. Daher wird die exklusive Sperre in purchase () verwendet, um von jobWorker aus auf den LINE-API-Server zuzugreifen. Da das Dokument diesen Bereich nicht beschreibt, habe ich ihn auf 1 Multiplex und ein Zugriffsintervall von 100 ms eingestellt. Ich bin neu in Python Multithreading. Bitte weisen Sie auf Fehler hin.

dispatcher.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Settings of the this program
NumOfThread       = 20
searchInterval    = 100000  # uSec
mainSleepInterval = 60      # Sec

# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb   = "line_bot";
mongoCol  = "recvq";

import os,os.path
import sys
import threading
import time
import json
import pymongo
from   pymongo.cursor import CursorType
from   datetime import datetime
import datetime
import jobWorker

usleep = lambda x: time.sleep(x/1000000.0)  #Mikrosekundenschlaf


#####Arbeitsthread
def workerThread(tt):
    tno = str(tt[0])
    while True:
        tt[2].clear()  #Ereignis löschen und warten, bis Evant auftritt
        tt[3] = 'w'
        tt[2].wait()
        if verbose:  #Das Warten endet. Starten Sie die Verarbeitung
            print '\nworker['+tno+']: wake up'
        
        #Rufen Sie hier die eigentliche Verarbeitungsfunktion auf
        jobWorker.jobWorker(verbose, tno, tt[4]['o'])


#####MongoDB-Trigger-Thread
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
    dbCol = db + '.' + col
    c = pymongo.MongoClient(host, port)
    # Uncomment this for master/slave.
    oplog = c.local.oplog['$main']
    # Uncomment this for replica sets.
    #oplog = c.local.oplog.rs
    
    first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
    ts = first['ts']
    
    while True:
        cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        while cursor.alive:
            for doc in cursor:
                #Regelmäßig{h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'}Kehrt zurück
                #Ist op:'n'Ist nur Information. ignorieren.
                if doc['ns']==dbCol and doc['op']!='n':
                    #Finde freie Threads
                    i = tchain[last]
                    while t[i][3] != 'w':
                        i = tchain[i]
                        if i == tchain[last]:  #Wenn Sie nach einer Runde suchen
                            usleep(searchInterval)
                    
                    t[i][4] = doc  #Freier Thread t[n][4]Daten speichern in
                    t[i][3] = 'r'
                    t[i][2].set()  # t[n]Startanweisung verarbeiten
                    last = i
                # Work with doc here
                ts = doc['ts']
        print "got out of a while corsor.alive loop"


#######################################################################

# Check of the parameter
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
    verbose = True
elif len(sys.argv)!=1:
    print "Usage: %s [-v]" % (sys.argv[0],)
    quit()

#Erstellung von Worker-Thread-Verwaltungsdaten&Arbeitsthread-Erstellung
# [ThreadNo, ThreadObj ,EvantObj, status,Daten, die an den Thread übergeben werden sollen]
#   (status ' ':in Vorbereitung, 'w':Warten / frei, 'r':Laufen)
#  :
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
    t[i][0] = i                  # Thread No.
    t[i][2] = threading.Event()  #Evant Objektgenerierung
    t[i][3] = ' '                # is_running
    #Arbeitsthread-Erstellung
    t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
                               args=(t[i],))
    t[i][1].setDaemon(True)

# Thread list of circulation
tc = [0 for i in range(NumOfThread)]  #Der Wert ist der nächste Thread Nr.
for i in range(NumOfThread):
    tc[i] = i+1
tc[i] = 0  # make a list of circulation

lastThread = i  #Zuletzt verwendeter Thread.Als nächstes kommt tc[lastThread]Verwenden Sie den zweiten Thread.

#Worker-Thread starten
for i in range(NumOfThread):
    t[i][1].start()

#Warten Sie nach dem Starten des Worker-Threads auf den Wartestatus
yetAllThread = True
while yetAllThread:
    for i in range(NumOfThread):
        if t[i][3] == ' ':
            break
        else:
            usleep(100)  #Das Überwachungsintervall ist 0.1 Millisekunde
    if i == NumOfThread-1:
        yetAllThread = False
    else:
        usleep(100)  #Das Überwachungsintervall ist 0.1 Millisekunde

#MongoDB-Trigger-Thread-Generierung
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start()  #Start

#Haupt-Bedroung
while True:
    time.sleep(mainSleepInterval)

jobWorker.py ist der Thread, der die eigentliche Verarbeitung durchführt. Dies ist ein Beispiel, das nur Papageien entsprechend der Art des zu sendenden Inhalts zurückgibt. Bitte beachten Sie, dass die Methode zur Aufnahme von MID (von) je nach opType unterschiedlich ist.

jobWorker.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note of caution:
# - This program is one of the threads.
# - Must not do exit(),quit()
# - Please use only return()

# Settings of the LINE API Server
lineApiHost      = "trialbot-api_line_me"
accessIntervalMS = 100  # ms
getProfilesUrl   = "https://trialbot-api.line.me/v1/profiles"
postUrl          = "https://trialbot-api.line.me/v1/events"
getContentUrl    = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header =  {
    "Content-Type"                : "application/json; charser=UTF-8",
    "X-Line-ChannelID"            : "xxxxxxxxxx",
    "X-Line-ChannelSecret"        : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    "X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
postBodyTemplate = {
    "to"          : [],
    "toChannel"   : 1383378250,
    "eventType"   : "138311608800106203",
    "content"     : {}
}

import threading
import time
import datetime
import json

usleep = lambda x: time.sleep(x/1000000.0)  #Mikrosekundenschlaf

#Sperre im Zusammenhang mit der gleichzeitigen Vermeidung des gleichzeitigen Zugriffs mehrerer JobWorker-Threads auf den LINE-API-Server
globalLock = {}            #Sperre für jedes Verbindungsziel
globalLastAccessTime = {}  #Letzte Zugriffszeit für jedes Verbindungsziel
loadTime = int(time.time()*1000)

def jobWorker(verbose, mynum, recvBody):
    global globalLock
    global globalLastAccessTime
    
    #Anfängliche Sperreinstellungen für jedes Verbindungsziel
    if not globalLock.has_key(lineApiHost):
        globalLock[lineApiHost] = threading.Lock()
    #Anfangseinstellung der letzten Zugriffszeit für jedes Verbindungsziel
    if not globalLastAccessTime.has_key(lineApiHost):
        globalLastAccessTime[lineApiHost] = loadTime
    
    if verbose:
        recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
        print 'worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'
        print recvBody
    
    opType = recvBody['content'].get('opType')
    
    # blocked from user
    if opType == 8:
        #MID des Blockbenutzers aus der Benutzerverwaltung(recvBody['content']['params'][0])Löschen
        print 'please delete user "'+recvBody['content']['params'][0]+'" from management data.'
        return
    
    #Kopieren Sie den Body-Teil von POST
    postBody = {}
    postBody['to'] = ''
    postBody['toChannel'] = postBodyTemplate['toChannel']
    postBody['eventType'] = postBodyTemplate['eventType']
    postBody['content'] = {}
    
    #Ziel der Nachrichtenantwort
    if opType==4:  # New user
        postBody['to'] = [ recvBody['content']['params'][0] ]
    else:
        postBody['to'] = [ recvBody['content']['from'] ]
    
    # New user
    if opType==4:
        #Benutzerprofil abrufen
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = 'Herzlich willkommen!'
        #Das Profil sollte der Benutzerverwaltungs-DB mit der MID des Benutzers hinzugefügt werden
        print 'please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'
        print json.dumps(userProfile, sort_keys = True, indent=4)
    
    #Verarbeitung gemäß der Nachricht
    contentType = recvBody['content'].get('contentType')
    resText = ''
    if contentType == 1:  # Text
        resText = u'Ja,'+recvBody['content']['text']+u',nicht wahr.'
    elif contentType == 2:  # Image
        resText = u'Es ist ein Foto...'
    elif contentType == 3:  # Video
        resText = u'Es ist ein Video...'
    elif contentType == 4:  # Audio
        resText = u'Es ist eine Sprachnachricht...'
    elif contentType == 7:  # Location
        resText = u'Es sind Standortinformationen...'
        if verbose:
            print recvBody['content']['text'].encode('utf-8')
            print recvBody['content']['location']['title'].encode('utf-8')
            print recvBody['content']['location']['address'].encode('utf-8')
    elif contentType == 8:  # Sticker
        resText = u'Es ist eine Briefmarke'
        if verbose:
            print recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')
    elif contentType == 10: # Contact
        # Contact(contentType==10)Holen Sie sich das mittlere Profil von contentMetadata
        resText = recvBody['content']['contentMetadata']['displayName']+u'Es sind Ihre Kontaktinformationen'
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
        contactProfile = json.loads(result.text)
        if verbose:
            print '\ncontactProfile: ' + str(contactProfile)
    
    #Antwortnachricht senden
    if resText:
        #Benutzerprofil abrufen(Ursprünglich zum Zeitpunkt der Benutzerregistrierung in der DB registriert, nach Bedarf erworben)
        result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
        userProfile = json.loads(result.text)
        resText = userProfile['contacts'][0]['displayName'] + u'Herr.' + resText
        if verbose:
            print '\nprofile: ' + str(userProfile)
        
        #Ausgehende Nachricht ist Text(ContentType=1).Außerdem Bild,Video,Audio,Location,Sticker,multiple messages,Sie können umfangreiche Nachrichten senden
        postBody['content'] = {
            'contentType': 1,
            'toType'     : 1,
            'text'       : resText
        }
        if verbose:
            print '\nworker['+mynum+'] ' + postUrl
            print 'worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4)
            print 'worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4)
        
        #Messege senden
        r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)
    
    return


#Zugriff auf den LINE-API-Server
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
    import requests
    global globalLock
    global globalLastAccessTime
    
    globalLock[host].acquire()  # Lock
    
    #Wie lange sollte ich warten, wenn ich eine bestimmte Zeit habe, um auf den LINE API-Server zuzugreifen?
    currentTime = int(time.time()*1000)
    remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])
    
    if verbose:
        print 'worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host])
        print 'worker['+mynum+'] remain='+str(remain)+' ms'
    
    # wait accessIntervalMS from last access
    if remain > 0:
        usleep(remain*1000)
    
    if method=='get':
        if body:
            payload = { 'mids': body }
            r = requests.get(url, params=payload, headers=header)
        else:
            if verbose:
                print url, header
            r = requests.get(url, headers=header)
    else:
        r = requests.post(url, data=json.dumps(body), headers=header)
    
    if verbose and r.status_code!=200:
        print 'worker['+mynum+'] HTTP status code: ' + str(r.status_code)
        print 'worker['+mynum+'] response: ' + r.text
    
    globalLastAccessTime[host] = int(time.time()*1000)
    
    globalLock[host].release()  # release
    return r

Zusammenfassung

Ich denke, wir konnten das Grundgerüst von Dispatcher & jowWorker implementieren, einem leichten Warteschlangenmechanismus, der auch dann verwendet werden kann, wenn eine große Anzahl von Nachrichten eingeht. Im Ausgangszustand von 64-Bit-CentOS 7 liegt die Obergrenze für die Anzahl der Threads im gesamten System bei 30118, aber wenn es sich um ..5000 Threads handelt, schlägt die Generierung fehl. (... ich brauche nicht so viel) Ein solcher Mechanismus ist nicht nur für den BOT-Server erforderlich, sondern auch für die effiziente Zustellung einer großen E-Mail-Menge über mehrere SMTP-Server.

Wenn Sie die jobWorker-Seite zu einer anderen Sprachanwendung machen möchten, können Sie sie verwenden, indem Sie sie zu einem Mikrodienst machen oder sie ändern, um in einem anderen Prozess mit Pipe zu kommunizieren. Wenn Sie die Last mit diesem Mechanismus verteilen möchten, machen Sie MongoDB zu einem anderen Server oder Sharding oder bringen Sie "4.consurring message" oder höher auf einen anderen Computer. Wenn Sie mehr Job-Worker verteilen möchten, ist es besser, daraus einen Mikrodienst zu machen oder einen anderen Request-Broker-Mechanismus zu verwenden.

Recommended Posts

Implementierungsbeispiel des LINE BOT-Servers für den tatsächlichen Betrieb
Erstellen Sie einen API-Server, um den Betrieb der Front-Implementierung mit Python3 und Flask zu überprüfen
Implementierung von Scale-Space für SIFT
Benachrichtigen Sie LINE über Informationen zum Zugbetrieb
Implementierungsbeispiel für das Hostile Generation Network (GAN) von Keras [Für Anfänger]
Erstellen Sie mit Minette für Python einen LINE BOT
Formatieren Sie eine Zeile json, um die Anzeige zu vereinfachen
Code zum Überprüfen des Betriebs von Python Matplot lib
Implementierung eines Deep Learning-Modells zur Bilderkennung
Einfaches Implementierungsbeispiel für eine Art der Datenerweiterung