[Python] Mit RxPY (3.0.1) gelernte reaktive Erweiterungen [Rx]

TL;DR Als ich versuchte, RxPY zu verwenden, weil ich Rx in Python schreiben wollte, war es anders als ich dachte, dass es bequem zu verwenden wäre. Anscheinend wird in der neuen Spezifikation die beobachtbare Verkettung unter Verwendung eines Mechanismus namens Rohr geschrieben. https://rxpy.readthedocs.io/en/latest/migration.html

Konventionell ↓

observable_object.map(lambda x:x*2) \
                 .filter(lambda x:x>3) \
                 .subscribe(print) \

Derzeit ↓

from rx import operators as ops
observable_object.pipe(
    ops.map(lambda x:x*2),
    ops.filter(lambda x:x>3)
).subscribe(print)

Ich wollte das nur vermitteln.

Es gibt jedoch nicht zu viele Artikel über RxPY auf Japanisch, daher werde ich RxPY zusammenfassen, um Rx an Anfänger-Pythonisten zu verbreiten.

Was sind reaktive Erweiterungen?

Es ist eine API zum intelligenten Schreiben einer asynchronen Verarbeitung, indem Daten in einem Observable-Stream verarbeitet werden, der eine Linq-ähnliche Verarbeitung ausführen kann. http://reactivex.io/ Es kann in den meisten wichtigen Sprachen verwendet werden. http://reactivex.io/languages.html Für Python unterstützt RxPY dies. https://rxpy.readthedocs.io/en/latest/index.html

Es gibt bereits viele Erklärungen zu dem Konzept, daher werde ich es der offiziellen Seite und anderen Qiita-Artikeln überlassen, aber ich werde es hier nicht tun. Stattdessen zeige ich Ihnen den primitiven Code für RxPY.

Wie erstelle ich einen Stream?

Reactive Extensions behandelt Daten als Streams. Umgekehrt müssen die Daten, mit denen Reactive Extensions arbeiten sollen, in einen Stream konvertiert werden.

import rx

# 0,1,2,3,Generiere 4 Streams.
rx.range(0,5) 

# 'aaa','bbb','ccc'So generieren Sie einen Stream von.
rx.of('aaa','bbb','ccc')

#Konvertieren Sie eine Liste in einen Stream.
l = [0,1,2,3,4]
rx.from_(l)

Verwenden Sie Stream-Daten

Wir werden alle fließenden Daten der Reihe nach verwenden. Reactive Extensions abonniert bei Verwendung von Stream-Daten. Es kann schneller sein, den Code zu betrachten.

import rx

# 0,1,2,3,4 Streams
stream = rx.range(0,5)

#Druckfunktion ist 0,1,2,3,Erhalte 4 in der richtigen Reihenfolge.
stream.subscribe(print) 
###Ausgabe###
# 0
# 1
# 2
# 3
# 4

#Natürlich können Sie auch Ihre eigenen definierten Ausdrücke und Lambda-Ausdrücke verarbeiten.
stream.subscribe(lambda x:print('value = '+str(x)))
###Ausgabe###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4

#Streng genommen können Sie die Verarbeitung schreiben, wenn ein Fehler auftritt, und die endgültige Verarbeitung.
stream.subscribe(
    on_next = lambda x:print('on_next : '+str(x)) #Eine Funktion, die Stream-Daten empfängt.
    ,on_error = lambda x:print('on_error : '+str(x)) #Was tun, wenn ein Fehler auftritt?
    ,on_completed = lambda :print('on_completed !') #Wird ausgeführt, wenn alle Daten im Stream geflossen sind.
)
###Ausgabe###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !

Datenverarbeitung streamen

Bei gewöhnlichen Reactive Extensions erfolgt die Verkettung der Methode aus dem Stream. RxPY verwendet Pipes und Operatoren, um Stream-Daten zu verarbeiten.

import rx
from rx import operators as ops

# 0,1,2,3,4 Streams
stream = rx.range(0,5)

# map
stream.pipe(
    ops.map(lambda x:x*2) #Verdoppeln Sie die Daten.
).subscribe(print)
###Ausgabe###
# 0
# 2
# 4
# 6
# 8

# filter
stream.pipe(
    ops.filter(lambda x:x>2) #2 Filtern Sie die folgenden Daten.
).subscribe(print)
###Ausgabe###
# 3
# 4

# zip
stream.pipe(
    ops.zip(rx.range(0,10,2)) #Koppeln Sie die Daten in jedem der beiden Streams.
).subscribe(print)
###Ausgabe###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)

# buffer_with_count
stream.pipe(
    ops.buffer_with_count(2) #Kombinieren Sie die Daten in zwei.
).subscribe(print)
###Ausgabe###
# [0, 1]
# [2, 3]
# [4]

# to_list
stream.pipe(
    ops.to_list() #Listen Sie die Daten auf.
).subscribe(print)
###Ausgabe###
# [0, 1, 2, 3, 4]

#Bediener können verketten.
stream.pipe(
    ops.map(lambda x:x*2) #Verdoppeln Sie die Daten.
    ,ops.filter(lambda x:x>2) #2 Filtern Sie die folgenden Daten.
    ,ops.map(lambda x:str(x)) #Konvertieren Sie Daten in Zeichen.
).subscribe(lambda x:print('value = '+x))
###Ausgabe###
# value = 4
# value = 6
# value = 8

#Ein, wenn während der Verarbeitung ein Fehler auftritt_Ein Fehler wird ausgeführt und es werden keine weiteren Daten verarbeitet.
stream.pipe(
    ops.map(lambda x:1/(x-2)) #Wenn 2 gespielt wird, tritt ein Fehler bei der Division auf Null auf.
).subscribe(
    on_next = print
    ,on_error = lambda x: print(x)
)
###Ausgabe###
# -0.5
# -1.0
# division by zero

Es gibt einige Betreiber. Finden und verwenden Sie diejenige, die Ihrem Zweck entspricht. https://rxpy.readthedocs.io/en/latest/reference_operators.html

Daten streamen

Bisher haben wir vorhandene Daten in Streams konvertiert. Hier erklären wir, wie Sie jederzeit Daten an den Stream senden können.

import rx
from rx.subject import Subject

#Verwenden Sie Betreff, um einen speziellen Stream zu erstellen, mit dem Daten jederzeit fließen können.
stream = Subject()

# on_Daten können mit next gestreamt werden.
#Dieser Stream ist jedoch nicht abonniert, sodass nichts passiert.
stream.on_next(1)

#Einmal abonniert, erhalten Sie es jedes Mal, wenn Daten fließen.
d = stream.subscribe(print)
stream.on_next(1)
###Ausgabe###
# 1
stream.on_next(2)
###Ausgabe###
# 2

#Bei Abmeldung entsorgen.
d.dispose()
stream.on_next(2)

#Es ist auch möglich, mehrere zu abonnieren. Gefühl zu senden.
d1 = stream.subscribe(lambda x:print('subscriber 1 got '+str(x)))
d2 = stream.subscribe(lambda x:print('subscriber 2 got '+str(x)))
d3 = stream.subscribe(lambda x:print('subscriber 3 got '+str(x)))
stream.on_next(1)
###Ausgabe###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1

#Wenn Sie nicht über unnötige Abonnenten verfügen, werden diese für immer abonniert.
d1.dispose()
d2.dispose()
d3.dispose()

#Es ist auch möglich, den Stream zu verarbeiten und zu abonnieren
stream.pipe(
    ops.filter(lambda x:x%2==0) #Filtern Sie nach einem Vielfachen von 2
).subscribe(lambda x:print(str(x)+' is a multiple of 2'))
stream.pipe(
    ops.filter(lambda x:x%3==0) #Filtern Sie nach Vielfachen von 3
).subscribe(lambda x:print(str(x)+' is a multiple of 3'))
stream.on_next(2)
###Ausgabe###
# 2 is a multiple of 2
stream.on_next(3)
###Ausgabe###
# 3 is a multiple of 3
stream.on_next(6)
###Ausgabe###
# 6 is a multiple of 2
# 6 is a multiple of 3

#Durch die Entsorgung des Motivs werden Ressourcen freigesetzt.
#Alles, was Sie abonnieren, ist ebenfalls verfügbar.
#Wenn Sie entsorgen, können Sie keine Daten streamen.
stream.dispose()

Es gibt auch verschiedene Arten von Themen. Bitte verwenden Sie diejenige, die Ihrem Zweck entspricht. https://rxpy.readthedocs.io/en/latest/reference_subject.html

Planung

Steuern Sie, wann und wie es abonniert wird.

import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler

def f(s):
    time.sleep(1*random.random())
    print(s)

stream = Subject()

#Stellen Sie den Scheduler so ein, dass er im aktuellen Thread ausgeführt wird.
#Subscribe wird einzeln im selben Thread ausgeführt.
stream_with_scheduler = stream.pipe(
    ops.observe_on(CurrentThreadScheduler()) #Scheduler-Einstellungen
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
#Der CurrentThreadScheduler ist derselbe wie der Standardplaner, daher bleibt das Verhalten gleich.
###Ausgabe###
# 1
# 2
# 3

stream.dispose()
stream = Subject()

#Richten Sie einen Scheduler ein, der auf einem neuen Thread ausgeführt werden soll
stream_with_scheduler = stream.pipe(
    ops.observe_on(NewThreadScheduler()) #Scheduler-Einstellungen
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
#Es wird auf einem neuen Thread ausgeführt, sodass alle gleichzeitig ausgeführt werden.
###Ausgabe###
# 2
# 3
# 1

stream.dispose()

Es gibt auch mehrere Scheduler. Verwenden Sie diejenige, die Ihrem Zweck entspricht. https://rxpy.readthedocs.io/en/latest/reference_scheduler.html

Asynchrone Verarbeitung

Ich werde erklären, wie man asynchrone Verarbeitung mit dem bisherigen Wissen gut macht.

1. Treffen

Wenn Sie zeitaufwändige Prozesse wie HTTP-Anforderungen oder schwere Vorgänge haben, ist es möglicherweise besser, diese parallel auszuführen, als sie nacheinander auszuführen. Das Problem wird dann zur Verarbeitungsabhängigkeit. Hier werden wir eine Besprechungsmethode als eine Lösung vorstellen.

import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random

stream = Subject()

#Zeitaufwändiger Prozess
def f1():
    time.sleep(5*random.random())
    print('f1 done.')
    stream.on_next(1)
def f2():
    time.sleep(5*random.random())
    print('f2 done.')
    stream.on_next(1)
def f3():
    time.sleep(5*random.random())
    print('f3 done.')
    stream.on_next(1)
def f4():
    time.sleep(5*random.random())
    print('f4 done.')
    stream.on_next(1)
def f5():
    time.sleep(5*random.random())
    print('f5 done.')
    stream.on_next(1)

stream.pipe(
    ops.buffer_with_count(5) #Bevorratet, bis 5 Daten im Stream fließen.
).subscribe(lambda x:print('All done.')) #Es wird nach 5 Datenflüssen im Stream ausgeführt. Das heißt, f1~Es wird ausgeführt, nachdem alle f5 beendet sind.

#Da dies ein zeitaufwändiger Prozess ist, werden alle gleichzeitig ausgeführt.
for f in [f1,f2,f3,f4,f5]:
    threading.Thread(target=f).start()

###Ausgabe###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.

2. Ereignisverarbeitung

Hier ist der angenommene Fall.

Es ist ein wenig willkürlich, aber vergib mir, weil es ein einfaches Beispiel ist ...

Streamen Sie zunächst den Tastaturstatus. Insbesondere wird die folgende unerschöpfliche Ausgabe True oder False gestreamt.

while True:
    print(keyboard.is_pressed('enter'))
###Ausgabe###
# True
# True
# True
# True
# False
# False
# ...

↓ Zum Streamen geändert

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()
while True:
    # enter_state_Streamen Sie den Status der Eingabetaste, um zu streamen
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

Da ich nicht abonniert habe, passiert nichts so wie es ist. Wir werden on_press als Ausgangspunkt implementieren.

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()

# on_press
enter_state_stream.pipe(
    ops.buffer_with_count(2,1) #Holen Sie sich zwei Daten.
    ,ops.filter(lambda x: x==[False,True]) #In dem Moment, in dem Sie darauf drücken, ist Falsch,Echte Datenflüsse.
).subscribe(lambda x: print('on_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

###Ausgabe(Bei jedem Drücken der Eingabetaste_Drücken Sie wird angezeigt)###
# on_press
# on_press
# on_press
# on_press

Da on_release auf die gleiche Weise implementiert werden kann, überspringen Sie es einmal. Als nächstes implementieren wir on_double_press.

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime

enter_state_stream = Subject()
on_press_stream = enter_state_stream.pipe(
    ops.buffer_with_count(2,1) #Holen Sie sich die ersten beiden Daten.
    ,ops.filter(lambda x: x==[False,True]) #In dem Moment, in dem Sie darauf drücken, ist Falsch,Echte Datenflüsse.
)

# on_double_press
on_press_stream.pipe(
    ops.timestamp() # on_Fügen Sie einen Zeitstempel zum Drücken hinzu
    ,ops.buffer_with_count(2,1) # on_Schauen Sie sich die Presse zwei mal zwei an
    ,ops.map(lambda x:x[1][1]-x[0][1]) #Zwei auf_In Zeitintervall konvertieren
    ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) # on_Drücken Sie das Zeitintervall ist 0.Filtern Sie in weniger als 2 Sekunden
).subscribe(lambda x: print('on_double_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
###Ausgabe(Wird jedes Mal angezeigt, wenn die Eingabetaste kontinuierlich gedrückt wird)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press

Sie haben jetzt on_double_press implementiert. Lassen Sie es uns schließlich in einer schönen Klasse zusammenstellen, während Sie es asynchron verarbeiten.

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading

class Enter:
    enter_state_stream = None
    on_press_stream = None
    on_release_stream = None
    on_double_press = None
    def __init__(self):
        self.enter_state_stream = Subject()
        self.on_press_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[False,True]) 
        )
        self.on_release_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[True,False]) 
        )
        self.on_double_press = self.on_press_stream.pipe(
            ops.timestamp() 
            ,ops.buffer_with_count(2,1)
            ,ops.map(lambda x:x[1][1]-x[0][1]) 
            ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) 
        )
        def f():
            while True:
                self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
        threading.Thread(target=f).start()

def main():
    enter = Enter()
    #Sie können die Ereignisverarbeitung so schreiben!
    enter.on_double_press.subscribe(lambda x:print('on_double_press'))

schließlich

Wenn Sie möchten, schreiben Sie bitte etwas. Ich wäre Ihnen dankbar, wenn Sie einen Kommentar abgeben könnten. Bitte lassen Sie mich wissen, wenn etwas nicht stimmt.

Recommended Posts

[Python] Mit RxPY (3.0.1) gelernte reaktive Erweiterungen [Rx]
[Python] Mit Pokemon erlernte objektorientierte Programmierung
Perceptron-Lernexperiment mit Python
Python-Datenstruktur mit Chemoinfomatik gelernt
Effiziente Netzaufnahme mit Python
1. Mit Python 1-1 gelernte Statistiken. Grundlegende Statistiken (Pandas)
1. Mit Python 1-3 gelernte Statistiken. Berechnung verschiedener Statistiken (Statistiken)
FizzBuzz in Python3
Scraping mit Python
Statistik mit Python
Scraping mit Python
Python mit Go
Twilio mit Python
In Python integrieren
Spielen Sie mit 2016-Python
AES256 mit Python
Getestet mit Python
1. Mit Python 1-2 gelernte Statistiken. Berechnung verschiedener Statistiken (Numpy)
Python beginnt mit ()
mit Syntax (Python)
Reactive Extensions üben
Bingo mit Python
Zundokokiyoshi mit Python
Verwenden Sie Python und word2vec (gelernt) mit Azure Databricks
1. Mit Python 2-1 gelernte Statistiken. Wahrscheinlichkeitsverteilung [diskrete Variable]
Excel mit Python
Mikrocomputer mit Python
Lernen Sie langsam mit Python "Prinzip der Abhängigkeitsumkehr"
Mit Python besetzen
Ich habe Python mit einem schönen Mädchen in Paiza # 02 gelernt
Ich habe Python mit einem schönen Mädchen in Paiza # 01 gelernt
Serielle Kommunikation mit Python
Zip, entpacken mit Python
Django 1.11 wurde mit Python3.6 gestartet
Python mit Eclipse + PyDev.
Socket-Kommunikation mit Python
Datenanalyse mit Python 2
Scraping in Python (Vorbereitung)
Versuchen Sie es mit Python.
Python lernen mit ChemTHEATER 03
Sequentielle Suche mit Python
"Objektorientiert" mit Python gelernt
Führen Sie Python mit VBA aus