Masquer les websockets async / attendent dans Python3

Je me demande si la fonction utilisant websockets qui contient ʻasyncio se répandra au sommet et deviendra ʻasync / await pour toujours ... Je pensais j'ai réussi à le faire avec ma propre bibliothèque, donc un mémorandum de cette partie

Solution

--J'ai utilisé un décorateur

environnement

code

my_library.py

import websockets
import asyncio

class EntitySpec(object):
    def __init__(self, ctx):
        self.ctx = ctx
        self.uri = "ws://{}:{}".format(
            ctx.hostname, ctx.port,
        )

    def __call__(self, func):
        async def stream(func):
            async with websockets.connect(self.uri) as ws:
                while not ws.closed:
                    try:
                        response = await ws.recv()
                        func(response)
                    except websockets.exceptions.ConnectionClosedOK:
                        self.loop.stop()
        self.loop = asyncio.get_event_loop()
        self.loop.create_task(stream(func))
        return stream

    def run(self):
        self.loop.run_forever()

class Context(object):
    def __init__(
        self,
        hostname='localhost',
        port=8765,
    ):
        self.hostname = hostname
        self.port = port        
        self.websocket = EntitySpec(self)

main.py

import my_library

api = my_library.Context(
    hostname = 'localhost',
    port = 8765,  #exemple de port standard de serveur websockets
)

@api.websocket
def reciever(msg):
    print(msg)

api.websocket.run()

Recommended Posts

Masquer les websockets async / attendent dans Python3
python async / attend curio
Scraping à l'aide de Python 3.5 async / await
Convertir l'API asynchrone de style callback en async / await en Python
Benchmark des performances de thread léger utilisant async / await implémenté dans Python 3.5
[Python] Requête asynchrone avec async / await
Python en optimisation
CURL en Python
Métaprogrammation avec Python
Python 3.3 avec Anaconda
Géocodage en python
Méta-analyse en Python
Unittest en Python
Époque en Python
Discord en Python
Jouer Python async
Allemand en Python
DCI en Python
tri rapide en python
nCr en python
N-Gram en Python
Programmation avec Python
Plink en Python
Constante en Python
FizzBuzz en Python
Sqlite en Python
Étape AIC en Python
LINE-Bot [0] en Python
CSV en Python
Assemblage inversé avec Python
Réflexion en Python
Constante en Python
nCr en Python.
format en python
Scons en Python 3
Puyopuyo en python
python dans virtualenv
PPAP en Python
Quad-tree en Python
Réflexion en Python
Chimie avec Python
Hashable en Python
DirectLiNGAM en Python
LiNGAM en Python
Aplatir en Python
Aplatir en python
AtCoder # 36 quotidien avec Python
Texte de cluster en Python
Daily AtCoder # 32 en Python
Daily AtCoder # 6 en Python
Daily AtCoder # 18 en Python
Modifier les polices en Python
Motif singleton en Python
Opérations sur les fichiers en Python
Lire DXF avec python
Daily AtCoder # 53 en Python
Séquence de touches en Python