Erinnerst du dich an ** Apsetone Deb **? Ich glaube nicht, dass mein Job ohne diese Existenz geboren worden wäre. [OSI-Referenzmodell](http://dic.nicovideo.jp/a/%E3%82%A2%E3%83%97%E3%82%BB%E3%83%88%E3%83%8D%E3 Wie man sich erinnert% 83% 87% E3% 83% 96).
Unter den OSI-Referenzmodellen handelt es sich um die Implementierung eines Socket-Servers, der die Socket-Kommunikation in der Transportschicht durch vollständige Nutzung der Parallelverarbeitung ausführt. Dieser Artikel ist eine Python-Neufassung dessen, was ich in diesem Frühjahr von einem großartigen Ingenieur über C # -Tasks und async / await gelernt habe. "Mastering TCP / IP", das in einer Ecke des Raums mit Staub bedeckt war, wurde in einer Hand installiert.
--Parallele nicht blockierende Verarbeitung kann von Ihnen selbst geschrieben werden. --Gevent und async / await können die Verarbeitung mit Lightweight-Threads implementieren. --C10k Sie werden das Problem ein wenig verstehen können.
Ich habe den Artikel Multithread / Prozesszusammenfassung (Ruby Edition) gelesen und den Code mit der Absicht geschrieben, ihn zu überprüfen. Es ist gut organisiert. Wenn Sie also die Bedeutung der angezeigten Wörter nicht verstehen, sollten Sie es lesen. (Ich schrieb, während ich mich selbst las.)
Typische Protokolle, die die Transportschichtfunktion in TCP / IP ausführen, sind "TCP" und "UDP". Bei der Kommunikation über TCP oder UDP wird häufig die als Socket bezeichnete Betriebssystem-API verwendet. Dieses Mal werden wir den Socket-Server und den Client implementieren. Dabei werden wir die gevent-Version und die asynchrone / await-Version von Lightweight-Threads verwenden.
Es ist eine Erweiterung des Echoservers. Es enthält die Spezifikationen der Trennungserkennungsfunktion und die Funktion zum Echo der vorherigen und aktuellen Minuten.
■ Bedienung des fertigen Produkts gif
Nach dem Öffnen der Kommunikation sendet und empfängt sie insgesamt 10 Mal pro Sekunde zum und vom Socket-Server und trennt die Verbindung. 100 Clients, die eine Socket-Kommunikation durchführen, werden gleichzeitig gestartet und verarbeiten 100 Schleifen.
Client-Ausführungsimage
Serververbindung
Senden:Hello World!:0
Erhalten:Hello World!:0
Senden:Hello World!:1
Erhalten:Hello World!:0 Hello World!:1
....
Senden:Hello World!:8
Erhalten:Hello World!:7 Hello World!:8
Senden:Hello World!:9
Erhalten:Hello World!:8 Hello World!:9
Trennen Sie die Verbindung zum Server
Gemäß den Spezifikationen aktiviert der Client gleichzeitig 100 Socket-Kommunikationen, kommuniziert insgesamt 10 Mal alle 1 Sekunde und trennt dann die Verbindung. Wenn Sie den Socket-Server in Seriennummer schreiben, ist der Durchsatz extrem schlecht, da die nächste Verbindung erst gestartet werden kann, wenn die erste Verbindung hergestellt und getrennt wurde.
py35_tcp_non_parallel_server.py
# -*- coding: utf-8 -*-
import socket
#Erstellen Sie einen INET STREAM-Socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Verbinden Sie den Socket mit dem bekannten Port des öffentlichen Hosts
server.bind(('127.0.0.1', 1234))
#Werden Sie ein Server-Socket
server.listen(10000)
while True:
client_socket, address = server.accept()
while True:
#Erhalten
fp = client_socket.makefile()
line = fp.readline()
print(line, type(line))
if not line:
#Schleifenende, wenn nicht verbunden
break
#Antwort
client_socket.send(line.encode('utf-8', 'ignore'))
print('send:{}'.format(line))
■ Ausführungsergebnis Es ist sehr langsam, weil es jede Kommunikation der Reihe nach verarbeitet.
In gevent sind wait_read
und wait_write
vorhanden, sodass der Verbindungs-Timeout-Prozess implementiert werden kann. Praktisch. Die gevent-Version und die async / await-Version sind so geschrieben, dass sie kompatibel sind.
Der Punkt ist, dass gevent.monkey.patch_socket ()
sock.accept () in nicht blockierende Verarbeitung ändert und
gevent.spawn` die Serververarbeitung in einen kompakten Thread wirft. Punkt
gevent_tcp_server_by_monkey_patch.py
# -*- coding: utf-8 -*-
from gevent.pool import Group
import gevent
import socket
import gevent.monkey
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()
def task_stream(client_socket):
_prev_message = ""
while True:
#Erhalten
fp = client_socket.makefile()
line = fp.readline()
if not line:
#Schleifenende, wenn nicht verbunden
break
#Antwort
s = _prev_message + line
client_socket.send(s.encode('utf-8', 'ignore'))
print('send:{}'.format(s))
_prev_message = line
def gen_server(host, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
sock.listen(10000)
name = 1
while True:
conn, addr = sock.accept()
gevent.spawn(worker, str(name), conn, addr)
name += 1
def worker(name, sock, addr):
print('new connection!')
task = gevent.spawn(task_stream(sock))
group.add(task)
def main():
host, port = '127.0.0.1', 1234
server = gevent.spawn(gen_server, host, port)
group.add(server)
group.join()
server.kill()
group = Group()
main()
Der Punkt ist, dass es nicht blockierend wartet, bis die Kommunikation mit "wait_read" kommt und der Punkt, an dem die Timeout-Verarbeitung hinzugefügt wird.
gevent_tcp_server_by_stream_server.py
# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read, wait_write
class TooLong(Exception):
pass
def handle(socket, address):
print('new connection!')
fp = socket.makefile()
try:
_prev_message = ""
while True:
wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
line = fp.readline()
if line:
#Generieren Sie eine Antwortzeichenfolge mit der zuletzt empfangenen
s = _prev_message + line
#Senden
socket.send(s.encode('utf-8', 'ignore'))
fp.flush()
print('send:{}'.format(_prev_message + line))
_prev_message = line
except TooLong:
print('timeout')
#Trennen Sie die Verbindung, wenn eine Zeitüberschreitung auftritt
socket.shutdown(py_socket.SHUT_RDWR)
socket.close()
pool = Pool(10000) # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()
gevent_tcp_client.py
# -*- coding: utf-8 -*-
import gevent
from gevent import socket
from functools import wraps
import time
HOST = '127.0.0.1'
PORT = 1234
def client():
conn = socket.create_connection((HOST, PORT))
for x in range(10):
message = 'Hello World!:{}\n'.format(x)
#Senden
conn.send(message.encode('utf-8', 'ignore'))
recv_message = conn.recv(3000)
print('recv:' + recv_message.decode('utf-8'))
#Warten Sie 1 Sekunde
gevent.sleep(1)
conn.close()
def main():
for x in range(100):
jobs = [gevent.spawn(client) for x in range(100)]
gevent.joinall(jobs)
main()
In "async def task_echo (Leser, Schreiber)" sind Leser und Schreiber die Argumente, aber es ist unangenehm, aber im Python3-Dokument [es wurde in dem Beispiel geschrieben, das sich mit Socket-Kommunikation befasst](http: // docs) .python.jp / 3 / library / asyncio-stream.html # tcp-echo-server-using-stream), also habe ich es so implementiert, wie es ist. Dies liegt wahrscheinlich daran, dass es in "StreamReader" eingeschlossen ist, aber etwas ist unangenehm. Wenn es eine Klasse gibt, die eine nicht blockierende Socket-Kommunikation durchführen kann, kann sie aus "StreamReader" umgeschrieben werden, sodass die Möglichkeit besteht, dass sie in Zukunft geändert wird.
py35_tcp_server.py
# -*- coding: utf-8 -*-
import asyncio
HOST = '127.0.0.1'
PORT = 1234
async def task_echo(reader, writer):
print('new connection!')
_prev_message = ''
while True:
line = await reader.readline()
if line == b'':
#Trennungserkennung Wenn die Verbindung getrennt ist, lesen Sie aus irgendeinem Grund.readline()Ist B''Verlassen Sie die Schleife, um zurückzukehren
break
if line:
writer.write(line)
await writer.drain()
print('send:{}'.format(_prev_message + line.decode()))
_prev_message = line.decode()
#Schließen Sie die Socket-Kommunikation, wenn eine Unterbrechung festgestellt wird
print("Close the client socket")
await writer.drain()
writer.close()
def main():
loop = asyncio.get_event_loop()
coro = asyncio.start_server(task_echo, HOST, PORT, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until CTRL+c is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
main()
py35_tcp_client.py
# -*- coding: utf-8 -*-
import asyncio
HOST = '127.0.0.1'
PORT = 1234
async def tcp_echo_client():
reader, writer = await asyncio.open_connection(HOST, PORT)
for x in range(10):
message = 'Hello World!:{}\n'.format(str(x))
#Senden
writer.write(message.encode())
#Erhalten
data = (await reader.read(100)).decode()
print('recv:{}'.format(data))
#Warten Sie 1 Sekunde
await asyncio.sleep(1)
writer.close()
return "1"
def main():
loop = asyncio.get_event_loop()
for x in range(100):
tasks = asyncio.wait([tcp_echo_client() for x in range(100)])
loop.run_until_complete(tasks)
loop.close()
main()
Ich konnte es wegen mangelnder Fähigkeiten nicht zusammenstellen
Als Socket-Server hat die obige Codegruppe ein schwerwiegendes Problem. Es kann nur 1 CPU verarbeiten. Python, das aufgrund der Einschränkung von GIL (Global Interpreter Lock) nur eine CPU verarbeiten kann, ist etwas enttäuschend. (Es wird seit mehr als 5 Jahren gesagt und es hat sich noch nicht verbessert, daher ist dies wahrscheinlich die Grenze.)
Wenn Sie subjektiv sprechen, wollten Sie unbedingt Parallelverarbeitung mit Lightweight-Threads schreiben. Auch wenn der Benutzer sich dessen nicht bewusst ist, wird sie parallel zu einem Multiprozessor und parallel zu ultraschnellem C # verarbeitet Es war ein leichter Thread wie Erlang, der frei kommunizieren konnte und der Benutzer die Anzahl der CPUs überhaupt nicht erkannte.
Die Multiprozessor-Unterstützung von Python kann nur auf um GIL-Probleme zu vermeiden verzweigt oder mehrmals mit einem Daemon gestartet werden.
Wenn die Server-Skala jedoch 10 oder 100 erreicht, tritt das gleiche Phänomen in anderen Sprachen auf. Ich kenne keine praktische Sprache, die als logische Einzelmaschine zwischen verschiedenen Servern ausgeführt wird. Python sollte so konzipiert sein, dass es in einem einzigen Prozess ausgeführt wird, und die Daemons sollten so viele wie die Anzahl der CPUs ausführen. Das Backend ist zwangsläufig so konzipiert, dass Nachrichtenwarteschlangendienste oder -speicher verwendet werden, sodass es problemlos auf mehrere Server skaliert werden kann.
Als Nebeneffekt wird das Design natürlich kompliziert. Wie ist es als Implementierung des Chat-Dienstes richtig, eine Push-Benachrichtigung zum Zeitpunkt der Veröffentlichung an aktive Benutzer im selben Chatroom zu entwerfen, die auf verteilte Weise mit mehreren Servern verbunden sind?
Wenn Sie mit leichten Threads umgehen können, können Sie effiziente Server schreiben, die mit der Außenwelt kommunizieren. Indem die Verarbeitung asynchron auf die Kommunikation wartet, wird es möglich, die CPU parallel anderen Arbeiten zuzuweisen, ohne tatsächlich auf die Wartezeit zu warten.
Leichte Threads, die nur von der externen Bibliothek gevent (Greenlet) verarbeitet werden konnten, können jetzt von reinem Python mit der Implementierung von asyncio in Python 3.4 verarbeitet werden. Darüber hinaus wurde Python 3.5 async / await implementiert, um leichtgewichtige Threads mit einer einfacheren Beschreibung zu verarbeiten.
Als ich die asynchrone / warte-Version schrieb, dachte ich immer wieder darüber nach, wie man eine blockierende E / A zu einer nicht blockierenden E / A macht. Nach dem Schreiben der asynchronen / wartenden Version finde ich das Designkonzept, das IO implizit in nicht blockierende Verarbeitung durch Berühren von gevent konvertiert, wunderbar.
Auszug aus dem Gevent-Tutorial
Die wahre Stärke von gevent ist die kollaborative Planung von Netzwerk- und E / A-gebundenen Funktionen.
Bei Verwendung zur Ausführung. gevent kümmert sich so weit wie möglich um alle Details und Netzwerke
Bewirkt, dass die Bibliothek den Greenlet-Kontext implizit überträgt.
Es gibt viele nützliche Funktionen und Klassen in gevent, die den Juckreiz erreichen können, und ich denke, dass gevent vorerst verwendet wird.
Verwenden Sie in der Praxis unbedingt das TCP- oder UDP-Protokoll, da Raw-Sockets beim Senden großer Datenmengen nicht in Teilen ankommen und die Reihenfolge der Ankunft nicht garantiert ist und die Erkennung der Trennung nicht gut durchgeführt werden kann.
Wenn Sie nach Rücksprache mit einem erstklassigen Techniker im Unternehmen über Lightweight-Threads einen Echoserver schreiben, der auf die vorherigen und aktuellen Empfänge in "node.js" reagiert, * wenn Sie ihn normal schreiben, wird es in der Callback-Hölle sehr schwierig *. Mir wurde jedoch geraten, dass es eine gute Erfahrung sein würde, es einmal zu schreiben, deshalb würde ich es gerne eines Tages versuchen.
Die Timeout-Klasse, die die Ausführungszeit des Gevent-Codeblocks einschränken kann, scheint nützlich zu sein, daher möchte ich sie in reinem Python erneut implementieren.
gevent Tutorial-Timeouts
import gevent
from gevent import Timeout
time_to_wait = 5 # seconds
class TooLong(Exception):
pass
with Timeout(time_to_wait, TooLong):
gevent.sleep(10)
[TCP / IP beherrschen](http://www.amazon.co.jp/%E3%83%9E%E3%82%B9%E3%82%BF%E3%83%AA%E3%83%B3% E3% 82% B0 TCP-IP-% E5% 85% A5% E9% 96% 80% E7% B7% A8-% E7% AC% AC5% E7% 89% 88-% E7% AB% B9% E4% B8 % 8B / dp / 4274068765) gevent tutorial gevent: Asynchronous I/O made easy 18.5. Asyncio - Asynchrone E / A, Ereignisschleifen, Collouts und Aufgaben Tutorial: Writing a TCP server in Python