Considérant un serveur LINE BOT qui peut être réellement exploité, un traitement asynchrone est requis comme écrit par yoichiro6642 à l'URL de référence suivante. URL de référence: Architecture de serveur LINE BOT qui est sûre même si un grand nombre de messages arrivent
J'ai écrit un serveur LINE BOT (squelette) selon ce qui précède dans le but de pouvoir supporter un grand nombre de messages dans un environnement de petite à moyenne échelle. Le dernier "Réduire le nombre d'appels API" (spécifier plusieurs MID dans la transmission des messages pour réduire le nombre d'appels PI) n'est pas implémenté. L'environnement utilisé est le suivant.
J'avais l'option Amazon API Gateway + Lambda + DynamoDB, mais je me demandais si Node.js + MongoDB + Python pouvait implémenter un Dispatcher & jobWorker léger avec moins de frais généraux.
Pour Queue, RabbitMQ, memcached, Redis, etc. ont été pris en compte, mais j'ai utilisé MongoDB pour les raisons suivantes.
J'ai spécifié la réplication et oplogSizeMB.
text:mongod.line_bot.conf
systemLog:
destination: file
logAppend: true
path: /var/log/mongodb/mongod.line_bot.log
storage:
dbPath: /var/lib/mongo/line_bot
journal:
enabled: true
processManagement:
fork: false # fork and run in background
pidFilePath: /var/run/mongodb/mongod.line_bot.pid # location of pidfile
net:
port: 27017
bindIp: 127.0.0.1 # Listen to local interface only, comment to listen on all interfaces.
replication:
oplogSizeMB: 3072
Démarrez en mode maître.
$ mongod --master -f mongod.line_bot.conf
La collection est une collection plafonnée afin que vous n'ayez pas à vous soucier de la croissance de la capacité.
create_collection
#!/bin/bach -v
mongo --port=27017 <<EOF
use line_bot;
db.createCollection("recvq", {
capped: true,
size: 1048576000 // 1GB
});
EOF
BOT Server(Node.js) frontDesk.js reçoit un message de LINE Server et renvoie une réponse immédiate.
frontDesk.js
// Settings of the this program
var httpsPort = 443;
var allowPath = "/callback";
var httpsOpt = {
"caKey" : "/etc/letsencrypt/live/xxx/privkey.pem",
"caCert" : "/etc/letsencrypt/live/xxx/fullchain.pem",
"caCa" : "/etc/letsencrypt/live/xxx/chain.pem"
};
local = {};
local['channelSecret'] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
// Settings of the MongoDB
var mongoHost = "127.0.0.1";
var mongoPort = 27017;
var mongoDb = "line_bot";
var mongoCol = "recvq";
var express= require('express'),
bodyParser = require('body-parser'),
log4js = require('log4js'),
https = require('https'),
fs = require('fs'),
mongo = require('mongodb'),
path = require('path');
var accept = require(__dirname+'/accept');
var app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({ extended: true }));
// MongoDB
var MongoClient = require('mongodb').MongoClient, assert = require('assert');
var mongoUrl = 'mongodb://'+mongoHost + ":" + mongoPort + "/" + mongoDb;
set_col(local, mongoUrl, function(rc, local, mongoUrl) {
if (!rc) {
console.log("set_col.rc:"+rc);
local.db.close();
process.exit(1);
}
console.log("Connected succesfully to "+mongoUrl);
});
// handle a request
app.post(allowPath, function(req, res, next) {
local['acceptTime'] = new Date().getTime(); // record accept time(ms)
// response ASAP
res.status(200).send("OK");
res.end();
accept.post_callback(req, res, next); // Handle the request
});
// server certificate authority
var httpsOpt = {
key: fs.readFileSync(httpsOpt.caKey),
cert: fs.readFileSync(httpsOpt.caCert),
ca: fs.readFileSync(httpsOpt.caCa)
};
// listen port
var httpsServer = https.createServer(httpsOpt, app);
httpsServer.listen(httpsPort, function() {
console.log('Listening on port '+httpsPort+'...');
}).on('error', function(err) {
if (err.errno === 'EADDRINUSE') {
console.log('This program is already running.');
} else {
console.log(err);
}
process.exit(1);
});
function set_col(local, url, callback) {
// Use connect method to connect to the MongoServer
MongoClient.connect(url, function(err, db) {
if (err) {
console.log("MongoDB connection error."); console.log(err);
process.exit(1);
}
local['db'] = db;
local.db.collection(mongoCol, function(err, collection) {
if (err) {
console.log("MongoDB collection error."); console.log(err);
process.exit(1);
}
local.db['collection'] = collection;
callback(true, local, url);
});
});
}
Après cela, utilisez accept.js pour vérifier la signature et l'enregistrer dans MongoDB.
accept.js
var crypto = require('crypto');
var assert = require('assert');
exports.post_callback = function(req, res) {
//Vérifier la signature
if ((! req.headers) || (! req.headers["x-line-channelsignature"])) {
console.log("400. Bad Request. The request does not have a x-line-channelsignature");
return;
}
//Vérifiez le résultat de la demande
if ((! req.body) ||
(! req.body['result'])) {
console.log("400. Bad Request. The request does not have result");
return;
}
var result_num = req.body.result.length;
//Chiffrement Sha256 du corps HTTP avec channelSecret,Demandez un condensé base64.
var body_str = new Buffer(JSON.stringify(req.body), 'utf8');
computedSignature = crypto.createHmac("sha256",local['channelSecret']).update(body_str).digest("base64");
//Comparez les signatures et confirmez la validité
if (req.headers["x-line-channelsignature"] != computedSignature) {
console.log("400. Bad Request. The x-line-channelsignature is wrong.");
return;
}
//Entrez l'heure de réception
for (var i=0; i<Object.keys(req.body.result).length; i++) {
req.body.result[i]['acceptTime'] = local['acceptTime'];
}
//Enregistrer le message dans MongoDB
local.db.collection.insertMany(req.body.result, function(err, r) {
assert.equal(null, err);
assert.equal(result_num, r.insertedCount);
toQueueTime = new Date().getTime() - local['acceptTime'];
console.log("necessary time to store to queue: "+toQueueTime+" ms");
return;
});
}
Dispatcher & jobWorker Implémenté en multithreading Python. Le thread jobWorker attend threading.Event () avec wait () lors de sa création. Le thread de déclenchement surveille oplog avec ts et commence le traitement lorsqu'il est ajouté à la file d'attente. Attribuez le contenu de la file d'attente de lecture à un thread jobWorker gratuit, définissez Event et laissez jobWorker démarrer le traitement.
Je suis conscient des références de thread et des mises à jour des listes et des variables, donc je ne les verrouille pas ... J'avais l'intention de le faire, mais lorsque j'ai accédé au serveur LINE API dans plusieurs threads, une erreur concernant le nombre de connexions simultanées s'est produite. Par conséquent, le verrou exclusif est utilisé dans acquiert () pour accéder au serveur API LINE à partir de jobWorker. Puisque le document ne décrit pas cette zone, je l'ai mis à 1 multiplex et intervalle d'accès de 100 ms. Je suis nouveau dans le multithreading Python, veuillez donc signaler toute erreur.
dispatcher.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Settings of the this program
NumOfThread = 20
searchInterval = 100000 # uSec
mainSleepInterval = 60 # Sec
# Settings of the MongoDB
mongoHost = "127.0.0.1";
mongoPort = 27017;
mongoDb = "line_bot";
mongoCol = "recvq";
import os,os.path
import sys
import threading
import time
import json
import pymongo
from pymongo.cursor import CursorType
from datetime import datetime
import datetime
import jobWorker
usleep = lambda x: time.sleep(x/1000000.0) #Micro secondes de sommeil
#####fil de travail
def workerThread(tt):
tno = str(tt[0])
while True:
tt[2].clear() #Effacer l'événement et attendre qu'Evant se produise
tt[3] = 'w'
tt[2].wait()
if verbose: #L'attente se termine. Commencer le traitement
print '\nworker['+tno+']: wake up'
#Appelez la fonction de traitement actuelle ici
jobWorker.jobWorker(verbose, tno, tt[4]['o'])
#####Thread de déclenchement MongoDB
def TriggerMongo(t, tchain, last, searchInterval, host, port, db, col):
dbCol = db + '.' + col
c = pymongo.MongoClient(host, port)
# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.
#oplog = c.local.oplog.rs
first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']
while True:
cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
while cursor.alive:
for doc in cursor:
#Régulièrement{h:0,ts:Timestamp(nn.., 1),o:{},v:2,ns:'',op:'n'}Retour
#Est op:'n'C'est juste une information. ignorer.
if doc['ns']==dbCol and doc['op']!='n':
#Trouver des fils gratuits
i = tchain[last]
while t[i][3] != 'w':
i = tchain[i]
if i == tchain[last]: #Si vous cherchez un tour
usleep(searchInterval)
t[i][4] = doc #Fil gratuit t[n][4]Stocker les données dans
t[i][3] = 'r'
t[i][2].set() # t[n]Traitement de l'instruction de démarrage
last = i
# Work with doc here
ts = doc['ts']
print "got out of a while corsor.alive loop"
#######################################################################
# Check of the parameter
verbose = False
if len(sys.argv)==2 and sys.argv[1]=='-v':
verbose = True
elif len(sys.argv)!=1:
print "Usage: %s [-v]" % (sys.argv[0],)
quit()
#création de données de gestion des threads de travail&création de thread de travail
# [ThreadNo, ThreadObj ,EvantObj, status,Données à transmettre au thread]
# (status ' ':en préparation, 'w':En attente / libre, 'r':Fonctionnement)
# :
t = [ [0 for i in range(5)] for i in range(NumOfThread)]
for i in range(NumOfThread):
t[i][0] = i # Thread No.
t[i][2] = threading.Event() #Génération d'objets Evant
t[i][3] = ' ' # is_running
#création de thread de travail
t[i][1] = threading.Thread(name='worker['+str(i)+']', target=workerThread,
args=(t[i],))
t[i][1].setDaemon(True)
# Thread list of circulation
tc = [0 for i in range(NumOfThread)] #La valeur est le fil suivant Non.
for i in range(NumOfThread):
tc[i] = i+1
tc[i] = 0 # make a list of circulation
lastThread = i #Dernier fil utilisé.Vient ensuite tc[lastThread]Utilisez le deuxième fil.
#début du thread de travail
for i in range(NumOfThread):
t[i][1].start()
#Attendre l'état d'attente après le démarrage du thread de travail
yetAllThread = True
while yetAllThread:
for i in range(NumOfThread):
if t[i][3] == ' ':
break
else:
usleep(100) #L'intervalle de surveillance est de 0.1 milliseconde
if i == NumOfThread-1:
yetAllThread = False
else:
usleep(100) #L'intervalle de surveillance est de 0.1 milliseconde
#Génération de threads de déclenchement MongoDB
t_mongo = threading.Thread(name='t_mongo', target=TriggerMongo, args=(t,tc,lastThread,searchInterval,mongoHost,mongoPort,mongoDb,mongoCol,))
t_mongo.setDaemon(True)
t_mongo.start() #début
#fil conducteur
while True:
time.sleep(mainSleepInterval)
jobWorker.py est le thread qui effectue le traitement réel. Il s'agit d'un échantillon qui ne renvoie les perroquets qu'en fonction du type de contenu à envoyer. Veuillez noter que la méthode de prise de MID (de) diffère selon l'opType.
jobWorker.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Note of caution:
# - This program is one of the threads.
# - Must not do exit(),quit()
# - Please use only return()
# Settings of the LINE API Server
lineApiHost = "trialbot-api_line_me"
accessIntervalMS = 100 # ms
getProfilesUrl = "https://trialbot-api.line.me/v1/profiles"
postUrl = "https://trialbot-api.line.me/v1/events"
getContentUrl = "https://trialbot-api.line.me/v1/bot/message/$messageId/content"
header = {
"Content-Type" : "application/json; charser=UTF-8",
"X-Line-ChannelID" : "xxxxxxxxxx",
"X-Line-ChannelSecret" : "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"X-Line-Trusted-User-With-ACL": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
}
postBodyTemplate = {
"to" : [],
"toChannel" : 1383378250,
"eventType" : "138311608800106203",
"content" : {}
}
import threading
import time
import datetime
import json
usleep = lambda x: time.sleep(x/1000000.0) #Micro secondes de sommeil
#Verrou lié pour éviter l'accès simultané au serveur API LINE à partir de plusieurs threads de travail
globalLock = {} #Verrouiller pour chaque destination de connexion
globalLastAccessTime = {} #Heure du dernier accès pour chaque destination de connexion
loadTime = int(time.time()*1000)
def jobWorker(verbose, mynum, recvBody):
global globalLock
global globalLastAccessTime
#Paramètres de verrouillage initiaux pour chaque destination de connexion
if not globalLock.has_key(lineApiHost):
globalLock[lineApiHost] = threading.Lock()
#Réglage initial de l'heure du dernier accès pour chaque destination de connexion
if not globalLastAccessTime.has_key(lineApiHost):
globalLastAccessTime[lineApiHost] = loadTime
if verbose:
recvBody['_id'] = 'ObjectId("'+str(recvBody['_id'])+'")'
print 'worker['+mynum+'] recvBody: '+str(int(time.time()*1000)-recvBody['acceptTime'])+' ms to here from accept'
print recvBody
opType = recvBody['content'].get('opType')
# blocked from user
if opType == 8:
#MID du bloc utilisateur de la gestion des utilisateurs(recvBody['content']['params'][0])Effacer
print 'please delete user "'+recvBody['content']['params'][0]+'" from management data.'
return
#Copier la partie corps de POST
postBody = {}
postBody['to'] = ''
postBody['toChannel'] = postBodyTemplate['toChannel']
postBody['eventType'] = postBodyTemplate['eventType']
postBody['content'] = {}
#Destination de la réponse au message
if opType==4: # New user
postBody['to'] = [ recvBody['content']['params'][0] ]
else:
postBody['to'] = [ recvBody['content']['from'] ]
# New user
if opType==4:
#Obtenir le profil utilisateur
result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
userProfile = json.loads(result.text)
resText = 'Bienvenue!'
#Le profil doit être ajouté à la base de données de gestion des utilisateurs avec le MID de l'utilisateur
print 'please add '+userProfile['contacts'][0]['displayName']+'\'s profile to user management db.'
print json.dumps(userProfile, sort_keys = True, indent=4)
#Traitement selon le message
contentType = recvBody['content'].get('contentType')
resText = ''
if contentType == 1: # Text
resText = u'Oui,'+recvBody['content']['text']+u',n'est-ce pas.'
elif contentType == 2: # Image
resText = u'C'est une photo...'
elif contentType == 3: # Video
resText = u'C'est une vidéo...'
elif contentType == 4: # Audio
resText = u'C'est un message vocal...'
elif contentType == 7: # Location
resText = u'C'est des informations de localisation...'
if verbose:
print recvBody['content']['text'].encode('utf-8')
print recvBody['content']['location']['title'].encode('utf-8')
print recvBody['content']['location']['address'].encode('utf-8')
elif contentType == 8: # Sticker
resText = u'C'est un timbre'
if verbose:
print recvBody['content']['contentMetadata']['STKTXT'].encode('utf-8')
elif contentType == 10: # Contact
# Contact(contentType==10)Obtenez le profil intermédiaire de contentMetadata
resText = recvBody['content']['contentMetadata']['displayName']+u'Ce sont vos coordonnées'
result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, recvBody['content']['contentMetadata']['mid'], accessIntervalMS)
contactProfile = json.loads(result.text)
if verbose:
print '\ncontactProfile: ' + str(contactProfile)
#Envoyer un message de réponse
if resText:
#Obtenir le profil utilisateur(Enregistré à l'origine dans la base de données au moment de l'enregistrement de l'utilisateur, acquis au besoin)
result = apiServer(verbose, mynum, 'get', lineApiHost, getProfilesUrl, header, postBody['to'][0], accessIntervalMS)
userProfile = json.loads(result.text)
resText = userProfile['contacts'][0]['displayName'] + u'M.' + resText
if verbose:
print '\nprofile: ' + str(userProfile)
#Le message sortant est un texte(ContentType=1).En outre, Image,Video,Audio,Location,Sticker,multiple messages,Vous pouvez envoyer des messages riches
postBody['content'] = {
'contentType': 1,
'toType' : 1,
'text' : resText
}
if verbose:
print '\nworker['+mynum+'] ' + postUrl
print 'worker['+mynum+'] postHeader: ' + json.dumps(header, sort_keys = True, indent = 4)
print 'worker['+mynum+'] postBody: ' + json.dumps(postBody, sort_keys = True, indent = 4)
#Envoyer un message
r = apiServer(verbose, mynum, 'post', lineApiHost, postUrl, header, postBody, accessIntervalMS)
return
#Accès au serveur de l'API LINE
def apiServer(verbose, mynum, method, host, url, header, body, accessIntervalMS):
import requests
global globalLock
global globalLastAccessTime
globalLock[host].acquire() # Lock
#Combien de temps dois-je attendre si j'ai un certain temps pour accéder au serveur de l'API LINE?
currentTime = int(time.time()*1000)
remain = accessIntervalMS - (currentTime - globalLastAccessTime[host])
if verbose:
print 'worker['+mynum+'] time since last access(ms): '+str(currentTime - globalLastAccessTime[host])
print 'worker['+mynum+'] remain='+str(remain)+' ms'
# wait accessIntervalMS from last access
if remain > 0:
usleep(remain*1000)
if method=='get':
if body:
payload = { 'mids': body }
r = requests.get(url, params=payload, headers=header)
else:
if verbose:
print url, header
r = requests.get(url, headers=header)
else:
r = requests.post(url, data=json.dumps(body), headers=header)
if verbose and r.status_code!=200:
print 'worker['+mynum+'] HTTP status code: ' + str(r.status_code)
print 'worker['+mynum+'] response: ' + r.text
globalLastAccessTime[host] = int(time.time()*1000)
globalLock[host].release() # release
return r
Je pense que nous avons pu implémenter le squelette de Dispatcher & jowWorker, un mécanisme de file d'attente léger qui peut être utilisé même lorsqu'un grand nombre de messages arrivent. Dans l'état initial de 64 bits CentOS 7, la limite supérieure du nombre de threads dans tout le système est 30118, mais s'il s'agit de ..5000 threads, la génération échouera. (... je n'ai pas besoin de tant que ça) Un tel mécanisme est nécessaire non seulement pour le serveur BOT, mais également pour livrer efficacement une grande quantité de courrier en utilisant plusieurs serveurs SMTP.
Si vous souhaitez faire du côté jobWorker une application de langue différente, vous pouvez l'utiliser en en faisant un micro service ou en le modifiant pour qu'il communique avec pipe dans un autre processus. Si vous voulez distribuer la charge avec ce mécanisme, faites de MongoDB un autre serveur ou Sharding, ou apportez "4. message récurrent" ou plus tard sur une autre machine. Si vous souhaitez répartir les travailleurs plus que cela, il est préférable d'en faire un micro-service ou d'utiliser un autre mécanisme de courtier de demandes.
Recommended Posts