[Python] Reactive Extensions learned with RxPY (3.0.1) [Rx]

TL;DR When I tried using RxPY because I wanted to write Rx in Python, it was different from what I thought it would be comfortable to use. Apparently, in the new specification, observable chaining is written using a mechanism called pipe. https://rxpy.readthedocs.io/en/latest/migration.html

Conventional ↓

observable_object.map(lambda x:x*2) \
                 .filter(lambda x:x>3) \
                 .subscribe(print) \

Currently ↓

from rx import operators as ops
observable_object.pipe(
    ops.map(lambda x:x*2),
    ops.filter(lambda x:x>3)
).subscribe(print)

I just wanted to convey this.

However, there aren't many Japanese articles about RxPY, so I'll summarize RxPY in order to spread Rx to beginner Pythonists.

What are Reactive Extensions?

It is an API for smartly writing asynchronous processing by handling data in an Observable stream that can perform Linq-like processing. http://reactivex.io/ It can be used in most major languages. http://reactivex.io/languages.html For Python, RxPY supports it. https://rxpy.readthedocs.io/en/latest/index.html

There are already a lot of explanations about the concept, so I will leave it to the official page and other Qiita articles, but I will not do it here. Instead, I'll show you the primitive code for RxPY.

How to make a stream

Reactive Extensions treats data as streams. Conversely, the data you want Reactive Extensions to work with needs to be converted to a stream.

import rx

# 0,1,2,3,Generate 4 streams.
rx.range(0,5) 

# 'aaa','bbb','ccc'Generate a stream of.
rx.of('aaa','bbb','ccc')

#Convert the list to a stream.
l = [0,1,2,3,4]
rx.from_(l)

Use stream data

We will use each of the flowing data in order. Reactive Extensions subscribes when using stream data. It may be faster to look at the code.

import rx

# 0,1,2,3,4 streams
stream = rx.range(0,5)

#print function is 0,1,2,3,Receive 4 in order.
stream.subscribe(print) 
###output###
# 0
# 1
# 2
# 3
# 4

#Of course, you can also handle your own defined expressions and lambda expressions.
stream.subscribe(lambda x:print('value = '+str(x)))
###output###
# value = 0
# value = 1
# value = 2
# value = 3
# value = 4

#More precisely, you can write the processing when an error occurs and the final processing.
stream.subscribe(
    on_next = lambda x:print('on_next : '+str(x)) #A function that receives data from a stream.
    ,on_error = lambda x:print('on_error : '+str(x)) #What to do when an error occurs.
    ,on_completed = lambda :print('on_completed !') #Executed when all the data in the stream has flowed.
)
###output###
# on_next : 0
# on_next : 1
# on_next : 2
# on_next : 3
# on_next : 4
# on_completed !

Stream data processing

With ordinary Reactive Extensions, method chaining is done from stream, RxPY processes stream data using pipes and operators.

import rx
from rx import operators as ops

# 0,1,2,3,4 streams
stream = rx.range(0,5)

# map
stream.pipe(
    ops.map(lambda x:x*2) #Double the data.
).subscribe(print)
###output###
# 0
# 2
# 4
# 6
# 8

# filter
stream.pipe(
    ops.filter(lambda x:x>2) #2 Filter the following data.
).subscribe(print)
###output###
# 3
# 4

# zip
stream.pipe(
    ops.zip(rx.range(0,10,2)) #Pair the data in each of the two streams.
).subscribe(print)
###output###
# (0, 0)
# (1, 2)
# (2, 4)
# (3, 6)
# (4, 8)

# buffer_with_count
stream.pipe(
    ops.buffer_with_count(2) #Combine the data into two pieces.
).subscribe(print)
###output###
# [0, 1]
# [2, 3]
# [4]

# to_list
stream.pipe(
    ops.to_list() #List the data.
).subscribe(print)
###output###
# [0, 1, 2, 3, 4]

#Operators can chain.
stream.pipe(
    ops.map(lambda x:x*2) #Double the data.
    ,ops.filter(lambda x:x>2) #2 Filter the following data.
    ,ops.map(lambda x:str(x)) #Convert data to characters.
).subscribe(lambda x:print('value = '+x))
###output###
# value = 4
# value = 6
# value = 8

#On when an error occurs during processing_An error is executed and no further data is processed.
stream.pipe(
    ops.map(lambda x:1/(x-2)) #An error occurs in division by zero when 2 is played.
).subscribe(
    on_next = print
    ,on_error = lambda x: print(x)
)
###output###
# -0.5
# -1.0
# division by zero

There are quite a few operators. Find and use the one that suits your purpose. https://rxpy.readthedocs.io/en/latest/reference_operators.html

Stream data

Until now, we were converting existing data into a stream. Here, we will explain how to stream data to the stream at any time.

import rx
from rx.subject import Subject

#Use Subject to create a special stream that allows data to flow at any time.
stream = Subject()

# on_Data can be streamed with next.
#But this stream isn't subscribed so nothing happens.
stream.on_next(1)

#Once subscribed, you will receive it every time data flows.
d = stream.subscribe(print)
stream.on_next(1)
###output###
# 1
stream.on_next(2)
###output###
# 2

#Dispose when unsubscribing.
d.dispose()
stream.on_next(2)

#It is also possible to subscribe to multiple. Feeling to broadcast.
d1 = stream.subscribe(lambda x:print('subscriber 1 got '+str(x)))
d2 = stream.subscribe(lambda x:print('subscriber 2 got '+str(x)))
d3 = stream.subscribe(lambda x:print('subscriber 3 got '+str(x)))
stream.on_next(1)
###output###
# subscriber 1 got 1
# subscriber 2 got 1
# subscriber 3 got 1

#If you don't dispose unnecessary subscribers, they will continue to subscribe forever.
d1.dispose()
d2.dispose()
d3.dispose()

#It is also possible to process the stream and subscribe
stream.pipe(
    ops.filter(lambda x:x%2==0) #Filter in multiples of 2
).subscribe(lambda x:print(str(x)+' is a multiple of 2'))
stream.pipe(
    ops.filter(lambda x:x%3==0) #Filter by multiples of 3
).subscribe(lambda x:print(str(x)+' is a multiple of 3'))
stream.on_next(2)
###output###
# 2 is a multiple of 2
stream.on_next(3)
###output###
# 3 is a multiple of 3
stream.on_next(6)
###output###
# 6 is a multiple of 2
# 6 is a multiple of 3

#Disposing the subject frees resources.
#Everything you subscribe to is also dispose.
#If you dispose, you will not be able to stream the data.
stream.dispose()

There are also several types of subjects. Please use the one that suits your purpose. https://rxpy.readthedocs.io/en/latest/reference_subject.html

Scheduling

Control when and how it is subscribed.

import rx
from rx import operators as ops
import time
import random
from rx.subject import Subject
from rx.scheduler import NewThreadScheduler
from rx.scheduler import CurrentThreadScheduler

def f(s):
    time.sleep(1*random.random())
    print(s)

stream = Subject()

#Set the scheduler to run in the current thread.
#Subscribe is executed one by one in the same thread.
stream_with_scheduler = stream.pipe(
    ops.observe_on(CurrentThreadScheduler()) #Scheduler settings
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
#The CurrentThreadScheduler is the same as the default scheduler, so the behavior is the same.
###output###
# 1
# 2
# 3

stream.dispose()
stream = Subject()

#Set up a scheduler to run on a new thread
stream_with_scheduler = stream.pipe(
    ops.observe_on(NewThreadScheduler()) #Scheduler settings
)

stream_with_scheduler.subscribe(lambda x:f('1'))
stream_with_scheduler.subscribe(lambda x:f('2'))
stream_with_scheduler.subscribe(lambda x:f('3'))

stream.on_next(1)
#It runs on a new thread, so they all run at the same time.
###output###
# 2
# 3
# 1

stream.dispose()

There are also several schedulers. Use the one that suits your purpose. https://rxpy.readthedocs.io/en/latest/reference_scheduler.html

Asynchronous processing

I will explain how to do asynchronous processing nicely using the knowledge so far.

1. Meeting

If you have time-consuming processes such as HTTP requests or heavy operations, it may be better to execute them in parallel rather than sequentially. The problem is the processing dependency. Here, we will introduce a meeting method as one solution.

import rx
from rx import operators as ops
from rx.subject import Subject
import threading
import time
import random

stream = Subject()

#Time-consuming process
def f1():
    time.sleep(5*random.random())
    print('f1 done.')
    stream.on_next(1)
def f2():
    time.sleep(5*random.random())
    print('f2 done.')
    stream.on_next(1)
def f3():
    time.sleep(5*random.random())
    print('f3 done.')
    stream.on_next(1)
def f4():
    time.sleep(5*random.random())
    print('f4 done.')
    stream.on_next(1)
def f5():
    time.sleep(5*random.random())
    print('f5 done.')
    stream.on_next(1)

stream.pipe(
    ops.buffer_with_count(5) #It is stocked until 5 pieces of data flow in the stream.
).subscribe(lambda x:print('All done.')) #It is executed after 5 data flows in the stream. That is, f1~It will be executed after all f5 is finished.

#Since it is a time-consuming process, all are executed at the same time.
for f in [f1,f2,f3,f4,f5]:
    threading.Thread(target=f).start()

###output###
# f5 done.
# f4 done.
# f1 done.
# f3 done.
# f2 done.
# All done.

2. Event processing

Here is the assumed case.

It's a little arbitrary, but forgive me because it's to make a simple example ...

First, stream the keyboard state. Specifically, the following inexhaustible output True or False is streamed.

while True:
    print(keyboard.is_pressed('enter'))
###output###
# True
# True
# True
# True
# False
# False
# ...

↓ Changed to stream

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()
while True:
    # enter_state_Stream the state of the Enter key to stream
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

Since I haven't subscribed, nothing happens as it is. Let's start by implementing on_press.

import rx
from rx import operators as ops
from rx.subject import Subject

enter_state_stream = Subject()

# on_press
enter_state_stream.pipe(
    ops.buffer_with_count(2,1) #Get two data.
    ,ops.filter(lambda x: x==[False,True]) #The moment you press it is False,True data flows.
).subscribe(lambda x: print('on_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))

###output(On every time you press the enter key_press is displayed)###
# on_press
# on_press
# on_press
# on_press

Since on_release can be implemented in the same way, skip it once. Next, let's implement on_double_press.

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime

enter_state_stream = Subject()
on_press_stream = enter_state_stream.pipe(
    ops.buffer_with_count(2,1) #Get the first two data.
    ,ops.filter(lambda x: x==[False,True]) #The moment you press it is False,True data flows.
)

# on_double_press
on_press_stream.pipe(
    ops.timestamp() # on_Timestamp press
    ,ops.buffer_with_count(2,1) # on_Look at the press two by two
    ,ops.map(lambda x:x[1][1]-x[0][1]) #Two on_Convert to press time interval
    ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) # on_press time interval is 0.Filter in less than 2 seconds
).subscribe(lambda x: print('on_double_press'))

while True:
    enter_state_stream.on_next(keyboard.is_pressed('enter'))
###output(Displayed each time the enter key is pressed continuously)###
# on_double_press
# on_double_press
# on_double_press
# on_double_press

You have now implemented on_double_press. Finally, let's put it together in a nice class while making it asynchronous processing.

import rx
from rx import operators as ops
from rx.subject import Subject
import keyboard
import datetime
import threading

class Enter:
    enter_state_stream = None
    on_press_stream = None
    on_release_stream = None
    on_double_press = None
    def __init__(self):
        self.enter_state_stream = Subject()
        self.on_press_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[False,True]) 
        )
        self.on_release_stream = self.enter_state_stream.pipe(
            ops.buffer_with_count(2,1) 
            ,ops.filter(lambda x: x==[True,False]) 
        )
        self.on_double_press = self.on_press_stream.pipe(
            ops.timestamp() 
            ,ops.buffer_with_count(2,1)
            ,ops.map(lambda x:x[1][1]-x[0][1]) 
            ,ops.filter(lambda x:x<datetime.timedelta(seconds=0.2)) 
        )
        def f():
            while True:
                self.enter_state_stream.on_next(keyboard.is_pressed('enter'))
        threading.Thread(target=f).start()

def main():
    enter = Enter()
    #You can write event processing like this!
    enter.on_double_press.subscribe(lambda x:print('on_double_press'))

at the end

If you like, please write anything. I would be grateful if you could comment. Please let me know if something is wrong.

Recommended Posts

[Python] Reactive Extensions learned with RxPY (3.0.1) [Rx]
[Python] Object-oriented programming learned with Pokemon
Perceptron learning experiment learned with Python
Python data structures learned with chemoinformatics
Efficient net pick-up learned with Python
1. Statistics learned with Python 1-1. Basic statistics (Pandas)
Algorithm learned with Python 10th: Binary search
Algorithm learned with Python 5th: Fibonacci sequence
Algorithm learned with Python 9th: Linear search
Algorithm learned with Python 7th: Year conversion
Algorithm learned with Python 8th: Evaluation of algorithm
Algorithm learned with Python 4th: Prime numbers
Algorithm learned with Python 2nd: Vending machine
Algorithm learned with Python 19th: Sorting (heapsort)
Algorithm learned with Python 6th: Leap year
1. Statistics learned with Python 1-3. Calculation of various statistics (statistics)
Algorithm learned with Python 3rd: Radix conversion
Algorithm learned with Python 12th: Maze search
Algorithm learned with Python 11th: Tree structure
FizzBuzz with Python3
Scraping with Python
Statistics with python
Scraping with Python
Algorithm learned with Python 13th: Tower of Hanoi
Python with Go
Algorithm learned with Python 16th: Sorting (insertion sort)
Twilio with Python
Integrate with Python
Play with 2016-Python
Algorithm learned with Python 14th: Tic-tac-toe (ox problem)
AES256 with python
Tested with Python
Algorithm learned with Python 15th: Sorting (selection sort)
1. Statistics learned with Python 1-2. Calculation of various statistics (Numpy)
python starts with ()
with syntax (Python)
Algorithm learned with Python 17th: Sorting (bubble sort)
Reactive Extensions practice
Bingo with python
Zundokokiyoshi with python
Use Python and word2vec (learned) with Azure Databricks
1. Statistics learned with Python 2-1. Probability distribution [discrete variable]
Excel with Python
Microcomputer with Python
"Principle of dependency reversal" learned slowly with Python
Cast with python
I learned Python with a beautiful girl at Paiza # 02
I learned Python with a beautiful girl at Paiza # 01
Algorithm learned with Python 18th: Sorting (stack and queue)
Serial communication with Python
Zip, unzip with python
Django 1.11 started with Python3.6
Python with eclipse + PyDev.
Socket communication with Python
Data analysis with python 2
Scraping with Python (preparation)
Try scraping with Python.
Learning Python with ChemTHEATER 03
Sequential search with Python
"Object-oriented" learning with python
Run Python with VBA