Ein weiteres persönliches Memo.
Da es wie ↓ aussieht, habe ich es geschrieben, weil ich den Prozess beim Warten auf den Empfang nach Anforderungsquelle und Nachrichtentyp trennen wollte.
main.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import procs
workers = {}
source = procs.Source()
try:
while True:
#Umfrage aus der Quelle
data = source.poll()
key = data['key']
#Wird generiert, wenn dem Schlüssel kein Worker entspricht
if key not in workers:
workers[key] = procs.Worker()
#Delegieren Sie die Verarbeitung an den Mitarbeiter
workers[key].delegate(data)
finally:
#
source.terminate()
[ w.terminate() for _,w in workers ]
procs.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import abc
import multiprocessing
class Proc(metaclass=abc.ABCMeta):
def __init__(self):
self._stop = multiprocessing.Event()
self._queue = multiprocessing.SimpleQueue()
self._process = multiprocessing.Process(target=self._run)
self._process.start()
def terminate(self):
self._stop.set()
self._process.join()
self._process.terminate()
def _run(self):
while not self._stop.is_set():
self._do()
@abc.abstractmethod
def _do(self, **kwargs):
pass
class Source(Proc):
def _do(self, **kwargs):
#Nachrichten empfangen
data = { 'key' : 'some-data' }
self._queue.put(data)
def poll(self):
return self._queue.get()
class Worker(Proc):
def _do(self, **kwargs):
data = self._queue.get()
#Verarbeiten Sie die empfangenen Daten
def delegate(self, data):
return self._queue.put(data)
Recommended Posts