Führen Sie die Verarbeitung mit Celery durch, das die verteilte TaskQueue-Verarbeitung durchführt Ich habe die Probe beschrieben.
Installation von Sellerie
pip install celery
Bei Fenstern wird Sellerie 4 oder höher nicht unterstützt Gibt die letzte unterstützte Windows-Version an.
Installation von Sellerie (Fenster)
pip install celery==3.1.25
Verwenden Sie die Worker-Methode, um den Prozess tatsächlich auszuführen.
tasks.py
from celery import Celery
app = Celery('tasks', result='rpc://', broker='amqp://[email protected]//')
@app.task
def add(x, y):
return x, y
Beginnen Sie dies als Arbeiter. Verwenden Sie Rabbit MQ, das um 192.168.0.3 gestartet wurde. Geben Sie rpc: // an, wo das Ergebnis gespeichert werden soll (es scheint das Ergebnis-Backend zu sein). Wenn es sich um eine Produktion handelt, scheint Redis usw. das Speicherziel zu sein.
$ celery -A tasks worker --loglevel=info
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@DESKTOP-GJOIME5 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.14393-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x22d0e56d080
- ** ---------- .> transport: amqp://guest:**@192.168.0.3:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2017-06-19 06:45:10,040: INFO/MainProcess] Connected to amqp://guest:**@192.168.0.3:5672//
[2017-06-19 06:45:10,118: INFO/MainProcess] mingle: searching for neighbors
[2017-06-19 06:45:11,262: INFO/MainProcess] mingle: all alone
[2017-06-19 06:45:11,332: WARNING/MainProcess] celery@DESKTOP-GJOIME5 ready.
Es begann sicher.
Anrufercode
>>> from tasks import add
>>> async_result = add.delay(1,2)
>>> async_result
<AsyncResult: 69bf0ccf-6e74-46e0-ae5a-1fb566bb0657>
#Es wird mit dem in Redis usw. gespeicherten Ergebnis unter Verwendung der UUID von AsyncResult verknüpft???
>>> async_result.ready()
True
>>> async_result.result
3
Aufgaben können durch Aufrufen mit der Verzögerungsmethode in die Warteschlange gestellt werden. Das Ergebnis kann aus dem Ergebnis erhalten werden, nachdem das Ergebnis von result.ready () True geworden ist.
Verhalten beim Werfen einer Aufgabe in einen bereits gestarteten Worker. Soweit ich sehen kann, scheint es, dass die angegebene Aufgabe sicher ausgeführt werden kann.
Verhalten auf der Arbeiterseite
[2017-06-19 06:56:23,934: INFO/MainProcess] Received task: tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb]
[2017-06-19 06:56:23,934: INFO/MainProcess] Task tasks.add[ff679978-8edd-47db-b599-79aa3c8844eb] succeeded in
0s: 3
Geben Sie beim Speichern einer Sellerie-Instanz das Backend an, um das Ergebnis der Aufgabenausführung zu speichern. Dieses Mal wird rpc: // angegeben, es wird jedoch empfohlen, es im Produktionsbetrieb in Redis usw. zu speichern. (https://blog.ozacc.com/docs/celery/getting-started/first-steps-with-celery.html#keeping-results)
Das Anstehen bei Sellerie erwies sich als ziemlich einfach. Verwenden Sie für einen einfachen Mechanismus zur Aufgabenverarbeitung das Ziel zum Speichern von Sellerie + RabbitMQ + Ergebnis. Es scheint, dass es schnell erstellt werden kann.
Recommended Posts