[Python] Extensions réactives apprises avec RxPY (3.0.1) [Rx]

TL;DR Quand j'ai essayé d'utiliser RxPY parce que je voulais écrire Rx en Python, c'était différent de ce que je pensais qu'il serait confortable à utiliser. Apparemment, dans la nouvelle spécification, le chaînage observable est écrit à l'aide d'un mécanisme appelé pipe. https://rxpy.readthedocs.io/en/latest/migration.html

Conventionnel ↓

observable_object.map(lambda x:x*2) \
                 .filter(lambda x:x>3) \
                 .subscribe(print) \

Actuellement ↓

from rx import operators as ops
observable_object.pipe(
    ops.map(lambda x:x*2),
    ops.filter(lambda x:x>3)
).subscribe(print)

Je voulais juste transmettre ceci.

Cependant, il n'y a pas trop d'articles sur RxPY en japonais, donc je vais résumer RxPY afin de diffuser Rx aux pythonistes débutants.

Que sont les extensions réactives?

C'est une API pour écrire intelligemment un traitement asynchrone en gérant des données dans un flux observable qui peut effectuer un traitement de type Linq. http://reactivex.io/ Il peut être utilisé dans la plupart des langues principales. http://reactivex.io/languages.html Pour Python, RxPY est pris en charge. https://rxpy.readthedocs.io/en/latest/index.html

Il y a déjà beaucoup d'explications sur le concept, donc je vais laisser la page officielle et d'autres articles de Qiita, mais je ne le ferai pas ici. Au lieu de cela, je vais vous montrer le code primitif de RxPY.

Comment créer un flux

Reactive Extensions traite les données comme des flux. Inversement, les données avec lesquelles vous souhaitez que les extensions réactives fonctionnent doivent être converties en flux.

import rx

# 0,1,2,3,Générez 4 flux.
rx.range(0,5) 

# 'aaa','bbb','ccc'Pour générer un flux de.
rx.of('aaa','bbb','ccc')

#Convertissez une liste en flux.
l = [0,1,2,3,4]
rx.from_(l)

Utiliser les données de flux

Nous utiliserons chacune des données en circulation dans l'ordre. Reactive Extensions s'abonne lors de l'utilisation de données de flux. Il peut être plus rapide de consulter le code.

import rx

# 0,1,2,3,4 flux
stream = rx.range(0,5)

#la fonction d'impression est 0,1,2,3,Recevez 4 dans l'ordre.
stream.subscribe(print) 
###production###
# 0
# 1
# 2
# 3
# 4

#Bien sûr, vous pouvez également gérer vos propres expressions définies et expressions lambda.
stream.subscribe(lambda x:print('value = '+str(x)))
###production###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4

#Plus strictement, vous pouvez écrire le traitement lorsqu'une erreur se produit et le traitement final.
stream.subscribe(
    on_next = lambda x:print('on_next : '+str(x)) #Une fonction qui reçoit des données de flux.
    ,on_error = lambda x:print('on_error : '+str(x)) #Que faire lorsqu'une erreur se produit.
    ,on_completed = lambda :print('on_completed !') #Exécuté lorsque toutes les données du flux ont circulé.
)
###production###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !

Traitement des données de flux

Avec les extensions réactives ordinaires, le chaînage des méthodes est effectué à partir du flux, RxPY utilise des tubes et des opérateurs pour traiter les données de flux.

import rx
from rx import operators as ops

# 0,1,2,3,4 flux
stream = rx.range(0,5)

# map
stream.pipe(
    ops.map(lambda x:x*2) #Doublez les données.
).subscribe(print)
###production###
# 0
# 2
# 4
# 6
# 8

# filter
stream.pipe(
    ops.filter(lambda x:x>2) #2 Filtrez les données suivantes.
).subscribe(print)
###production###
# 3
# 4

# zip
stream.pipe(
    ops.zip(rx.range(0,10,2)) #Associez les données dans chacun des deux flux.
).subscribe(print)
###production###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)

# buffer_with_count
stream.pipe(
    ops.buffer_with_count(2) #Combinez les données en deux.
).subscribe(print)
###production###
# [0, 1]
# [2, 3]
# [4]

# to_list
stream.pipe(
    ops.to_list() #Listez les données.
).subscribe(print)
###production###
# [0, 1, 2, 3, 4]

#Les opérateurs peuvent enchaîner.
stream.pipe(
    ops.map(lambda x:x*2) #Doublez les données.
    ,ops.filter(lambda x:x>2) #2 Filtrez les données suivantes.
    ,ops.map(lambda x:str(x)) #Convertissez les données en caractères.
).subscribe(lambda x:print('value = '+x))
###production###
# value = 4
# value = 6
# value = 8

#Allumé lorsqu'une erreur se produit pendant le traitement_Une erreur est exécutée et aucune autre donnée n'est traitée.
stream.pipe(
    ops.map(lambda x:1/(x-2)) #Quand 2 est joué, une erreur se produit dans la division à zéro.
).subscribe(
    on_next = print
    ,on_error = lambda x: print(x)
)
###production###
# -0.5
# -1.0
# division by zero

Il y a pas mal d'opérateurs. Trouvez et utilisez celui qui vous convient. https://rxpy.readthedocs.io/en/latest/reference_operators.html

Flux de données

Jusqu'à présent, nous convertissions les données existantes en flux. Ici, nous expliquerons comment envoyer des données au flux à tout moment.

import rx
from rx.subject import Subject

#Utilisez Subject pour créer un flux spécial qui permet aux données de circuler à tout moment.
stream = Subject()

# on_Les données peuvent être diffusées avec next.
#Mais ce flux n'est pas abonné, donc rien ne se passe.
stream.on_next(1)

#Une fois abonné, vous le recevrez à chaque flux de données.
d = stream.subscribe(print)
stream.on_next(1)
###production###
# 1
stream.on_next(2)
###production###
# 2

#Jeter lors de la désinscription.
d.dispose()
stream.on_next(2)

#Il est également possible de s'abonner à plusieurs. Envie de diffuser.
d1 = stream.subscribe(lambda x:print('subscriber 1 got '+str(x)))
d2 = stream.subscribe(lambda x:print('subscriber 2 got '+str(x)))
d3 = stream.subscribe(lambda x:print('subscriber 3 got '+str(x)))
stream.on_next(1)
###production###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1

#Si vous ne disposez pas d'abonnés inutiles, ils continueront à s'abonner pour toujours.
d1.dispose()
d2.dispose()
d3.dispose()

#Il est également possible de traiter le flux et de s'abonner
stream.pipe(
    ops.filter(lambda x:x%2==0) #Filtrer par multiples de 2
).subscribe(lambda x:print(str(x)+' is a multiple of 2'))
stream.pipe(
    ops.filter(lambda x:x%3==0) #Filtrer par multiples de 3
).subscribe(lambda x:print(str(x)+' is a multiple of 3'))
stream.on_next(2)
###production###
# 2 is a multiple of 2
stream.on_next(3)
###production###
# 3 is a multiple of 3
stream.on_next(6)
###production###
# 6 is a multiple of 2
# 6 is a multiple of 3

#L'élimination du sujet libère des ressources.
#Tout ce à quoi vous vous abonnez est également éliminé.
#Si vous supprimez, vous ne pourrez pas diffuser de données.
stream.dispose()

Il existe également plusieurs types de sujets. Veuillez utiliser celui qui correspond à votre objectif. https://rxpy.readthedocs.io/en/latest/reference_subject.html

Planification

Contrôlez quand et comment il est abonné.

import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler

def f(s):
    time.sleep(1*random.random())
    print(s)

stream = Subject()

#Configurez le planificateur pour qu'il s'exécute dans le thread actuel.
#S'abonner est exécuté un par un dans le même thread.
stream_with_scheduler = stream.pipe(
    ops.observe_on(CurrentThreadScheduler()) #Paramètres du planificateur
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
#Le CurrentThreadScheduler est le même que le planificateur par défaut, de sorte que le comportement reste le même.
###production###
# 1
# 2
# 3

stream.dispose()
stream = Subject()

#Configurer un planificateur pour s'exécuter sur un nouveau thread
stream_with_scheduler = stream.pipe(
    ops.observe_on(NewThreadScheduler()) #Paramètres du planificateur
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
#Il fonctionne sur un nouveau thread, donc ils s'exécutent tous en même temps.
###production###
# 2
# 3
# 1

stream.dispose()

Il existe également plusieurs planificateurs. Utilisez celui qui vous convient. https://rxpy.readthedocs.io/en/latest/reference_scheduler.html

Traitement asynchrone

J'expliquerai comment faire un traitement asynchrone bien en utilisant les connaissances jusqu'à présent.

1. Réunion

Si vous avez des processus chronophages tels que des requêtes HTTP ou des opérations lourdes, il peut être préférable de les exécuter en parallèle plutôt que séquentiellement. Le problème devient alors la dépendance de traitement. Ici, nous présenterons une méthode de réunion comme solution unique.

import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random

stream = Subject()

#Processus chronophage
def f1():
    time.sleep(5*random.random())
    print('f1 done.')
    stream.on_next(1)
def f2():
    time.sleep(5*random.random())
    print('f2 done.')
    stream.on_next(1)
def f3():
    time.sleep(5*random.random())
    print('f3 done.')
    stream.on_next(1)
def f4():
    time.sleep(5*random.random())
    print('f4 done.')
    stream.on_next(1)
def f5():
    time.sleep(5*random.random())
    print('f5 done.')
    stream.on_next(1)

stream.pipe(
    ops.buffer_with_count(5) #Stocké jusqu'à 5 éléments de flux de données dans le flux.
).subscribe(lambda x:print('All done.')) #Il est exécuté après 5 éléments de flux de données dans le flux. Autrement dit, f1~Il est exécuté une fois que tout f5 est terminé.

#Puisqu'il s'agit d'un processus qui prend du temps, tous sont exécutés en même temps.
for f in [f1,f2,f3,f4,f5]:
    threading.Thread(target=f).start()

###production###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.

2. Traitement des événements

Voici le cas supposé.

C'est un peu arbitraire, mais pardonnez-moi car c'est pour faire un exemple simple ...

Tout d'abord, diffusez l'état du clavier. Plus précisément, la sortie inépuisable suivante True ou False est diffusée.

while True:
    print(keyboard.is_pressed('enter'))
###production###
# True
# True
# True
# True
# False
# False
# ...

↓ Changé pour diffuser

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()
while True:
    # enter_state_Diffuser l'état de la touche Entrée pour diffuser
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

Comme je ne suis pas abonné, rien ne se passe tel quel. Nous allons implémenter on_press comme point de départ.

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()

# on_press
enter_state_stream.pipe(
    ops.buffer_with_count(2,1) #Obtenez deux données.
    ,ops.filter(lambda x: x==[False,True]) #Le moment où vous appuyez dessus est faux,De vrais flux de données.
).subscribe(lambda x: print('on_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

###production(Allumé chaque fois que vous appuyez sur la touche Entrée_appuyez sur s'affiche)###
# on_press
# on_press
# on_press
# on_press

Comme on_release peut être implémenté de la même manière, ignorez-le une fois. Ensuite, implémentons on_double_press.

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime

enter_state_stream = Subject()
on_press_stream = enter_state_stream.pipe(
    ops.buffer_with_count(2,1) #Obtenez les deux premières données.
    ,ops.filter(lambda x: x==[False,True]) #Le moment où vous appuyez dessus est faux,De vrais flux de données.
)

# on_double_press
on_press_stream.pipe(
    ops.timestamp() # on_Ajouter un horodatage à appuyer
    ,ops.buffer_with_count(2,1) # on_Regardez la presse deux par deux
    ,ops.map(lambda x:x[1][1]-x[0][1]) #Deux sur_Convertir en intervalle de temps de presse
    ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) # on_l'intervalle de temps de presse est de 0.Filtrer en moins de 2 secondes
).subscribe(lambda x: print('on_double_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
###production(S'affiche chaque fois que la touche Entrée est enfoncée en continu)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press

Vous avez maintenant implémenté on_double_press. Enfin, mettons-le ensemble dans une belle classe tout en faisant un traitement asynchrone.

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading

class Enter:
    enter_state_stream = None
    on_press_stream = None
    on_release_stream = None
    on_double_press = None
    def __init__(self):
        self.enter_state_stream = Subject()
        self.on_press_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[False,True]) 
        )
        self.on_release_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[True,False]) 
        )
        self.on_double_press = self.on_press_stream.pipe(
            ops.timestamp() 
            ,ops.buffer_with_count(2,1)
            ,ops.map(lambda x:x[1][1]-x[0][1]) 
            ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) 
        )
        def f():
            while True:
                self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
        threading.Thread(target=f).start()

def main():
    enter = Enter()
    #Vous pouvez écrire un traitement d'événement comme celui-ci!
    enter.on_double_press.subscribe(lambda x:print('on_double_press'))

à la fin

Si vous le souhaitez, veuillez écrire quelque chose. Je vous serais reconnaissant de bien vouloir commenter. Veuillez me faire savoir si quelque chose ne va pas.

Recommended Posts

[Python] Extensions réactives apprises avec RxPY (3.0.1) [Rx]
[Python] Programmation orientée objet apprise avec Pokemon
Expérience d'apprentissage Perceptron apprise avec Python
Structure de données Python apprise avec la chimioinfomatique
Ramassage efficace du réseau avec Python
1. Statistiques apprises avec Python 1-1. Statistiques de base (Pandas)
1. Statistiques apprises avec Python 1-3. Calcul de diverses statistiques (statistiques)
FizzBuzz en Python3
Grattage avec Python
Statistiques avec python
Grattage avec Python
Python avec Go
Twilio avec Python
Intégrer avec Python
Jouez avec 2016-Python
AES256 avec python
Testé avec Python
1. Statistiques apprises avec Python 1-2. Calcul de diverses statistiques (Numpy)
python commence par ()
avec syntaxe (Python)
Pratique des extensions réactives
Bingo avec python
Zundokokiyoshi avec python
Utiliser Python et word2vec (appris) avec Azure Databricks
1. Statistiques apprises avec Python 2-1. Distribution de probabilité [variable discrète]
Excel avec Python
Micro-ordinateur avec Python
Apprenez lentement avec Python "Principe de l'inversion des dépendances"
Cast avec python
J'ai appris Python avec une belle fille à Paiza # 02
J'ai appris Python avec une belle fille à Paiza # 01
Communication série avec Python
Zip, décompressez avec python
Django 1.11 a démarré avec Python3.6
Python avec eclipse + PyDev.
Communication de socket avec Python
Analyse de données avec python 2
Grattage en Python (préparation)
Essayez de gratter avec Python.
Apprendre Python avec ChemTHEATER 03
Recherche séquentielle avec Python
"Orienté objet" appris avec python
Exécutez Python avec VBA