[PYTHON] Implemented socket server with disconnection detection by gevent or async / await

Do you remember ** Apsetone Deb **? I don't think my job would have even been born without this existence. [OSI Reference Model](http://dic.nicovideo.jp/a/%E3%82%A2%E3%83%97%E3%82%BB%E3%83%88%E3%83%8D%E3 How to remember% 83% 87% E3% 83% 96).

Among the OSI reference models, this is a story of implementing a socket server that performs socket communication in the transport layer by making full use of parallel processing. This article is a Python rewrite of what I had when a great engineer taught me about C # Tasks and async / await this spring. "Mastering TCP / IP" that was covered with dust in one corner of the room was mounted in one hand.

Purpose

--Parallel non-blocking processing can be written by yourself. --Gevent and async / await will be able to implement processing using lightweight threads respectively. --You will be able to understand the C10k problem a little.

I read the article Multithread / Process Summary (Ruby Edition) and wrote the code with the intention of reviewing it. It's well organized, so if you don't understand the meaning of the words that appear, you should read it. (I wrote while reading myself.)

Overview and Socket server specifications

Typical protocols that perform the transport layer function in TCP / IP are "TCP" and "UDP". When communicating using TCP or UDP, the OS API called socket is widely used. This time we will implement the socket server and client. In the process, we will use the gevent version and the async / await version of lightweight threads.

Socket server specifications and operating flow

It is a specification that extends the echo server. The specifications of the disconnection detection function and the function to echo the previous and current minutes are included.

  1. Accept socket communication from client
  2. When a message is received from the client, echo the previous reception and the current reception.
  3. If communication is disconnected, close socket communication

■ Operation of finished product gif loop.gif

Client specifications and operating flow

After opening the communication, it sends and receives to and from the socket server a total of 10 times every second and disconnects. 100 clients that perform socket communication are started at the same time and process 100 loops.

  1. Connect to the socket server
  2. Send HelloWorld {n} to the server. Start from n = 0.
  3. Receive the previous and current transmissions from the server
  4. 1 second matsu ← This time
  5. n ++ and return to the process of 2.
  6. Disconnect when n> = 10

Client execution image


Server connection
Send:Hello World!:0
Receive:Hello World!:0
Send:Hello World!:1
Receive:Hello World!:0 Hello World!:1
....
Send:Hello World!:8
Receive:Hello World!:7 Hello World!:8
Send:Hello World!:9
Receive:Hello World!:8 Hello World!:9
Disconnect from server

Failure case: If you write normally, you will have a server that does not operate in parallel.

According to the specifications, the client activates 100 socket communications at the same time, communicates 10 times in total every second, and then disconnects. If you write the socket server in serial code, the throughput will be extremely poor because the next connection cannot be started until the first connection is completed and disconnected.

py35_tcp_non_parallel_server.py


# -*- coding: utf-8 -*-
import socket

#Create an INET STREAM socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Connect the socket to the well-known port of the public host
server.bind(('127.0.0.1', 1234))
#Become a server socket
server.listen(10000)

while True:
    client_socket, address = server.accept()
    while True:
        #Receive
        fp = client_socket.makefile()
        line = fp.readline()
        print(line, type(line))
        if not line:
            #End loop if disconnected
            break

        #response
        client_socket.send(line.encode('utf-8', 'ignore'))
        print('send:{}'.format(line))

■ Execution result It is very slow because it processes each communication in order. wrong_loop.gif

Python3.5 gevent version socket server

In gevent, wait_read and wait_write exist, so the connection Timeout process could be implemented. Convenient. The gevent version and the async / await version are written to be compatible.

monkey_patch version of gevent

The point is that gevent.monkey.patch_socket () changes sock.accept () to non-blocking processing, and gevent.spawn throws server processing into lightweight threads. point

gevent_tcp_server_by_monkey_patch.py


# -*- coding: utf-8 -*-
from gevent.pool import Group
import gevent
import socket
import gevent.monkey
# gevent.monkey.patch_all()
gevent.monkey.patch_socket()


def task_stream(client_socket):
    _prev_message = ""
    while True:
        #Receive
        fp = client_socket.makefile()
        line = fp.readline()
        if not line:
            #End loop if disconnected
            break

        #response
        s = _prev_message + line
        client_socket.send(s.encode('utf-8', 'ignore'))
        print('send:{}'.format(s))
        _prev_message = line


def gen_server(host, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind((host, port))
    sock.listen(10000)
    name = 1
    while True:
        conn, addr = sock.accept()
        gevent.spawn(worker, str(name), conn, addr)
        name += 1


def worker(name, sock, addr):
    print('new connection!')
    task = gevent.spawn(task_stream(sock))
    group.add(task)


def main():
    host, port = '127.0.0.1', 1234
    server = gevent.spawn(gen_server, host, port)
    group.add(server)
    group.join()
    server.kill()

group = Group()
main()

StreamServer version of gevent

The point is that it waits non-blocking until communication comes with wait_read and the point that timeout processing is added.

gevent_tcp_server_by_stream_server.py


# -*- coding: utf-8 -*-
from gevent.pool import Pool
from gevent.server import StreamServer
import socket as py_socket
from gevent.socket import wait_read, wait_write


class TooLong(Exception):
    pass


def handle(socket, address):
    print('new connection!')

    fp = socket.makefile()
    try:
        _prev_message = ""
        while True:
            wait_read(socket.fileno(), timeout=5, timeout_exc=TooLong)
            line = fp.readline()
            if line:
                #Generate a response string including the last received
                s = _prev_message + line
                #Send
                socket.send(s.encode('utf-8', 'ignore'))
                fp.flush()
                print('send:{}'.format(_prev_message + line))
                _prev_message = line
    except TooLong:
        print('timeout')

    #Disconnect when Timeout occurs
    socket.shutdown(py_socket.SHUT_RDWR)
    socket.close()


pool = Pool(10000)  # do not accept more than 10000 connections
server = StreamServer(('127.0.0.1', 1234), handle, spawn=pool)
server.serve_forever()

gevent_tcp_client.py


# -*- coding: utf-8 -*-
import gevent
from gevent import socket
from functools import wraps
import time

HOST = '127.0.0.1'
PORT = 1234


def client():
    conn = socket.create_connection((HOST, PORT))
    for x in range(10):
        message = 'Hello World!:{}\n'.format(x)
        #Send
        conn.send(message.encode('utf-8', 'ignore'))
        recv_message = conn.recv(3000)
        print('recv:' + recv_message.decode('utf-8'))
        #Wait 1 second
        gevent.sleep(1)
    conn.close()


def main():
    for x in range(100):
        jobs = [gevent.spawn(client) for x in range(100)]
        gevent.joinall(jobs)


main()


Python3.5 async / await version of socket server

ʻAsync def task_echo (reader, writer)has reader and writer as arguments, which is unpleasant, but the Python3 document says [the sample that handles socket communication says so](http: // docs) .python.jp/3/library/asyncio-stream.html#tcp-echo-server-using-streams), so I implemented it as it is. It may be because it is wrapped withStreamReader, but something is unpleasant .. If there is a class that can perform non-blocking socket communication, it can be rewritten from StreamReader`, so there is a possibility that it will be modified in the future.

py35_tcp_server.py


# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def task_echo(reader, writer):
    print('new connection!')
    _prev_message = ''
    while True:
        line = await reader.readline()
        if line == b'':
            #Disconnection detection When disconnected, reader for some reason.readline()Is b''Exit the loop to return
            break
        if line:
            writer.write(line)
            await writer.drain()
            print('send:{}'.format(_prev_message + line.decode()))
            _prev_message = line.decode()

    #Close socket communication when disconnection is detected
    print("Close the client socket")
    await writer.drain()
    writer.close()


def main():
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(task_echo, HOST, PORT, loop=loop)
    server = loop.run_until_complete(coro)

    # Serve requests until CTRL+c is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()


main()

py35_tcp_client.py


# -*- coding: utf-8 -*-
import asyncio

HOST = '127.0.0.1'
PORT = 1234


async def tcp_echo_client():
    reader, writer = await asyncio.open_connection(HOST, PORT)
    for x in range(10):
        message = 'Hello World!:{}\n'.format(str(x))
        #Send
        writer.write(message.encode())
        #Receive
        data = (await reader.read(100)).decode()
        print('recv:{}'.format(data))
        #Wait 1 second
        await asyncio.sleep(1)
    writer.close()
    return "1"


def main():
    loop = asyncio.get_event_loop()
    for x in range(100):
        tasks = asyncio.wait([tcp_echo_client() for x in range(100)])
        loop.run_until_complete(tasks)
    loop.close()

main()


Uncoordinated summary

I couldn't put it together due to lack of ability

Fatal problem with the above code

As a socket server, the above code group has a fatal problem. It can only handle 1 CPU. Python, which can handle only one CPU due to the restrictions of GIL (Global Interpreter Lock), is a little disappointing. (It's been said for more than 5 years and it hasn't improved yet, so this is probably the limit.)

If you talk subjectively, what you really wanted is to write parallel processing using lightweight threads, and even if the user is not aware of it, it will be processed in parallel with a multiprocessor and super fast C #, and you can create lightweight threads as much as you want with message passing between threads. It was a lightweight thread like Erlang that was free to communicate and the user did not recognize the number of CPUs at all.

Python's multiprocessor support can only be forked to to avoid GIL problems or started multiple times with a daemon.

However, when the server scale reaches 10 or 100, the same phenomenon occurs in other languages. I don't know a practical language that runs between different servers as a logical single machine. Python should be designed to run in a single process and run as many daemons as there are CPUs. Inevitably the backend will be designed to utilize message queuing services or storage, so it should easily scale to multiple servers.

Of course, the side effect is that the design is complicated. When implementing a chat service, how is it correct to design to send push notifications at the time of posting to active users in the same chat room that are connected to multiple servers in a distributed manner?

Benefits of being able to handle lightweight threads

If you can handle lightweight threads, you will be able to write efficient servers that communicate with the outside world. By making the processing waiting for communication asynchronous, it becomes possible to allocate CPU to other work in parallel without actually waiting for the waiting time.

Difference between gevent and async / await

Lightweight threads that could only be handled by the external library gevent (Greenlet) can now be handled by pure Python with the implementation of asyncio in Python 3.4. In addition, Python 3.5's async / await has been implemented to handle lightweight threads with a simpler description.

When I was writing the async / await version, I kept thinking about how to turn a blocking IO into a non-blocking IO. After writing the async / await version, I think the design concept that implicitly converts IO to non-blocking processing by touching gevent is wonderful.

Excerpt from gevent tutorial


The real power of gevent is network and IO bound, co-scheduled functions.
When using it to execute. gevent takes care of all the details and networks as much as possible
Causes the library to implicitly transfer the greenlet context.

There are many useful functions and classes in gevent that can reach the itch, and I think that gevent will be used for the time being.

Use TCP or UDP in practice

In practice, be sure to use TCP or UDP protocol because raw sockets do not arrive in pieces when sending huge data and the order of arrival is not guaranteed, and disconnection detection cannot be performed well.

Homework 1 --Let's write with node.js

After consulting with a top-class engineer in the company about lightweight threads, if you write an echo server that responds to the previous and this time in node.js, * if you write it normally, it will be very difficult in callback hell *. However, I was advised that writing it once would be a good experience, so I would like to try it someday.

Homework 2-Reimplementation of gevent's Timeout class

The Timeout class, which can constrain the execution time of the gevent code block, seems to be useful, so I'd like to reimplement it in pure Python.

gevent tutorial-Timeouts


import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)

reference

[Mastering TCP / IP](http://www.amazon.co.jp/%E3%83%9E%E3%82%B9%E3%82%BF%E3%83%AA%E3%83%B3% E3% 82% B0 TCP-IP-% E5% 85% A5% E9% 96% 80% E7% B7% A8-% E7% AC% AC5% E7% 89% 88-% E7% AB% B9% E4% B8 % 8B / dp / 4274068765) gevent tutorial gevent: Asynchronous I/O made easy 18.5. Asyncio – Asynchronous I / O, Event Loops, Coroutines and Tasks Tutorial: Writing a TCP server in Python

Recommended Posts

Implemented socket server with disconnection detection by gevent or async / await
[Python] Asynchronous request with async / await
Async / await with Kivy and tkinter
I tried to communicate with a remote server by Socket communication with Python.