Ich schreibe einen Daemon, der 24 Stunden am Tag in Python läuft. Ich habe beschlossen, eine REST-API hinzuzufügen, um den Betriebsstatus zu überprüfen und die Einstellungen dynamisch zu ändern. Wir haben nach einer Möglichkeit gesucht, sie mit minimalem Aufwand zu implementieren, ohne größere Änderungen an der Hauptroutine vorzunehmen.
Sie benötigen eine Art Webserver, um die REST-API bereitzustellen. Da es nicht nach außen offen ist, verwenden wir den in Flask enthaltenen Entwicklungswebserver. Obwohl es für die Entwicklung vorgesehen ist, unterstützt es auch den gleichzeitigen Zugriff mit Threads, sodass es der praktischen Verwendung standhält.
Ich habe mich für die Flask-API entschieden, die einfach zu implementieren ist und sogar eine Test-Benutzeroberfläche hat.
Das folgende Experiment verwendet das Flask-API-Beispiel. Kopieren Sie es und speichern Sie es als Datei mit dem Namen example.py. http://www.flaskapi.org/#example
Um die Hauptroutine des Dämons und des Webservers gleichzeitig auszuführen, müssen Sie anscheinend Asyncio oder Threads verwenden. Flask sollte nicht mit Asyncio verwendet werden, daher werde ich Threads verwenden.
Welche sollten die Initiative, die Hauptroutine oder den Webserver ergreifen? In Anbetracht der Tatsache, dass die REST-API eine zusätzliche Funktion ist, scheint es besser, den Hauptthread als Dämon und den abgeleiteten Thread als Webserver zu verwenden. (Postscript: Es kann unbegründet sein. Aus der Sicht der Verwaltung von Daemons ist das Web das Hauptthema, und Worker-Threads sind für Daemons natürlicher.)
Erstellen Sie einen Thread in der Hauptroutine und starten Sie den Server von Flask von dort aus. Zeigen Sie jede Sekunde die Zeit an, um sicherzustellen, dass der Haupt-Thread nicht blockiert ist.
$ diff -u example.py example_threaded.py
--- example.py 2016-12-20 16:19:19.000000000 -0800
+++ example_threaded.py 2016-12-20 16:23:43.000000000 -0800
@@ -1,6 +1,13 @@
+import logging
+import threading
+
from flask import request, url_for
from flask.ext.api import FlaskAPI, status, exceptions
+FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
+logging.basicConfig(format=FORMAT, level=logging.INFO)
+log = logging.getLogger()
+
app = FlaskAPI(__name__)
@@ -53,5 +60,14 @@
if __name__ == "__main__":
- app.run(debug=True)
+
+ log.info('start')
+ rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+ rest_service_thread.start()
+ log.info('main thread is mine!')
+ import time
+ while True:
+ print(time.ctime())
+ time.sleep(1)
+ rest_service_thread.join()
** Ausführungsergebnis **
Werkzeug, der Webserver in Flask, gibt einige Fehler aus. Da der Hauptthread aktiv ist, wird die Uhrzeit angezeigt, die REST-API antwortet jedoch nicht.
2016-12-20 16:30:32,129 root MainThread start
2016-12-20 16:30:32,130 root MainThread main thread is mine!
Tue Dec 20 16:30:32 2016
2016-12-20 16:30:32,141 werkzeug reset_service * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Exception in thread reset_service:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/flask/app.py", line 843, in run
run_simple(host, port, self, **options)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/serving.py", line 692, in run_simple
reloader_type)
File "/Users/knoguchi/.virtualenvs/flask3/lib/python3.5/site-packages/werkzeug/_reloader.py", line 242, in run_with_reloader
signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
File "/usr/local/Cellar/python3/3.5.2_3/Frameworks/Python.framework/Versions/3.5/lib/python3.5/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
Tue Dec 20 16:30:33 2016
Tue Dec 20 16:30:34 2016
Es scheint, dass der Signalhandler MainThread sein muss. Es scheint, dass es eine praktische Funktion in der Entwicklung von Flask ist, Änderungen zu überwachen und Dateien neu zu laden. Da dies jedoch nicht erforderlich ist, löschen Sie debug = True
und versuchen Sie es erneut.
** Ausführungsergebnis 2 **
Diesmal lief es gut. Wenn Sie die REST-API unter http: // localhost: 5000 aufrufen, reagiert sie ordnungsgemäß.
2016-12-20 16:38:54,214 root MainThread start
2016-12-20 16:38:54,215 root MainThread main thread is mine!
Tue Dec 20 16:38:54 2016
2016-12-20 16:38:54,224 werkzeug reset_service * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
Tue Dec 20 16:38:55 2016
2016-12-20 16:38:55,840 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:55] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:56 2016
2016-12-20 16:38:56,827 werkzeug reset_service 127.0.0.1 - - [20/Dec/2016 16:38:56] "GET / HTTP/1.1" 200 -
Tue Dec 20 16:38:57 2016
Jetzt haben Sie die Grundlagen, um die REST-API bereitzustellen und gleichzeitig den Hauptthread für den Dämon zuzuweisen.
Der einfachste Weg, Befehle von der REST-API an den Dämon zu senden, ist die Verwendung von "Warteschlange". Pythons Warteschlange ist threadsicher, sodass Sie ein globales Warteschlangenobjekt erstellen und ohne Inkonsistenzen aus mehreren Threads ablegen können. Es gibt jedoch nur eine begrenzte Anzahl von Fällen, in denen dies alles ist, was benötigt wird.
Lassen Sie uns example_threaded.py weiter modifizieren und mit der Aktualisierung der Notes-Variablen aus dem Hauptthread und dem Webserver-Thread experimentieren. Der Hauptthread fügt den von STDIN eingegebenen String zu Notizen hinzu.
Da Notizen Wörterbücher sind, treten Inkonsistenzen auf, wenn gleichzeitig auf sie zugegriffen wird, und ein Fehler tritt insbesondere dann auf, wenn beim Zugriff in einer for-Schleife zusätzliche oder gelöschte Schlüssel auftreten. Also habe ich versucht, es mit einem "synchronisierten" Dekorateur zu verriegeln, um den kritischen Bereich zu schützen.
$ diff -u example_threaded.py example_threaded2.py
--- example_threaded.py 2016-12-20 17:17:10.000000000 -0800
+++ example_threaded2.py 2016-12-20 17:29:06.000000000 -0800
@@ -23,7 +23,38 @@
'text': notes[key]
}
+def synchronized(lock):
+ """ Synchronization decorator. """
+ def wrap(f):
+ def newFunction(*args, **kw):
+ lock.acquire()
+ try:
+ return f(*args, **kw)
+ finally:
+ lock.release()
+ return newFunction
+ return wrap
+
+glock = threading.Lock()
+
+@synchronized(glock)
+def list_notes():
+ return [note_repr(idx) for idx in sorted(notes.keys())]
+
+@synchronized(glock)
+def add_note(note):
+ idx = max(notes.keys()) + 1
+ notes[idx] = note
+ return idx
+@synchronized(glock)
+def update_note(key, note):
+ notes[key] = note
+
+@synchronized(glock)
+def delete_note(key):
+ return notes.pop(key, None)
+
@app.route("/", methods=['GET', 'POST'])
def notes_list():
"""
@@ -31,12 +62,11 @@
"""
if request.method == 'POST':
note = str(request.data.get('text', ''))
- idx = max(notes.keys()) + 1
- notes[idx] = note
+ idx = add_note(note)
return note_repr(idx), status.HTTP_201_CREATED
# request.method == 'GET'
- return [note_repr(idx) for idx in sorted(notes.keys())]
+ return list_notes()
@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
@@ -46,11 +76,11 @@
"""
if request.method == 'PUT':
note = str(request.data.get('text', ''))
- notes[key] = note
+ update_note(key, note)
return note_repr(key)
elif request.method == 'DELETE':
- notes.pop(key, None)
+ delete_note(key)
return '', status.HTTP_204_NO_CONTENT
# request.method == 'GET'
@@ -60,14 +90,15 @@
if __name__ == "__main__":
-
+
log.info('start')
- rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(debug=True))
+ rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict())
rest_service_thread.start()
log.info('main thread is mine!')
- import time
- while True:
- print(time.ctime())
- time.sleep(1)
+ import sys
+ for line in iter(sys.stdin):
+ if not line:
+ break
+ add_note(line.strip())
rest_service_thread.join()
** Ausführungsergebnis **
Geben Sie nach dem Start eine entsprechende Zeichenfolge über das Terminal ein. Wenn Sie die Liste von der REST-API erhalten, können Sie bestätigen, dass die eingegebene Zeichenfolge hinzugefügt wurde.
Hintergrundprozesse werden lose als Dämonen bezeichnet, aber gut erzogene Dämonen haben viel zu tun.
Für Python daemonize und python-daemon ) Kann leicht gemacht werden. Verwenden wir Python-Daemon mit den minimalen Einstellungen. Ich kann stdin nicht verwenden, daher habe ich versucht, stattdessen alle 3 Sekunden eine Notiz hinzuzufügen. Wenn Sie auf die REST-API zugreifen, können Sie feststellen, dass die Notizen alle 3 Sekunden zunehmen.
--- example_threaded2.py 2016-12-25 16:25:08.000000000 -0800
+++ example_daemon.py 2016-12-25 16:29:29.000000000 -0800
@@ -1,8 +1,9 @@
import logging
import threading
+import daemon
from flask import request, url_for
-from flask.ext.api import FlaskAPI, status, exceptions
+from flask_api import FlaskAPI, status, exceptions
FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
@@ -89,16 +90,16 @@
return note_repr(key)
-if __name__ == "__main__":
-
+def main():
log.info('start')
rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
rest_service_thread.start()
log.info('main thread is mine!')
- import sys
- for line in iter(sys.stdin):
- if not line:
- break
- add_note(line.strip())
+ import time
+ for i in range(10):
+ add_note("note{}".format(i))
+ time.sleep(3)
rest_service_thread.join()
+with daemon.DaemonContext():
+ main()
Laufen Sie und sehen Sie
$ python example_daemon.py
$
$ ps xao pid,ppid,pgid,sid,comm | grep python
2860 1 2859 2859 python
Under System V–based systems, some people recommend calling fork again at this point, terminating the parent, and continuing the daemon in the child. This guarantees that the daemon is not a session leader, which prevents it from acquiring a controlling terminal under the System V rules (Section 9.6). Alternatively, to avoid acquiring a controlling terminal, be sure to specify O_NOCTTY whenever opening a terminal device.
lsof -p 2860
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
python 2860 root cwd DIR 254,1 4096 2 /
python 2860 root rtd DIR 254,1 4096 2 /
python 2860 root txt REG 254,1 3781768 264936 /usr/bin/python2.7
python 2860 root mem REG 254,1 47712 1045078 /lib/x86_64-linux-gnu/libnss_files-2.19.so
python 2860 root mem REG 254,1 54248 391882 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 18904 1044589 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
python 2860 root mem REG 254,1 31048 265571 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.2
python 2860 root mem REG 254,1 141184 392622 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 10464 796274 /usr/lib/python2.7/dist-packages/markupsafe/_speedups.so
python 2860 root mem REG 254,1 29464 392892 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 2066816 264782 /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0
python 2860 root mem REG 254,1 395176 264784 /usr/lib/x86_64-linux-gnu/libssl.so.1.0.0
python 2860 root mem REG 254,1 97872 392612 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 11248 392000 /usr/lib/python2.7/lib-dynload/resource.x86_64-linux-gnu.so
python 2860 root mem REG 254,1 1607712 269275 /usr/lib/locale/locale-archive
python 2860 root mem REG 254,1 1738176 1045067 /lib/x86_64-linux-gnu/libc-2.19.so
python 2860 root mem REG 254,1 1051056 1045072 /lib/x86_64-linux-gnu/libm-2.19.so
python 2860 root mem REG 254,1 109144 1044580 /lib/x86_64-linux-gnu/libz.so.1.2.8
python 2860 root mem REG 254,1 10680 1045291 /lib/x86_64-linux-gnu/libutil-2.19.so
python 2860 root mem REG 254,1 14664 1045071 /lib/x86_64-linux-gnu/libdl-2.19.so
python 2860 root mem REG 254,1 137440 1044987 /lib/x86_64-linux-gnu/libpthread-2.19.so
python 2860 root mem REG 254,1 140928 1044988 /lib/x86_64-linux-gnu/ld-2.19.so
python 2860 root 0u CHR 1,3 0t0 5593 /dev/null
python 2860 root 1u CHR 1,3 0t0 5593 /dev/null
python 2860 root 2u CHR 1,3 0t0 5593 /dev/null
python 2860 root 3u IPv4 3596677 0t0 TCP localhost:5000 (LISTEN)
Die übergeordnete Prozess-ID ist 1 und wird übernommen. Die Sitzungs-ID und die Prozessgruppen-ID stimmen überein. Die Sitzungs-ID von bash in der Login-Shell war übrigens 2693, es handelt sich also erwartungsgemäß um eine neue Sitzung. Es scheint, dass die Gabel zweimal gemacht wird und pid und pgid unterschiedlich sind. Der Grund für das zweimalige Ausführen von fork ist folgender (aus der erweiterten Programmierung in Unix, Kapitel 13). Sehen Sie sich den Dateideskriptor mit lsof -p 2860 an. Oh! STDIN (0), STDOUT (1), STDERR (2) sind wunderbar / dev / null. Und das aktuelle Arbeitsverzeichnis (cwd) ist /. Das Stammverzeichnis (rtd) ist ebenfalls /, dies kann jedoch geändert werden, indem die Einstellung an chroot übergeben wird. Es ist ein wahrhaft orthodoxer Dämon! Es funktioniert auch dann gut, wenn ich mich abmelde oder vom RC-Skript aus starte. Zusammenfassung - Führen Sie den Entwicklungswebserver von Flask erfolgreich in einem abgeleiteten Thread mit der Hauptroutine im Hauptthread aus. - Es wurde bestätigt, dass Variablen zwischen der Hauptroutine und der REST-API gemeinsam genutzt / aktualisiert werden können.
Wir experimentieren immer noch mit verschiedenen Dingen. Bitte teilen Sie uns mit, ob es eine leichtere Methode gibt oder eine Methode, die ohne globale Sperre realisiert werden kann.
Der Code ist Python3, aber mit ein paar Änderungszeilen sollte er auch in Python2 funktionieren. threaded = True dient zum gleichzeitigen Akzeptieren mehrerer Anforderungen und hat nichts mit dem Thread in diesem Dokument zu tun.
import logging
import threading
import daemon
from flask import request, url_for
from flask_api import FlaskAPI, status, exceptions
FORMAT = '%(asctime)-15s %(name)s %(threadName)s %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
log = logging.getLogger()
app = FlaskAPI(__name__)
notes = {
0: 'do the shopping',
1: 'build the codez',
2: 'paint the door',
}
def note_repr(key):
return {
'url': request.host_url.rstrip('/') + url_for('notes_detail', key=key),
'text': notes[key]
}
def synchronized(lock):
""" Synchronization decorator. """
def wrap(f):
def newFunction(*args, **kw):
lock.acquire()
try:
return f(*args, **kw)
finally:
lock.release()
return newFunction
return wrap
glock = threading.Lock()
@synchronized(glock)
def list_notes():
return [note_repr(idx) for idx in sorted(notes.keys())]
@synchronized(glock)
def add_note(note):
idx = max(notes.keys()) + 1
notes[idx] = note
return idx
@synchronized(glock)
def update_note(key, note):
notes[key] = note
@synchronized(glock)
def delete_note(key):
return notes.pop(key, None)
@app.route("/", methods=['GET', 'POST'])
def notes_list():
"""
List or create notes.
"""
if request.method == 'POST':
note = str(request.data.get('text', ''))
idx = add_note(note)
return note_repr(idx), status.HTTP_201_CREATED
# request.method == 'GET'
return list_notes()
@app.route("/<int:key>/", methods=['GET', 'PUT', 'DELETE'])
def notes_detail(key):
"""
Retrieve, update or delete note instances.
"""
if request.method == 'PUT':
note = str(request.data.get('text', ''))
update_note(key, note)
return note_repr(key)
elif request.method == 'DELETE':
delete_note(key)
return '', status.HTTP_204_NO_CONTENT
# request.method == 'GET'
if key not in notes:
raise exceptions.NotFound()
return note_repr(key)
def main():
log.info('start')
rest_service_thread = threading.Thread(name='reset_service', target=app.run, kwargs=dict(threaded=True))
rest_service_thread.start()
log.info('main thread is mine!')
import time
for i in range(10):
add_note("note{}".format(i))
time.sleep(3)
rest_service_thread.join()
with daemon.DaemonContext():
main()
Recommended Posts