Heuschrecke ist gut, nicht wahr? Heuschrecke. Alle kürzlich durchgeführten Belastungstests wurden mit Heuschrecke durchgeführt. Es ist wirklich einfach, Python auch für komplizierte Szenarien zu verwenden.
In letzter Zeit nimmt jedoch die Anzahl der ständig aktiven Apps zu, und es scheint, dass es viele Situationen gibt, in denen ein einfaches Anforderungs- / Res-Modell nicht gut angewendet werden kann.
Daher habe ich versucht, die Locust-Aufgabe mit Websocket zu laden, damit sie über den Webbildschirm bestätigt werden kann.
Standardmäßig ist die Einheit ms, aber in der Socket-Kommunikation wird normalerweise 0 wiederholt, wenn es ms ist, sodass die Einheit μs ist. Wie Sie dem Code entnehmen können, habe ich gerade einen Socket in Task erstellt, die Heuschrecke über das Ergebnis informiert und benachrichtigt, sodass ich die Heuschrecke selbst überhaupt nicht berührt habe.
locust hat ein einfaches Design, und es ist gut, dies leicht tun zu können, da der Teil zum Erstellen einer verteilten Umgebung und der Teil zum Anfordern und Aggregieren der Ergebnisse getrennt sind.
locustfile.py
# -*- coding:utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function
import json
import uuid
import time
import gevent
from websocket import create_connection
import six
from locust import HttpLocust, TaskSet, task
from locust.events import request_success
class ChatTaskSet(TaskSet):
def on_start(self):
self.user_id = six.text_type(uuid.uuid4())
ws = create_connection('ws://127.0.0.1:5000/chat')
self.ws = ws
def _receive():
while True:
res = ws.recv()
data = json.loads(res)
end_at = time.time()
response_time = int((end_at - data['start_at']) * 1000000)
request_success.fire(
request_type='WebSocket Recv',
name='test/ws/chat',
response_time=response_time,
response_length=len(res),
)
gevent.spawn(_receive)
@task
def sent(self):
start_at = time.time()
body = json.dumps({'message': 'hello, world', 'user_id': self.user_id, 'start_at': start_at})
self.ws.send(body)
request_success.fire(
request_type='WebSocket Sent',
name='test/ws/chat',
response_time=int((time.time() - start_at) * 1000000),
response_length=len(body),
)
class ChatLocust(HttpLocust):
task_set = ChatTaskSet
min_wait = 0
max_wait = 100
Der Punkt ist, das Ergebnis mit events.request_success.fire zu benachrichtigen und einen Gevent-Thread zum Empfangen zu starten. Wenn Sie den Empfang normalerweise in @task definieren, wird die Aufgabe während dieser Zeit angehalten, und andere Aufgaben in demselben TaskSet werden ebenfalls angehalten.
Der Beispiel-Echo & PubSub-Server mit der Last ist übrigens das folgende Skript.
server.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import unicode_literals
from __future__ import print_function
from collections import defaultdict
import json
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from flask import Flask, request
from werkzeug.exceptions import abort
app = Flask(__name__)
ctr = defaultdict(int)
@app.route('/echo')
def echo():
ws = request.environ['wsgi.websocket']
if not ws:
abort(400)
while True:
message = ws.receive()
if message is not None:
r = json.loads(message)
ctr[r['user_id']] += 1
ws.send(message)
@app.route('/report')
def report():
return '\n'.join(['{}:\t{}'.format(user_id, count) for user_id, count in ctr.items()])
socket_handlers = set()
@app.route('/chat')
def chat():
ws = request.environ['wsgi.websocket']
socket_handlers.add(ws)
while True:
message = ws.receive()
for socket_handler in socket_handlers:
try:
socket_handler.send(message)
except:
socket_handlers.remove(socket_handler)
if __name__ == '__main__':
http_server = WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
http_server.serve_forever()
Die Datei wird auch in das folgende Repository hochgeladen. https://gist.github.com/yamionp/9112dd6e54694d594306
Recommended Posts