In Anbetracht eines LINE BOT-Servers, der tatsächlich betrieben werden kann, ist eine asynchrone Verarbeitung erforderlich, wie von yoichiro6642 unter der folgenden Referenz-URL geschrieben. Referenz-URL: LINE BOT-Serverarchitektur, die auch dann sicher ist, wenn eine große Anzahl von Nachrichten eingeht
Ich habe einen LINE BOT-Server (Skelett) wie oben beschrieben geschrieben, um einer großen Anzahl von Nachrichten in einer kleinen bis mittleren Umgebung standhalten zu können. Das letzte "Reduzieren der Anzahl von API-Aufrufen" (Geben Sie mehrere MIDs bei der Nachrichtenübertragung an, um die Anzahl von PI-Aufrufen zu reduzieren) ist nicht implementiert. Die verwendete Umgebung ist wie folgt.
Ich hatte die Option von Amazon API Gateway + Lambda + DynamoDB, aber ich fragte mich, ob Node.js + MongoDB + Python einen leichten Dispatcher & JobWorker mit weniger Overhead implementieren könnte.
Für Queue wurden RabbitMQ, memcached, Redis usw. berücksichtigt, aber ich habe MongoDB aus den folgenden Gründen verwendet.
Ich habe über Replikation und oplogSizeMB angegeben.
text:mongod.line_bot.conf
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod.line_bot.log
storage:
dbPath: /var/lib/mongo/line_bot
journal:
enabled: true
processManagement:
fork: false # fork and run in background
pidFilePath: /var/run/mongodb/mongod.line_bot.pid # location of pidfile
net:
port: 27017
bindIp: 127.0.0.1 # Listen to local interface only, comment to listen on all interfaces.
replication:
oplogSizeMB: 3072
Starten Sie im Master-Modus.
$ mongod --master -f mongod.line_bot.conf
Die Sammlung ist eine begrenzte Sammlung, sodass Sie sich keine Gedanken über das Kapazitätswachstum machen müssen.
create_collection
#!/bin/bach -v
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
capped: true,
size: 1048576000 // 1GB
});
EOF
BOT Server(Node.js) frontDesk.js empfängt eine Nachricht von LINE Server und gibt eine sofortige Antwort zurück.
frontDesk.js
// Settings of the this program
var httpsPort = 443;
var allowPath = "/callback";
var httpsOpt = {
"caKey" : "/etc/letsencrypt/live/xxx/privkey.pem",
"caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
"caCa" : "/etc/letsencrypt/live/xxx/chain.pem"
};
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb = "line_bot";
var mongoCol = "recvq";
var express= require('express'),
bodyParser = require('body-parser'),
log4js = require('log4js'),
https = require('https'),
fs = require('fs'),
mongo = require('mongodb'),
path = require('path');
var accept = require(__dirname+'/accept');
var app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
if (!rc) {
console.log("set_col.rc:"+rc);
local.db.close();
process.exit(1);
}
console.log("Connected succesfully to "+mongoUrl);
});
// handle a request
app.post(allowPath, function(req, res, next) {
local['acceptTime'] = new Date().getTime(); // record accept time(ms)
// response ASAP
res.status(200).send("OK");
res.end();
accept.post_callback(req, res, next); // Handle the request
});
// server certificate authority
var httpsOpt = {
key: fs.readFileSync(httpsOpt.caKey),
cert: fs.readFileSync(httpsOpt.caCert),
ca: fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
console.log('Listening on port '+httpsPort+'...');
}).on('error', function(err) {
if (err.errno === 'EADDRINUSE') {
console.log('This program is already running.');
} else {
console.log(err);
}
process.exit(1);
});
function set_col(local, url, callback) {
// Use connect method to connect to the MongoServer
MongoClient.connect(url, function(err, db) {
if (err) {
console.log("MongoDB connection error."); console.log(err);
process.exit(1);
}
local['db'] = db;
local.db.collection(mongoCol, function(err, collection) {
if (err) {
console.log("MongoDB collection error."); console.log(err);
process.exit(1);
}
local.db['collection'] = collection;
callback(true, local, url);
});
});
}
Verwenden Sie danach accept.js, um die Signatur zu überprüfen und in MongoDB zu registrieren.
accept.js
var crypto = require('crypto');
var assert = require('assert');
exports.post_callback = function(req, res) {
//Auf Unterschrift prüfen
if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
console.log("400. Bad Request. The request does not have a x-line-channelsignature");
return;
}
//Überprüfen Sie das Ergebnis der Anfrage
if ((! req.body) ||
(! req.body['result'])) {
console.log("400. Bad Request. The request does not have result");
return;
}
var result_num = req.body.result.length;
//Sha256-Verschlüsselung des HTTP-Körpers mit channelSecret,Fragen Sie nach einem Base64-Digest.
var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");
//Vergleichen Sie die Signaturen und bestätigen Sie die Gültigkeit
if (req.headers["x-line-channelsignature"] != computedSignature) {
console.log("400. Bad Request. The x-line-channelsignature is wrong.");
return;
}
//Geben Sie die empfangene Zeit ein
for (var i=0; i<Object.keys(req.body.result).length; i++) {
req.body.result[i]['acceptTime'] = local['acceptTime'];
}
//Nachricht in MongoDB registrieren
local.db.collection.insertMany(req.body.result, function(err, r) {
assert.equal(null, err);
assert.equal(result_num, r.insertedCount);
toQueueTime = new Date().getTime() - local['acceptTime'];
console.log("necessary time to store to queue: "+toQueueTime+" ms");
return;
});
}
Dispatcher & jobWorker Implementiert in Python Multithreading. Der jobWorker-Thread wartet beim Erstellen auf threading.Event () mit wait (). Der Trigger-Thread überwacht oplog mit ts und beginnt mit der Verarbeitung, wenn es der Warteschlange hinzugefügt wird. Ordnen Sie den Inhalt der Lesewarteschlange einem freien jobWorker-Thread zu, setzen Sie Event und lassen Sie jobWorker mit der Verarbeitung beginnen.
Mir sind Thread-Referenzen und Aktualisierungen von Listen und Variablen bekannt, daher sperre ich sie nicht ... Ich hatte vor, dies zu tun, aber als ich in mehreren Threads auf den LINE-API-Server zugegriffen habe, ist ein Fehler bezüglich der Anzahl gleichzeitiger Verbindungen aufgetreten. Daher wird die exklusive Sperre in purchase () verwendet, um von jobWorker aus auf den LINE-API-Server zuzugreifen. Da das Dokument diesen Bereich nicht beschreibt, habe ich ihn auf 1 Multiplex und ein Zugriffsintervall von 100 ms eingestellt. Ich bin neu in Python Multithreading. Bitte weisen Sie auf Fehler hin.
dispatcher.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Settings of the this program
NumOfThread = 20
searchInterval = 100000 # uSec
mainSleepInterval = 60 # Sec
# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb = "line_bot";
mongoCol = "recvq";
import os,os.path
import sys
import threading
import time
import json
import pymongo
from pymongo.cursor import CursorType
from datetime import datetime
import datetime
import jobWorker
usleep = lambda x: time.sleep(x/1000000.0) #Mikrosekundenschlaf
#####Arbeitsthread
def workerThread(tt):
tno = str(tt[0])
while True:
tt[2].clear() #Ereignis löschen und warten, bis Evant auftritt
tt[3] = 'w'
tt[2].wait()
if verbose: #Das Warten endet. Starten Sie die Verarbeitung
print '\nworker['+tno+']: wake up'
#Rufen Sie hier die eigentliche Verarbeitungsfunktion auf
jobWorker.jobWorker(verbose, tno, tt[4]['o'])
#####MongoDB-Trigger-Thread
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
dbCol = db + '.' + col
c = pymongo.MongoClient(host, port)
# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.
#oplog = c.local.oplog.rs
first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']
while True:
cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
while cursor.alive:
for doc in cursor:
#Regelmäßig{h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'}Kehrt zurück
#Ist op:'n'Ist nur Information. ignorieren.
if doc['ns']==dbCol and doc['op']!='n':
#Finde freie Threads
i = tchain[last]
while t[i][3] != 'w':
i = tchain[i]
if i == tchain[last]: #Wenn Sie nach einer Runde suchen
usleep(searchInterval)
t[i][4] = doc #Freier Thread t[n][4]Daten speichern in
t[i][3] = 'r'
t[i][2].set() # t[n]Startanweisung verarbeiten
last = i
# Work with doc here
ts = doc['ts']
print "got out of a while corsor.alive loop"
#######################################################################
# Check of the parameter
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
verbose = True
elif len(sys.argv)!=1:
print "Usage: %s [-v]" % (sys.argv[0],)
quit()
#Erstellung von Worker-Thread-Verwaltungsdaten&Arbeitsthread-Erstellung
# [ThreadNo, ThreadObj ,EvantObj, status,Daten, die an den Thread übergeben werden sollen]
# (status ' ':in Vorbereitung, 'w':Warten / frei, 'r':Laufen)
# :
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
t[i][0] = i # Thread No.
t[i][2] = threading.Event() #Evant Objektgenerierung
t[i][3] = ' ' # is_running
#Arbeitsthread-Erstellung
t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
args=(t[i],))
t[i][1].setDaemon(True)
# Thread list of circulation
tc = [0 for i in range(NumOfThread)] #Der Wert ist der nächste Thread Nr.
for i in range(NumOfThread):
tc[i] = i+1
tc[i] = 0 # make a list of circulation
lastThread = i #Zuletzt verwendeter Thread.Als nächstes kommt tc[lastThread]Verwenden Sie den zweiten Thread.
#Worker-Thread starten
for i in range(NumOfThread):
t[i][1].start()
#Warten Sie nach dem Starten des Worker-Threads auf den Wartestatus
yetAllThread = True
while yetAllThread:
for i in range(NumOfThread):
if t[i][3] == ' ':
break
else:
usleep(100) #Das Überwachungsintervall ist 0.1 Millisekunde
if i == NumOfThread-1:
yetAllThread = False
else:
usleep(100) #Das Überwachungsintervall ist 0.1 Millisekunde
#MongoDB-Trigger-Thread-Generierung
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start() #Start
#Haupt-Bedroung
while True:
time.sleep(mainSleepInterval)
jobWorker.py ist der Thread, der die eigentliche Verarbeitung durchführt. Dies ist ein Beispiel, das nur Papageien entsprechend der Art des zu sendenden Inhalts zurückgibt. Bitte beachten Sie, dass die Methode zur Aufnahme von MID (von) je nach opType unterschiedlich ist.
jobWorker.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note of caution:
# - This program is one of the threads.
# - Must not do exit(),quit()
# - Please use only return()
# Settings of the LINE API Server
lineApiHost = "trialbot-api_line_me"
accessIntervalMS = 100 # ms
getProfilesUrl = "https://trialbot-api.line.me/v1/profiles"
postUrl = "https://trialbot-api.line.me/v1/events"
getContentUrl = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header = {
"Content-Type" : "application/json; charser=UTF-8",
"X-Line-ChannelID" : "xxxxxxxxxx",
"X-Line-ChannelSecret" : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
postBodyTemplate = {
"to" : [],
"toChannel" : 1383378250,
"eventType" : "138311608800106203",
"content" : {}
}
import threading
import time
import datetime
import json
usleep = lambda x: time.sleep(x/1000000.0) #Mikrosekundenschlaf
#Sperre im Zusammenhang mit der gleichzeitigen Vermeidung des gleichzeitigen Zugriffs mehrerer JobWorker-Threads auf den LINE-API-Server
globalLock = {} #Sperre für jedes Verbindungsziel
globalLastAccessTime = {} #Letzte Zugriffszeit für jedes Verbindungsziel
loadTime = int(time.time()*1000)
def jobWorker(verbose, mynum, recvBody):
global globalLock
global globalLastAccessTime
#Anfängliche Sperreinstellungen für jedes Verbindungsziel
if not globalLock.has_key(lineApiHost):
globalLock[lineApiHost] = threading.Lock()
#Anfangseinstellung der letzten Zugriffszeit für jedes Verbindungsziel
if not globalLastAccessTime.has_key(lineApiHost):
globalLastAccessTime[lineApiHost] = loadTime
if verbose:
recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
print 'worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'
print recvBody
opType = recvBody['content'].get('opType')
# blocked from user
if opType == 8:
#MID des Blockbenutzers aus der Benutzerverwaltung(recvBody['content']['params'][0])Löschen
print 'please delete user "'+recvBody['content']['params'][0]+'" from management data.'
return
#Kopieren Sie den Body-Teil von POST
postBody = {}
postBody['to'] = ''
postBody['toChannel'] = postBodyTemplate['toChannel']
postBody['eventType'] = postBodyTemplate['eventType']
postBody['content'] = {}
#Ziel der Nachrichtenantwort
if opType==4: # New user
postBody['to'] = [ recvBody['content']['params'][0] ]
else:
postBody['to'] = [ recvBody['content']['from'] ]
# New user
if opType==4:
#Benutzerprofil abrufen
result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
userProfile = json.loads(result.text)
resText = 'Herzlich willkommen!'
#Das Profil sollte der Benutzerverwaltungs-DB mit der MID des Benutzers hinzugefügt werden
print 'please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'
print json.dumps(userProfile, sort_keys = True, indent=4)
#Verarbeitung gemäß der Nachricht
contentType = recvBody['content'].get('contentType')
resText = ''
if contentType == 1: # Text
resText = u'Ja,'+recvBody['content']['text']+u',nicht wahr.'
elif contentType == 2: # Image
resText = u'Es ist ein Foto...'
elif contentType == 3: # Video
resText = u'Es ist ein Video...'
elif contentType == 4: # Audio
resText = u'Es ist eine Sprachnachricht...'
elif contentType == 7: # Location
resText = u'Es sind Standortinformationen...'
if verbose:
print recvBody['content']['text'].encode('utf-8')
print recvBody['content']['location']['title'].encode('utf-8')
print recvBody['content']['location']['address'].encode('utf-8')
elif contentType == 8: # Sticker
resText = u'Es ist eine Briefmarke'
if verbose:
print recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')
elif contentType == 10: # Contact
# Contact(contentType==10)Holen Sie sich das mittlere Profil von contentMetadata
resText = recvBody['content']['contentMetadata']['displayName']+u'Es sind Ihre Kontaktinformationen'
result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
contactProfile = json.loads(result.text)
if verbose:
print '\ncontactProfile: ' + str(contactProfile)
#Antwortnachricht senden
if resText:
#Benutzerprofil abrufen(Ursprünglich zum Zeitpunkt der Benutzerregistrierung in der DB registriert, nach Bedarf erworben)
result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
userProfile = json.loads(result.text)
resText = userProfile['contacts'][0]['displayName'] + u'Herr.' + resText
if verbose:
print '\nprofile: ' + str(userProfile)
#Ausgehende Nachricht ist Text(ContentType=1).Außerdem Bild,Video,Audio,Location,Sticker,multiple messages,Sie können umfangreiche Nachrichten senden
postBody['content'] = {
'contentType': 1,
'toType' : 1,
'text' : resText
}
if verbose:
print '\nworker['+mynum+'] ' + postUrl
print 'worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4)
print 'worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4)
#Messege senden
r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)
return
#Zugriff auf den LINE-API-Server
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
import requests
global globalLock
global globalLastAccessTime
globalLock[host].acquire() # Lock
#Wie lange sollte ich warten, wenn ich eine bestimmte Zeit habe, um auf den LINE API-Server zuzugreifen?
currentTime = int(time.time()*1000)
remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])
if verbose:
print 'worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host])
print 'worker['+mynum+'] remain='+str(remain)+' ms'
# wait accessIntervalMS from last access
if remain > 0:
usleep(remain*1000)
if method=='get':
if body:
payload = { 'mids': body }
r = requests.get(url, params=payload, headers=header)
else:
if verbose:
print url, header
r = requests.get(url, headers=header)
else:
r = requests.post(url, data=json.dumps(body), headers=header)
if verbose and r.status_code!=200:
print 'worker['+mynum+'] HTTP status code: ' + str(r.status_code)
print 'worker['+mynum+'] response: ' + r.text
globalLastAccessTime[host] = int(time.time()*1000)
globalLock[host].release() # release
return r
Ich denke, wir konnten das Grundgerüst von Dispatcher & jowWorker implementieren, einem leichten Warteschlangenmechanismus, der auch dann verwendet werden kann, wenn eine große Anzahl von Nachrichten eingeht. Im Ausgangszustand von 64-Bit-CentOS 7 liegt die Obergrenze für die Anzahl der Threads im gesamten System bei 30118, aber wenn es sich um ..5000 Threads handelt, schlägt die Generierung fehl. (... ich brauche nicht so viel) Ein solcher Mechanismus ist nicht nur für den BOT-Server erforderlich, sondern auch für die effiziente Zustellung einer großen E-Mail-Menge über mehrere SMTP-Server.
Wenn Sie die jobWorker-Seite zu einer anderen Sprachanwendung machen möchten, können Sie sie verwenden, indem Sie sie zu einem Mikrodienst machen oder sie ändern, um in einem anderen Prozess mit Pipe zu kommunizieren. Wenn Sie die Last mit diesem Mechanismus verteilen möchten, machen Sie MongoDB zu einem anderen Server oder Sharding oder bringen Sie "4.consurring message" oder höher auf einen anderen Computer. Wenn Sie mehr Job-Worker verteilen möchten, ist es besser, daraus einen Mikrodienst zu machen oder einen anderen Request-Broker-Mechanismus zu verwenden.
Recommended Posts