TL;DR Als ich versuchte, RxPY zu verwenden, weil ich Rx in Python schreiben wollte, war es anders als ich dachte, dass es bequem zu verwenden wäre. Anscheinend wird in der neuen Spezifikation die beobachtbare Verkettung unter Verwendung eines Mechanismus namens Rohr geschrieben. https://rxpy.readthedocs.io/en/latest/migration.html
Konventionell ↓
observable_object.map(lambda x:x*2) \
.filter(lambda x:x>3) \
.subscribe(print) \
Derzeit ↓
from rx import operators as ops
observable_object.pipe(
ops.map(lambda x:x*2),
ops.filter(lambda x:x>3)
).subscribe(print)
Ich wollte das nur vermitteln.
Es gibt jedoch nicht zu viele Artikel über RxPY auf Japanisch, daher werde ich RxPY zusammenfassen, um Rx an Anfänger-Pythonisten zu verbreiten.
Es ist eine API zum intelligenten Schreiben einer asynchronen Verarbeitung, indem Daten in einem Observable-Stream verarbeitet werden, der eine Linq-ähnliche Verarbeitung ausführen kann. http://reactivex.io/ Es kann in den meisten wichtigen Sprachen verwendet werden. http://reactivex.io/languages.html Für Python unterstützt RxPY dies. https://rxpy.readthedocs.io/en/latest/index.html
Es gibt bereits viele Erklärungen zu dem Konzept, daher werde ich es der offiziellen Seite und anderen Qiita-Artikeln überlassen, aber ich werde es hier nicht tun. Stattdessen zeige ich Ihnen den primitiven Code für RxPY.
Reactive Extensions behandelt Daten als Streams. Umgekehrt müssen die Daten, mit denen Reactive Extensions arbeiten sollen, in einen Stream konvertiert werden.
import rx
# 0,1,2,3,Generiere 4 Streams.
rx.range(0,5)
# 'aaa','bbb','ccc'So generieren Sie einen Stream von.
rx.of('aaa','bbb','ccc')
#Konvertieren Sie eine Liste in einen Stream.
l = [0,1,2,3,4]
rx.from_(l)
Wir werden alle fließenden Daten der Reihe nach verwenden. Reactive Extensions abonniert bei Verwendung von Stream-Daten. Es kann schneller sein, den Code zu betrachten.
import rx
# 0,1,2,3,4 Streams
stream = rx.range(0,5)
#Druckfunktion ist 0,1,2,3,Erhalte 4 in der richtigen Reihenfolge.
stream.subscribe(print)
###Ausgabe###
# 0
# 1
# 2
# 3
# 4
#Natürlich können Sie auch Ihre eigenen definierten Ausdrücke und Lambda-Ausdrücke verarbeiten.
stream.subscribe(lambda x:print('value = '+str(x)))
###Ausgabe###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4
#Streng genommen können Sie die Verarbeitung schreiben, wenn ein Fehler auftritt, und die endgültige Verarbeitung.
stream.subscribe(
on_next = lambda x:print('on_next : '+str(x)) #Eine Funktion, die Stream-Daten empfängt.
,on_error = lambda x:print('on_error : '+str(x)) #Was tun, wenn ein Fehler auftritt?
,on_completed = lambda :print('on_completed !') #Wird ausgeführt, wenn alle Daten im Stream geflossen sind.
)
###Ausgabe###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !
Bei gewöhnlichen Reactive Extensions erfolgt die Verkettung der Methode aus dem Stream. RxPY verwendet Pipes und Operatoren, um Stream-Daten zu verarbeiten.
import rx
from rx import operators as ops
# 0,1,2,3,4 Streams
stream = rx.range(0,5)
# map
stream.pipe(
ops.map(lambda x:x*2) #Verdoppeln Sie die Daten.
).subscribe(print)
###Ausgabe###
# 0
# 2
# 4
# 6
# 8
# filter
stream.pipe(
ops.filter(lambda x:x>2) #2 Filtern Sie die folgenden Daten.
).subscribe(print)
###Ausgabe###
# 3
# 4
# zip
stream.pipe(
ops.zip(rx.range(0,10,2)) #Koppeln Sie die Daten in jedem der beiden Streams.
).subscribe(print)
###Ausgabe###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)
# buffer_with_count
stream.pipe(
ops.buffer_with_count(2) #Kombinieren Sie die Daten in zwei.
).subscribe(print)
###Ausgabe###
# [0, 1]
# [2, 3]
# [4]
# to_list
stream.pipe(
ops.to_list() #Listen Sie die Daten auf.
).subscribe(print)
###Ausgabe###
# [0, 1, 2, 3, 4]
#Bediener können verketten.
stream.pipe(
ops.map(lambda x:x*2) #Verdoppeln Sie die Daten.
,ops.filter(lambda x:x>2) #2 Filtern Sie die folgenden Daten.
,ops.map(lambda x:str(x)) #Konvertieren Sie Daten in Zeichen.
).subscribe(lambda x:print('value = '+x))
###Ausgabe###
# value = 4
# value = 6
# value = 8
#Ein, wenn während der Verarbeitung ein Fehler auftritt_Ein Fehler wird ausgeführt und es werden keine weiteren Daten verarbeitet.
stream.pipe(
ops.map(lambda x:1/(x-2)) #Wenn 2 gespielt wird, tritt ein Fehler bei der Division auf Null auf.
).subscribe(
on_next = print
,on_error = lambda x: print(x)
)
###Ausgabe###
# -0.5
# -1.0
# division by zero
Es gibt einige Betreiber. Finden und verwenden Sie diejenige, die Ihrem Zweck entspricht. https://rxpy.readthedocs.io/en/latest/reference_operators.html
Bisher haben wir vorhandene Daten in Streams konvertiert. Hier erklären wir, wie Sie jederzeit Daten an den Stream senden können.
import rx
from rx.subject import Subject
#Verwenden Sie Betreff, um einen speziellen Stream zu erstellen, mit dem Daten jederzeit fließen können.
stream = Subject()
# on_Daten können mit next gestreamt werden.
#Dieser Stream ist jedoch nicht abonniert, sodass nichts passiert.
stream.on_next(1)
#Einmal abonniert, erhalten Sie es jedes Mal, wenn Daten fließen.
d = stream.subscribe(print)
stream.on_next(1)
###Ausgabe###
# 1
stream.on_next(2)
###Ausgabe###
# 2
#Bei Abmeldung entsorgen.
d.dispose()
stream.on_next(2)
#Es ist auch möglich, mehrere zu abonnieren. Gefühl zu senden.
d1 = stream.subscribe(lambda x:print('subscriber 1 got '+str(x)))
d2 = stream.subscribe(lambda x:print('subscriber 2 got '+str(x)))
d3 = stream.subscribe(lambda x:print('subscriber 3 got '+str(x)))
stream.on_next(1)
###Ausgabe###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1
#Wenn Sie nicht über unnötige Abonnenten verfügen, werden diese für immer abonniert.
d1.dispose()
d2.dispose()
d3.dispose()
#Es ist auch möglich, den Stream zu verarbeiten und zu abonnieren
stream.pipe(
ops.filter(lambda x:x%2==0) #Filtern Sie nach einem Vielfachen von 2
).subscribe(lambda x:print(str(x)+' is a multiple of 2'))
stream.pipe(
ops.filter(lambda x:x%3==0) #Filtern Sie nach Vielfachen von 3
).subscribe(lambda x:print(str(x)+' is a multiple of 3'))
stream.on_next(2)
###Ausgabe###
# 2 is a multiple of 2
stream.on_next(3)
###Ausgabe###
# 3 is a multiple of 3
stream.on_next(6)
###Ausgabe###
# 6 is a multiple of 2
# 6 is a multiple of 3
#Durch die Entsorgung des Motivs werden Ressourcen freigesetzt.
#Alles, was Sie abonnieren, ist ebenfalls verfügbar.
#Wenn Sie entsorgen, können Sie keine Daten streamen.
stream.dispose()
Es gibt auch verschiedene Arten von Themen. Bitte verwenden Sie diejenige, die Ihrem Zweck entspricht. https://rxpy.readthedocs.io/en/latest/reference_subject.html
Steuern Sie, wann und wie es abonniert wird.
import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler
def f(s):
time.sleep(1*random.random())
print(s)
stream = Subject()
#Stellen Sie den Scheduler so ein, dass er im aktuellen Thread ausgeführt wird.
#Subscribe wird einzeln im selben Thread ausgeführt.
stream_with_scheduler = stream.pipe(
ops.observe_on(CurrentThreadScheduler()) #Scheduler-Einstellungen
)
stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))
stream.on_next(1)
#Der CurrentThreadScheduler ist derselbe wie der Standardplaner, daher bleibt das Verhalten gleich.
###Ausgabe###
# 1
# 2
# 3
stream.dispose()
stream = Subject()
#Richten Sie einen Scheduler ein, der auf einem neuen Thread ausgeführt werden soll
stream_with_scheduler = stream.pipe(
ops.observe_on(NewThreadScheduler()) #Scheduler-Einstellungen
)
stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))
stream.on_next(1)
#Es wird auf einem neuen Thread ausgeführt, sodass alle gleichzeitig ausgeführt werden.
###Ausgabe###
# 2
# 3
# 1
stream.dispose()
Es gibt auch mehrere Scheduler. Verwenden Sie diejenige, die Ihrem Zweck entspricht. https://rxpy.readthedocs.io/en/latest/reference_scheduler.html
Ich werde erklären, wie man asynchrone Verarbeitung mit dem bisherigen Wissen gut macht.
Wenn Sie zeitaufwändige Prozesse wie HTTP-Anforderungen oder schwere Vorgänge haben, ist es möglicherweise besser, diese parallel auszuführen, als sie nacheinander auszuführen. Das Problem wird dann zur Verarbeitungsabhängigkeit. Hier werden wir eine Besprechungsmethode als eine Lösung vorstellen.
import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random
stream = Subject()
#Zeitaufwändiger Prozess
def f1():
time.sleep(5*random.random())
print('f1 done.')
stream.on_next(1)
def f2():
time.sleep(5*random.random())
print('f2 done.')
stream.on_next(1)
def f3():
time.sleep(5*random.random())
print('f3 done.')
stream.on_next(1)
def f4():
time.sleep(5*random.random())
print('f4 done.')
stream.on_next(1)
def f5():
time.sleep(5*random.random())
print('f5 done.')
stream.on_next(1)
stream.pipe(
ops.buffer_with_count(5) #Bevorratet, bis 5 Daten im Stream fließen.
).subscribe(lambda x:print('All done.')) #Es wird nach 5 Datenflüssen im Stream ausgeführt. Das heißt, f1~Es wird ausgeführt, nachdem alle f5 beendet sind.
#Da dies ein zeitaufwändiger Prozess ist, werden alle gleichzeitig ausgeführt.
for f in [f1,f2,f3,f4,f5]:
threading.Thread(target=f).start()
###Ausgabe###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.
Hier ist der angenommene Fall.
Es ist ein wenig willkürlich, aber vergib mir, weil es ein einfaches Beispiel ist ...
Streamen Sie zunächst den Tastaturstatus. Insbesondere wird die folgende unerschöpfliche Ausgabe True oder False gestreamt.
while True:
print(keyboard.is_pressed('enter'))
###Ausgabe###
# True
# True
# True
# True
# False
# False
# ...
↓ Zum Streamen geändert
import rx
from rx import operators as ops
from rx.subject import Subject
enter_state_stream = Subject()
while True:
# enter_state_Streamen Sie den Status der Eingabetaste, um zu streamen
enter_state_stream.on_next(keyboard.is_pressed('enter'))
Da ich nicht abonniert habe, passiert nichts so wie es ist. Wir werden on_press als Ausgangspunkt implementieren.
import rx
from rx import operators as ops
from rx.subject import Subject
enter_state_stream = Subject()
# on_press
enter_state_stream.pipe(
ops.buffer_with_count(2,1) #Holen Sie sich zwei Daten.
,ops.filter(lambda x: x==[False,True]) #In dem Moment, in dem Sie darauf drücken, ist Falsch,Echte Datenflüsse.
).subscribe(lambda x: print('on_press'))
while True:
enter_state_stream.on_next(keyboard.is_pressed('enter'))
###Ausgabe(Bei jedem Drücken der Eingabetaste_Drücken Sie wird angezeigt)###
# on_press
# on_press
# on_press
# on_press
Da on_release auf die gleiche Weise implementiert werden kann, überspringen Sie es einmal. Als nächstes implementieren wir on_double_press.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
enter_state_stream = Subject()
on_press_stream = enter_state_stream.pipe(
ops.buffer_with_count(2,1) #Holen Sie sich die ersten beiden Daten.
,ops.filter(lambda x: x==[False,True]) #In dem Moment, in dem Sie darauf drücken, ist Falsch,Echte Datenflüsse.
)
# on_double_press
on_press_stream.pipe(
ops.timestamp() # on_Fügen Sie einen Zeitstempel zum Drücken hinzu
,ops.buffer_with_count(2,1) # on_Schauen Sie sich die Presse zwei mal zwei an
,ops.map(lambda x:x[1][1]-x[0][1]) #Zwei auf_In Zeitintervall konvertieren
,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) # on_Drücken Sie das Zeitintervall ist 0.Filtern Sie in weniger als 2 Sekunden
).subscribe(lambda x: print('on_double_press'))
while True:
enter_state_stream.on_next(keyboard.is_pressed('enter'))
###Ausgabe(Wird jedes Mal angezeigt, wenn die Eingabetaste kontinuierlich gedrückt wird)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press
Sie haben jetzt on_double_press implementiert. Lassen Sie es uns schließlich in einer schönen Klasse zusammenstellen, während Sie es asynchron verarbeiten.
import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading
class Enter:
enter_state_stream = None
on_press_stream = None
on_release_stream = None
on_double_press = None
def __init__(self):
self.enter_state_stream = Subject()
self.on_press_stream = self.enter_state_stream.pipe(
ops.buffer_with_count(2,1)
,ops.filter(lambda x: x==[False,True])
)
self.on_release_stream = self.enter_state_stream.pipe(
ops.buffer_with_count(2,1)
,ops.filter(lambda x: x==[True,False])
)
self.on_double_press = self.on_press_stream.pipe(
ops.timestamp()
,ops.buffer_with_count(2,1)
,ops.map(lambda x:x[1][1]-x[0][1])
,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2))
)
def f():
while True:
self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
threading.Thread(target=f).start()
def main():
enter = Enter()
#Sie können die Ereignisverarbeitung so schreiben!
enter.on_double_press.subscribe(lambda x:print('on_double_press'))
Wenn Sie möchten, schreiben Sie bitte etwas. Ich wäre Ihnen dankbar, wenn Sie einen Kommentar abgeben könnten. Bitte lassen Sie mich wissen, wenn etwas nicht stimmt.
Recommended Posts