[PYTHON] Implementierter Socket-Server mit Unterbrechungserkennung durch gevent oder async / await

Erinnerst du dich an ** Apsetone Deb **? Ich glaube nicht, dass mein Job ohne diese Existenz geboren worden wäre. [OSI-Referenzmodell](http://dic.nicovideo.jp/a/%E3%82%A2%E3%83%97%E3%82%BB%E3%83%88%E3%83%8D%E3 Wie man sich erinnert% 83% 87% E3% 83% 96).

Unter den OSI-Referenzmodellen handelt es sich um die Implementierung eines Socket-Servers, der die Socket-Kommunikation in der Transportschicht durch vollständige Nutzung der Parallelverarbeitung ausführt. Dieser Artikel ist eine Python-Neufassung dessen, was ich in diesem Frühjahr von einem großartigen Ingenieur über C # -Tasks und async / await gelernt habe. "Mastering TCP / IP", das in einer Ecke des Raums mit Staub bedeckt war, wurde in einer Hand installiert.

Zweck

--Parallele nicht blockierende Verarbeitung kann von Ihnen selbst geschrieben werden. --Gevent und async / await können die Verarbeitung mit Lightweight-Threads implementieren. --C10k Sie werden das Problem ein wenig verstehen können.

Ich habe den Artikel Multithread / Prozesszusammenfassung (Ruby Edition) gelesen und den Code mit der Absicht geschrieben, ihn zu überprüfen. Es ist gut organisiert. Wenn Sie also die Bedeutung der angezeigten Wörter nicht verstehen, sollten Sie es lesen. (Ich schrieb, während ich mich selbst las.)

Übersicht und Socket-Server-Spezifikationen

Typische Protokolle, die die Transportschichtfunktion in TCP / IP ausführen, sind "TCP" und "UDP". Bei der Kommunikation über TCP oder UDP wird häufig die als Socket bezeichnete Betriebssystem-API verwendet. Dieses Mal werden wir den Socket-Server und den Client implementieren. Dabei werden wir die gevent-Version und die asynchrone / await-Version von Lightweight-Threads verwenden.

Socket-Server-Spezifikationen und Betriebsablauf

Es ist eine Erweiterung des Echoservers. Es enthält die Spezifikationen der Trennungserkennungsfunktion und die Funktion zum Echo der vorherigen und aktuellen Minuten.

  1. Akzeptieren Sie die Socket-Kommunikation vom Client
  2. Wenn eine Nachricht vom Client empfangen wird, geben Sie den vorherigen und den aktuellen Empfang wieder.
  3. Wenn die Kommunikation unterbrochen ist, schließen Sie die Socket-Kommunikation

■ Bedienung des fertigen Produkts gif loop.gif

Client-Spezifikationen und Betriebsablauf

Nach dem Öffnen der Kommunikation sendet und empfängt sie insgesamt 10 Mal pro Sekunde zum und vom Socket-Server und trennt die Verbindung. 100 Clients, die eine Socket-Kommunikation durchführen, werden gleichzeitig gestartet und verarbeiten 100 Schleifen.

  1. Stellen Sie eine Verbindung zum Socket-Server her
  2. Senden Sie "HelloWorld {n}" an den Server. Beginnen Sie mit n = 0.
  3. Empfangen Sie die vorherigen und aktuellen Übertragungen vom Server
  4. 1 Sekunde Matsu ← Diesmal
  5. n ++ und kehre zum Prozess von 2 zurück.
  6. Trennen Sie die Verbindung, wenn n> = 10 ist

Client-Ausführungsimage


Serververbindung
Senden:Hello World!:0
Erhalten:Hello World!:0
Senden:Hello World!:1
Erhalten:Hello World!:0 Hello World!:1
....
Senden:Hello World!:8
Erhalten:Hello World!:7 Hello World!:8
Senden:Hello World!:9
Erhalten:Hello World!:8 Hello World!:9
Trennen Sie die Verbindung zum Server

Fehlerfall: Ein Server, der nicht parallel arbeitet, wird erstellt, wenn er normal geschrieben wird

Gemäß den Spezifikationen aktiviert der Client gleichzeitig 100 Socket-Kommunikationen, kommuniziert insgesamt 10 Mal alle 1 Sekunde und trennt dann die Verbindung. Wenn Sie den Socket-Server in Seriennummer schreiben, ist der Durchsatz extrem schlecht, da die nächste Verbindung erst gestartet werden kann, wenn die erste Verbindung hergestellt und getrennt wurde.

py35_tcp_non_parallel_server.py


# -*- coding: utf-8 -*-
import socket

#Erstellen Sie einen INET STREAM-Socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Verbinden Sie den Socket mit dem bekannten Port des öffentlichen Hosts
server.bind(('127.0.0.1', 1234))
#Werden Sie ein Server-Socket
server.listen(10000)

while True:
    client_socket, address = server.accept()
    while True:
        #Erhalten
        fp = client_socket.makefile()
        line = fp.readline()
        print(line, type(line))
        if not line:
            #Schleifenende, wenn nicht verbunden
            break

        #Antwort
        client_socket.send(line.encode('utf-8', 'ignore'))
        print('send:{}'.format(line))

■ Ausführungsergebnis Es ist sehr langsam, weil es jede Kommunikation der Reihe nach verarbeitet. wrong_loop.gif

Python3.5 Gevent Version Socket Server

In gevent sind wait_read und wait_write vorhanden, sodass der Verbindungs-Timeout-Prozess implementiert werden kann. Praktisch. Die gevent-Version und die async / await-Version sind so geschrieben, dass sie kompatibel sind.

monkey_patch Version von gevent

Der Punkt ist, dass gevent.monkey.patch_socket () sock.accept () in nicht blockierende Verarbeitung ändert und gevent.spawn` die Serververarbeitung in einen kompakten Thread wirft. Punkt

gevent_tcp_server_by_monkey_patch.py


# -*- coding: utf-8 -*-
from gevent.pool import Group
import gevent
import socket
import gevent.monkey
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()


def task_stream(client_socket):
    _prev_message = ""
    while True:
        #Erhalten
        fp = client_socket.makefile()
        line = fp.readline()
        if not line:
            #Schleifenende, wenn nicht verbunden
            break

        #Antwort
        s = _prev_message + line
        client_socket.send(s.encode('utf-8', 'ignore'))
        print('send:{}'.format(s))
        _prev_message = line


def gen_server(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, port))
    sock.listen(10000)
    name = 1
    while True:
        conn, addr = sock.accept()
        gevent.spawn(worker, str(name), conn, addr)
        name += 1


def worker(name, sock, addr):
    print('new connection!')
    task = gevent.spawn(task_stream(sock))
    group.add(task)


def main():
    host, port = '127.0.0.1', 1234
    server = gevent.spawn(gen_server, host, port)
    group.add(server)
    group.join()
    server.kill()

group = Group()
main()

Stream Server-Version von gevent

Der Punkt ist, dass es nicht blockierend wartet, bis die Kommunikation mit "wait_read" kommt und der Punkt, an dem die Timeout-Verarbeitung hinzugefügt wird.

gevent_tcp_server_by_stream_server.py


# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read, wait_write


class TooLong(Exception):
    pass


def handle(socket, address):
    print('new connection!')

    fp = socket.makefile()
    try:
        _prev_message = ""
        while True:
            wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
            line = fp.readline()
            if line:
                #Generieren Sie eine Antwortzeichenfolge mit der zuletzt empfangenen
                s = _prev_message + line
                #Senden
                socket.send(s.encode('utf-8', 'ignore'))
                fp.flush()
                print('send:{}'.format(_prev_message + line))
                _prev_message = line
    except TooLong:
        print('timeout')

    #Trennen Sie die Verbindung, wenn eine Zeitüberschreitung auftritt
    socket.shutdown(py_socket.SHUT_RDWR)
    socket.close()


pool = Pool(10000)  # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()

gevent_tcp_client.py


# -*- coding: utf-8 -*-
import gevent
from gevent import socket
from functools import wraps
import time

HOST = '127.0.0.1'
PORT = 1234


def client():
    conn = socket.create_connection((HOST, PORT))
    for x in range(10):
        message = 'Hello World!:{}\n'.format(x)
        #Senden
        conn.send(message.encode('utf-8', 'ignore'))
        recv_message = conn.recv(3000)
        print('recv:' + recv_message.decode('utf-8'))
        #Warten Sie 1 Sekunde
        gevent.sleep(1)
    conn.close()


def main():
    for x in range(100):
        jobs = [gevent.spawn(client) for x in range(100)]
        gevent.joinall(jobs)


main()


Python3.5 async / await-Version des Socket-Servers

In "async def task_echo (Leser, Schreiber)" sind Leser und Schreiber die Argumente, aber es ist unangenehm, aber im Python3-Dokument [es wurde in dem Beispiel geschrieben, das sich mit Socket-Kommunikation befasst](http: // docs) .python.jp / 3 / library / asyncio-stream.html # tcp-echo-server-using-stream), also habe ich es so implementiert, wie es ist. Dies liegt wahrscheinlich daran, dass es in "StreamReader" eingeschlossen ist, aber etwas ist unangenehm. Wenn es eine Klasse gibt, die eine nicht blockierende Socket-Kommunikation durchführen kann, kann sie aus "StreamReader" umgeschrieben werden, sodass die Möglichkeit besteht, dass sie in Zukunft geändert wird.

py35_tcp_server.py


# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def task_echo(reader, writer):
    print('new connection!')
    _prev_message = ''
    while True:
        line = await reader.readline()
        if line == b'':
            #Trennungserkennung Wenn die Verbindung getrennt ist, lesen Sie aus irgendeinem Grund.readline()Ist B''Verlassen Sie die Schleife, um zurückzukehren
            break
        if line:
            writer.write(line)
            await writer.drain()
            print('send:{}'.format(_prev_message + line.decode()))
            _prev_message = line.decode()

    #Schließen Sie die Socket-Kommunikation, wenn eine Unterbrechung festgestellt wird
    print("Close the client socket")
    await writer.drain()
    writer.close()


def main():
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(task_echo, HOST, PORT, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until CTRL+c is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()


main()

py35_tcp_client.py


# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def tcp_echo_client():
    reader, writer = await asyncio.open_connection(HOST, PORT)
    for x in range(10):
        message = 'Hello World!:{}\n'.format(str(x))
        #Senden
        writer.write(message.encode())
        #Erhalten
        data = (await reader.read(100)).decode()
        print('recv:{}'.format(data))
        #Warten Sie 1 Sekunde
        await asyncio.sleep(1)
    writer.close()
    return "1"


def main():
    loop = asyncio.get_event_loop()
    for x in range(100):
        tasks = asyncio.wait([tcp_echo_client() for x in range(100)])
        loop.run_until_complete(tasks)
    loop.close()

main()


Unkoordinierte Zusammenfassung

Ich konnte es wegen mangelnder Fähigkeiten nicht zusammenstellen

Schwerwiegendes Problem mit dem obigen Code

Als Socket-Server hat die obige Codegruppe ein schwerwiegendes Problem. Es kann nur 1 CPU verarbeiten. Python, das aufgrund der Einschränkung von GIL (Global Interpreter Lock) nur eine CPU verarbeiten kann, ist etwas enttäuschend. (Es wird seit mehr als 5 Jahren gesagt und es hat sich noch nicht verbessert, daher ist dies wahrscheinlich die Grenze.)

Wenn Sie subjektiv sprechen, wollten Sie unbedingt Parallelverarbeitung mit Lightweight-Threads schreiben. Auch wenn der Benutzer sich dessen nicht bewusst ist, wird sie parallel zu einem Multiprozessor und parallel zu ultraschnellem C # verarbeitet Es war ein leichter Thread wie Erlang, der frei kommunizieren konnte und der Benutzer die Anzahl der CPUs überhaupt nicht erkannte.

Die Multiprozessor-Unterstützung von Python kann nur auf um GIL-Probleme zu vermeiden verzweigt oder mehrmals mit einem Daemon gestartet werden.

Wenn die Server-Skala jedoch 10 oder 100 erreicht, tritt das gleiche Phänomen in anderen Sprachen auf. Ich kenne keine praktische Sprache, die als logische Einzelmaschine zwischen verschiedenen Servern ausgeführt wird. Python sollte so konzipiert sein, dass es in einem einzigen Prozess ausgeführt wird, und die Daemons sollten so viele wie die Anzahl der CPUs ausführen. Das Backend ist zwangsläufig so konzipiert, dass Nachrichtenwarteschlangendienste oder -speicher verwendet werden, sodass es problemlos auf mehrere Server skaliert werden kann.

Als Nebeneffekt wird das Design natürlich kompliziert. Wie ist es als Implementierung des Chat-Dienstes richtig, eine Push-Benachrichtigung zum Zeitpunkt der Veröffentlichung an aktive Benutzer im selben Chatroom zu entwerfen, die auf verteilte Weise mit mehreren Servern verbunden sind?

Vorteile der Handhabung leichter Gewinde

Wenn Sie mit leichten Threads umgehen können, können Sie effiziente Server schreiben, die mit der Außenwelt kommunizieren. Indem die Verarbeitung asynchron auf die Kommunikation wartet, wird es möglich, die CPU parallel anderen Arbeiten zuzuweisen, ohne tatsächlich auf die Wartezeit zu warten.

Unterschied zwischen gevent und async / await

Leichte Threads, die nur von der externen Bibliothek gevent (Greenlet) verarbeitet werden konnten, können jetzt von reinem Python mit der Implementierung von asyncio in Python 3.4 verarbeitet werden. Darüber hinaus wurde Python 3.5 async / await implementiert, um leichtgewichtige Threads mit einer einfacheren Beschreibung zu verarbeiten.

Als ich die asynchrone / warte-Version schrieb, dachte ich immer wieder darüber nach, wie man eine blockierende E / A zu einer nicht blockierenden E / A macht. Nach dem Schreiben der asynchronen / wartenden Version finde ich das Designkonzept, das IO implizit in nicht blockierende Verarbeitung durch Berühren von gevent konvertiert, wunderbar.

Auszug aus dem Gevent-Tutorial


Die wahre Stärke von gevent ist die kollaborative Planung von Netzwerk- und E / A-gebundenen Funktionen.
Bei Verwendung zur Ausführung. gevent kümmert sich so weit wie möglich um alle Details und Netzwerke
Bewirkt, dass die Bibliothek den Greenlet-Kontext implizit überträgt.

Es gibt viele nützliche Funktionen und Klassen in gevent, die den Juckreiz erreichen können, und ich denke, dass gevent vorerst verwendet wird.

Verwenden Sie in der Praxis TCP oder UDP

Verwenden Sie in der Praxis unbedingt das TCP- oder UDP-Protokoll, da Raw-Sockets beim Senden großer Datenmengen nicht in Teilen ankommen und die Reihenfolge der Ankunft nicht garantiert ist und die Erkennung der Trennung nicht gut durchgeführt werden kann.

Hausaufgabe 1 - Schreiben wir in node.js

Wenn Sie nach Rücksprache mit einem erstklassigen Techniker im Unternehmen über Lightweight-Threads einen Echoserver schreiben, der auf die vorherigen und aktuellen Empfänge in "node.js" reagiert, * wenn Sie ihn normal schreiben, wird es in der Callback-Hölle sehr schwierig *. Mir wurde jedoch geraten, dass es eine gute Erfahrung sein würde, es einmal zu schreiben, deshalb würde ich es gerne eines Tages versuchen.

Hausaufgabe 2 - Neuimplementierung der Timeout-Klasse für gevent

Die Timeout-Klasse, die die Ausführungszeit des Gevent-Codeblocks einschränken kann, scheint nützlich zu sein, daher möchte ich sie in reinem Python erneut implementieren.

gevent Tutorial-Timeouts


import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

Referenz

[TCP / IP beherrschen](http://www.amazon.co.jp/%E3%83%9E%E3%82%B9%E3%82%BF%E3%83%AA%E3%83%B3% E3% 82% B0 TCP-IP-% E5% 85% A5% E9% 96% 80% E7% B7% A8-% E7% AC% AC5% E7% 89% 88-% E7% AB% B9% E4% B8 % 8B / dp / 4274068765) gevent tutorial gevent: Asynchronous I/O made easy 18.5. Asyncio - Asynchrone E / A, Ereignisschleifen, Collouts und Aufgaben Tutorial: Writing a TCP server in Python

Recommended Posts

Implementierter Socket-Server mit Unterbrechungserkennung durch gevent oder async / await
[Python] Asynchrone Anfrage mit async / await
Async / warte mit Kivy und tkinter
Ich habe versucht, mit einem Remote-Server über Socket-Kommunikation mit Python zu kommunizieren.