Struggling to process a floating-point list in parallel on a four-node Raspberry Pi 4B cluster with Python socket communication, cutting loop calculation time to 1/4

[Photo 1: the four Raspberry Pi 4B units used as the cluster]

This post records how I created a Python program that processes in parallel to shorten loop calculation time. The four Raspberry Pi 4Bs in the photo above served as the parallel processing cluster. The Raspberry Pi 4B requires a large heatsink. I haven't needed to run the fans yet this time, but I was considering water cooling if the heat became a bit more intense.

The Python program that is the subject of this post now works. As the table below shows, parallel processing on four clusters reduced the calculation time from 4 seconds to 1.1 seconds, roughly 1/4. In addition, on a single unit, simply switching from 1 thread to 4 threads cut the calculation time from 8 seconds to 4 seconds. [Retracted: I originally attributed this to the GIL having little effect because the code contains little non-thread-safe code.] This single-unit, 4-thread result was an exception in which thread-level parallel processing was effective. However, there were many twists and turns (errors kept appearing) before the program was finished. In this record of the struggle, I pick out the parts where clearing the errors was particularly hard. The finished program is included at the end of this article.

| | Case 1 | Case 2 | Case 3 |
|---|---|---|---|
| Model | Raspberry Pi 4B | Raspberry Pi 4B | Raspberry Pi 4B |
| Number of units | 1 | 1 | 4 |
| Number of threads | 1 | 4 | 4 (each Pi) |
| Processing time | 8 s | 4 s | 1.1 s |
| Imported modules | time | time, threading | time, threading, socket |

LAN network configuration and OS

Before describing the struggle, here is a brief overview of the network configuration used this time. It consists of a cluster of four Raspberry Pi 4Bs plus one Raspberry Pi Zero. Every Pi runs Raspberry Pi OS with desktop (2020-12-02-raspios-buster-armhf.img). Each Pi can be operated from its console (keyboard, mouse, monitor and touch) or over ssh across the LAN. To keep the program simple, all IP addresses are fixed.

[Figure 2: LAN network configuration and fixed IP address settings]

The OS installation and initial settings for each Pi were done from the console. There are four initial setting points:

① Add an empty file named ssh to the SD card used for the OS installation.
② Similarly, append the HDMI monitor settings to the end of config.txt on the SD card. Then power on the Pi and boot it.
③ After booting, set the fixed IP address in the network settings, as shown on the right of the figure above.
④ Keyboard settings: in Localization, set Timezone to Asia Tokyo, Keyboard to 105, US, US, and Locale to English, US, US, UTF-8. You can then type '@' from the keyboard. This step may be unnecessary depending on the keyboard model; I had bought a new compact keyboard without a numeric keypad and could not type '@' from it.

The initial settings up to this point went smoothly. However, when I wrote the next Python program, I hit a series of errors that seemed unsolvable (to me, a beginner). This was the start of a hard-fought battle. I wrote the programs in Python 3.7.9 IDLE on Windows 10 (x64). I used PuTTY for ssh connections from Windows to each Pi and WinSCP for file transfers from Windows to each Pi. Both handled the encrypted connection to the Raspberry Pi automatically and were simple to operate, which helped a lot; both are very popular tools.

At first, these were the points that seemed unsolvable:

- 4-thread processing on one unit
- Summing the values calculated by each thread
- Socket communication of floating-point numbers between two units
- Socket communication of lists between two units

First, I stumbled on 4-thread processing on one machine. I searched Google for a long time but found nothing that worked. Close to giving up, I also read the official documentation: https://docs.python.org/ja/3/library/threading.html?highlight=thread#threading.Thread I understood the definition, but there was much in it I didn't understand, and my program still didn't work. I read that multiprocessing was better, so I tried it, but it didn't work at all either. For the time being, I gave up on parallel processing with both threading and multiprocessing.

So I moved on to socket communication between two units first. By contrast, this went smoothly: the received strings appeared one after another on each other's monitors. Even with three or four units, sending and receiving strings over sockets worked well. At this point, the prototype for parallel processing across multiple units was complete. However, the goal is a nested-loop calculation: when it is divided up, each unit produces a floating-point partial result, which must be received from every unit over a socket and summed. And that is where I stumbled: sending floating-point numbers over a socket between two units.
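The string exchange that worked can be sketched as below. This is a minimal sketch, not the author's code: it runs a one-shot server in a thread on the loopback address 127.0.0.1 instead of the Pis' fixed LAN addresses, and the helper names `recv_all`, `serve_once` and `exchange` are hypothetical. Unlike the single `recv()` calls in the article, `recv_all()` keeps reading until the peer closes its sending side, because TCP may deliver one send in several pieces.

```python
import socket
import threading

HOST = "127.0.0.1"  # assumption: loopback stands in for the Pis' fixed LAN addresses


def recv_all(conn, bufsize=1024):
    # Read until the peer closes (or half-closes) its side of the connection.
    chunks = []
    while True:
        chunk = conn.recv(bufsize)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def serve_once(server_sock, received):
    # Accept one client, store its message, send a short reply, then close.
    conn, _addr = server_sock.accept()
    with conn:
        received.append(recv_all(conn).decode("utf-8"))
        conn.sendall(b"OK")


def exchange(message):
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.bind((HOST, 0))  # port 0: let the OS pick a free port
    server_sock.listen(1)
    port = server_sock.getsockname()[1]
    received = []
    server = threading.Thread(target=serve_once, args=(server_sock, received))
    server.start()
    with socket.create_connection((HOST, port)) as client:
        client.sendall(message.encode("utf-8"))
        client.shutdown(socket.SHUT_WR)  # tell the server the message is complete
        reply = recv_all(client).decode("utf-8")
    server.join()
    server_sock.close()
    return received[0], reply


print(exchange("Hello from 192.168.2.21"))
```

On the real cluster, the server side would bind to the Pi's own fixed address and the client would connect to it, as in the programs at the end of the article.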

Socket communication of floating-point numbers between two units

I simply changed the string to a float in the string socket code that had worked. Immediately I got an error saying that a float cannot be converted to bytes. Only bytes can be sent and received over a socket: the sender encodes a string to bytes and sends it, and the receiver decodes the bytes back to a string. A float, however, cannot be encoded to bytes directly. The error was cleared by adding just one line on each side, as shown below: str() on the sender converts the float to a string, and float() on the receiver converts the string back to a float.

Sender

```python
# Floating-point number to send
total21 = 12345.6789
# Convert the float to a string with str()
total21str = str(total21)
# Encode the string to bytes and send it from the connected socket21
socket21.send(bytes(total21str, 'utf-8'))
```

Receiver

```python
# Receive the encoded bytes on the connected client socket21
total21strEncode = clientsocket21.recv(1016)
# Decode the bytes back to a string
total21str = total21strEncode.decode("utf-8")
# Convert the string back to a float with float()
total21 = float(total21str)
```
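The sender and receiver lines above can be tried on one machine with `socket.socketpair()`, which returns two already-connected sockets. This is a sketch under that assumption; `send_float` and `recv_float` are hypothetical helper names, and, like the original, it assumes the whole message arrives in one `recv()`. In Python 3, `str()` of a float produces the shortest string that converts back to exactly the same float, so no precision is lost.

```python
import socket


def send_float(sock, value):
    # float -> str -> UTF-8 bytes, as in the sender code
    sock.sendall(str(value).encode("utf-8"))


def recv_float(sock, bufsize=1016):
    # UTF-8 bytes -> str -> float, as in the receiver code
    return float(sock.recv(bufsize).decode("utf-8"))


# socketpair() gives two connected sockets, so no network is needed
sender, receiver = socket.socketpair()
send_float(sender, 12345.6789)
total21 = recv_float(receiver)
sender.close()
receiver.close()
print(total21)  # 12345.6789
```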

Similarly, int() converts a string to an integer, so integers can be sent over a socket the same way. Next, I tried sending a list that mixes integers and floats, and got an error again: a list cannot be encoded to bytes.
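Both errors come from the same rule: `bytes(x, 'utf-8')` only accepts a str. The sketch below reproduces them; `to_bytes` is a hypothetical helper that returns the error text instead of raising, just for display.

```python
def to_bytes(value):
    # bytes(x, "utf-8") only accepts a str; anything else raises TypeError
    try:
        return bytes(value, "utf-8")
    except TypeError as exc:
        return f"TypeError: {exc}"


print(to_bytes(12345.6789))          # TypeError (a float is not a str)
print(to_bytes([10.01, 1234]))       # TypeError (a list is not a str)
print(to_bytes(str(12345.6789)))     # b'12345.6789'
print(to_bytes(str([10.01, 1234])))  # b'[10.01, 1234]'
```

Converting with str() first is what makes both the float and the list sendable.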

Socket communication of a list between two units

I tried sending this as a string as well. The list converted to a string easily with str(), and that cleared the sender side as follows:

Sender

```python
# List to send
lists21 = [10.01, 1234]
# Convert the list to a string with str()
list21str = str(lists21)
# Encode the string list21str to bytes and send it from the connected socket21
socket21.send(bytes(list21str, 'utf-8'))
```

Receiver

```python
# Receive the encoded bytes on the connected client socket21
list21strEncode = clientsocket21.recv(1016)
# Decode back to a string, e.g. "[10.01, 1234]"
list21str = list21strEncode.decode("utf-8")
# Strip the brackets
list21str = list21str.replace('[', '')
list21str = list21str.replace(']', '')
# Split into a list of strings: ['10.01', '1234']
lists21str = list21str.split(', ')
# Convert each element back to its original type: [10.01, 1234]
lists21 = []
lists21.append(float(lists21str[0]))
lists21.append(int(lists21str[1]))
```

The receiver code isn't very clean, but it recovers the list as it was before sending.
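A tidier receiver is possible with the standard json module, which the original program does not use: `json.dumps()` turns the list into a string and `json.loads()` turns it back, keeping ints as ints and floats as floats, so no manual `replace()`/`split()` is needed. This is a sketch; `send_list` and `recv_list` are hypothetical names, `socketpair()` stands in for the Pis' connection, and, like the original, it assumes the whole message arrives in one `recv()`.

```python
import json
import socket


def send_list(sock, values):
    # list -> JSON text -> UTF-8 bytes
    sock.sendall(json.dumps(values).encode("utf-8"))


def recv_list(sock, bufsize=1016):
    # UTF-8 bytes -> JSON text -> list with the original int/float types
    return json.loads(sock.recv(bufsize).decode("utf-8"))


sender, receiver = socket.socketpair()
send_list(sender, [10.01, 1234])
lists21 = recv_list(receiver)
sender.close()
receiver.close()
print(lists21)  # [10.01, 1234]
```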

4-thread processing with one unit

I took on threading again and searched Google for a long time. Surprisingly, adding a single comma after the argument in args solved it. I had seen this somewhere while searching but had deleted it. I searched again to find out why it worked, but couldn't find the cause at the time. Addendum: args must be a tuple, and a tuple with one element needs a trailing comma, as in (n,). Without the comma, (n) is just n in parentheses, not a tuple.

```python
thread = threading.Thread(target=calc_1s, args=(n,))
```
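The difference the comma makes can be checked directly. In this sketch `worker` is a hypothetical stand-in for calc_1s; threading.Thread calls `target(*args)`, so args has to be an iterable such as a tuple.

```python
import threading


def worker(n):
    print(f"thread {n} running")


n = 2
print(type((n)).__name__)   # int: parentheses alone do not make a tuple
print(type((n,)).__name__)  # tuple: the trailing comma does

# With args=(n,) the thread calls worker(2)
t = threading.Thread(target=worker, args=(n,))
t.start()
t.join()
```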


Sum of calculated values for each thread

In parallel processing, the values calculated by each running thread must be summed. But the variables inside a thread couldn't be used outside it, and returning the value with return didn't help either, because threading.Thread discards the target function's return value. I had the calculation, but it was useless without the total, and I was stuck here for a long time. This too was solved by appending each result to totals (a list):

```python
totals.append(total21)
```
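The append approach can be sketched end to end with a smaller loop. This is an illustration, not the published program: `calc` is a hypothetical function with the same double-loop shape as calc_1s, and the ranges are made small so it runs quickly. The threads may finish in any order, so the order of items in totals is not fixed, but the sum is the same.

```python
import threading

totals = []  # shared list; each thread appends its partial result


def calc(start, stop, inner):
    # Double loop with the same shape as calc_1s
    total = 0.0
    for t in range(start, stop):
        for u in range(inner):
            total += t * u
    totals.append(total)  # a single list.append call is atomic in CPython


ranges = [(0, 25), (25, 50), (50, 75), (75, 100)]
threads = []
for start, stop in ranges:
    th = threading.Thread(target=calc, args=(start, stop, 40))
    th.start()
    threads.append(th)
for th in threads:
    th.join()

total = sum(totals)
print(total)  # 3861000.0
```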


List methods such as append are described in detail in the Python tutorial:
https://docs.python.org/ja/3/tutorial/datastructures.html
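Another way to get each thread's result back, not used in the original program, is `concurrent.futures.ThreadPoolExecutor` from the standard library: each `submit()` returns a future whose `result()` is the function's return value, so `return` works again. This is a sketch with a hypothetical `partial_sum` function and small ranges.

```python
from concurrent.futures import ThreadPoolExecutor


def partial_sum(start, stop, inner):
    total = 0.0
    for t in range(start, stop):
        for u in range(inner):
            total += t * u
    return total  # the executor keeps this return value for us


ranges = [(0, 25), (25, 50), (50, 75), (75, 100)]
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(partial_sum, lo, hi, 40) for lo, hi in ranges]
    parts = [f.result() for f in futures]  # results in submission order

total = sum(parts)
print(total)  # 3861000.0
```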
[Retracted: Usually this one line needs no thought. However, in the program below, every thread uses the global list totals at the same time, so the code is not thread-safe. I originally wrote that Python's GIL "automatically avoids conflicts between threads", a great feature similar to a software version of a DMA channel.]
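Correction: Python's GIL (global interpreter lock) lets only one thread execute Python bytecode at a time, so CPU-bound threads effectively use just one core, which in practice leaves the other cores free for the keyboard, mouse and monitor. It is not a feature that "automatically avoids conflicts between threads" in your own code; my explanation above was a misunderstanding. (In CPython a single list.append call is atomic, but a multi-step update such as total += x on a shared variable is not.) To use all 4 cores of the Raspberry Pi 3 or 4 CPU, I had to build the program with multiprocessing; however, the keyboard, mouse and monitor then became almost unresponsive.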
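For reference, a minimal multiprocessing version of the same kind of double loop might look like this. It is a sketch, not the program I ran: `partial_sum` and `split_ranges` are hypothetical helpers, the loop sizes are reduced so the demo finishes quickly, and the `if __name__ == "__main__":` guard is required because worker processes may re-import the script.

```python
from multiprocessing import Pool


def partial_sum(start, stop, inner, factor):
    # Same double-loop shape as calc_1s
    total = 0.0
    for t in range(start, stop):
        for u in range(inner):
            total += t * u * factor
    return total


def split_ranges(start, stop, parts):
    # Split [start, stop) into `parts` contiguous chunks of near-equal size
    step, extra = divmod(stop - start, parts)
    ranges = []
    lo = start
    for i in range(parts):
        hi = lo + step + (1 if i < extra else 0)
        ranges.append((lo, hi))
        lo = hi
    return ranges


if __name__ == "__main__":
    inner, factor = 1001, 1.0  # assumption: smaller than the article's 4001
    jobs = [(lo, hi, inner, factor) for lo, hi in split_ranges(0, 1001, 4)]
    with Pool(processes=4) as pool:  # one process per core
        total = sum(pool.starmap(partial_sum, jobs))
    print(total)
```

Each process has its own interpreter and its own GIL, which is why this version can occupy all four cores at once.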


## Created python program
Put cluster21.py in the pi folder, run it with python3, and leave it waiting. For the other three Pis, copy cluster21.py and use "replace all" to change 21 to 22, 23 and 24, respectively; put each copy in the pi folder, run it with python3, and leave it waiting. Finally, run total20.py on 192.168.2.20 to start the calculation. The code below hardly looks like Python, but I am publishing it as-is.

cluster21.py (fixed IP address 192.168.2.21):

``` python
import time
import threading
import socket

total   = 0
totalsG0 = []####Add changes
totalsG1 = []####Add changes
totalsG2 = []####Add changes
totalsG3 = []####Add changes
#totals  = []####Change comment out


def calc_1s(n):
    if n == 0:
        print(n)
        total0 = 0
        for t0 in range(list21lint, list21mint):
            for u0 in range(L0):
                total0 += t0*u0*list21cfloat0
        print(f"End of {n}")
        totalsG0.append(total0)####Addition of correction
        print(total0)
        print(totalsG0)
        #totals.append(total0)  # commented out: relied on a wrong understanding of the GIL
    elif n == 1:
        print(n)
        total1 = 0
        for t1 in range(list21nint, list21oint):
            for u1 in range(L1):
                total1 += t1*u1*list21dfloat1
        print(f"End of {n}")
        totalsG1.append(total1)####Addition of correction
        print(total1)
        print(totalsG1)
        #totals.append(total1)  # commented out: relied on a wrong understanding of the GIL
    elif n == 2:
        print(n)
        total2 = 0
        for t2 in range(list21pint, list21qint):
            for u2 in range(L2):
                total2 += t2*u2*list21efloat2
        print(f"End of {n}")
        totalsG2.append(total2)####Addition of correction
        print(total2)
        print(totalsG2)
        #totals.append(total2)  # commented out: relied on a wrong understanding of the GIL
    elif n == 3:
        print(n)
        total3 = 0
        for t3 in range(list21rint, list21sint):
            for u3 in range(L3):
                total3 += t3*u3*list21ffloat3
        print(f"End of {n}")
        totalsG3.append(total3)####Addition of correction
        print(total3)
        print(totalsG3)
        #totals.append(total3)  # commented out: relied on a wrong understanding of the GIL

#Receive list for calculation
socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket21.bind(('192.168.2.21', 60021))
socket21.listen(5)
clientsocket21, address21 = socket21.accept()
list21strEncode = b''
list21strEncode = clientsocket21.recv(1016)
list21str=list21strEncode.decode("utf-8")
print(list21str)
list21str = list21str.replace('[', '')
list21str = list21str.replace(']', '')
lists21str=[]#lists21str:New list for strings
lists21str=list21str.split(', ')
list21aint=int(lists21str[0])
list21bint=int(lists21str[1])
list21cfloat0=float(lists21str[2])
list21cfloat1=list21cfloat0
list21cfloat2=list21cfloat0
list21cfloat3=list21cfloat0
list21dfloat0=float(lists21str[3])
list21dfloat1=list21dfloat0
list21dfloat2=list21dfloat0
list21dfloat3=list21dfloat0
list21efloat0=float(lists21str[4])
list21efloat1=list21efloat0
list21efloat2=list21efloat0
list21efloat3=list21efloat0
list21ffloat0=float(lists21str[5])
list21ffloat1=list21ffloat0
list21ffloat2=list21ffloat0
list21ffloat3=list21ffloat0
list21gfloat=float(lists21str[6])
list21hfloat=float(lists21str[7])
list21ifloat=float(lists21str[8])
list21jfloat=float(lists21str[9])
list21kfloat=float(lists21str[10])
list21lint=int(lists21str[11])
list21mint=int(lists21str[12])
list21nint=int(lists21str[13])
list21oint=int(lists21str[14])
list21pint=int(lists21str[15])
list21qint=int(lists21str[16])
list21rint=int(lists21str[17])
list21sint=int(lists21str[18])
list21tint=int(lists21str[19])
lists21=[]#lists21:New list for numerical values
lists21.clear()####
lists21.append(int(lists21str[0]))
lists21.append(int(lists21str[1]))
lists21.append(float(lists21str[2]))
lists21.append(float(lists21str[3]))
lists21.append(float(lists21str[4]))
lists21.append(float(lists21str[5]))
lists21.append(float(lists21str[6]))
lists21.append(float(lists21str[7]))
lists21.append(float(lists21str[8]))
lists21.append(float(lists21str[9]))
lists21.append(float(lists21str[10]))
lists21.append(int(lists21str[11]))
lists21.append(int(lists21str[12]))
lists21.append(int(lists21str[13]))
lists21.append(int(lists21str[14]))
lists21.append(int(lists21str[15]))
lists21.append(int(lists21str[16]))
lists21.append(int(lists21str[17]))
lists21.append(int(lists21str[18]))
lists21.append(int(lists21str[19]))
print(lists21)
clientsocket21.send(bytes("OK 21 Start ....", 'utf-8'))
clientsocket21.close()
socket21.close()


if list21aint == 1:
    # calc_1s Calculation Start
    L0 = list21bint
    L1 = list21bint
    L2 = list21bint
    L3 = list21bint
    initial_time = time.time()
    print(f"CPU is calculating now !")
    # Start Threads
    threads = []
    for n in range(4):# 4 thread
        thread = threading.Thread(target=calc_1s, args=(n,))#
        thread.start()
        threads.append(thread)
    # Wait Threads
    for thread in threads:
        thread.join()
    total += totalsG0[0]####Add changes
    total += totalsG1[0]####Add changes
    total += totalsG2[0]####Add changes
    total += totalsG3[0]####Add changes
    #for x in totals:#####Change comment out
    #    total += x#####Change comment out
    print(f"Answer :")
    print (total)
    print(f"Time :")
    print(str(time.time() - initial_time))
    # Calculation Result Return  === Fixed 20 ===
    socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    socket21.connect(('192.168.2.20', 60021))
    totalstr = str(total)
    socket21.send(bytes(totalstr, 'utf-8'))
    msg21 = b''
    msg21 = socket21.recv(16)
    socket21.close()
    print(msg21.decode("utf-8"))

time.sleep(10) # for display
```

total20.py (fixed IP address 192.168.2.20; run last to start the calculation):

```python
import time
import threading
import socket

##Lists for a 4000 x 4000 double-loop calculation: about 1.1 seconds with 4 Raspberry Pi 4Bs
lists21 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 0, 251, 251, 501, 501, 751, 751, 1001, 19]
lists22 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 1001, 1251, 1251, 1501, 1501, 1751, 1751, 2001, 19]
lists23 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 2001, 2251, 2251, 2501, 2501, 2751, 2751, 3001, 19]
lists24 = [1, 4001, 1.0, 1.0, 1.0, 1.0, 6.6, 7.7, 8.8, 9.9, 10.10, 3001, 3251, 3251, 3501, 3501, 3751, 3751, 4001, 19]

total  = 0.
totalsG21 = [0.]####Add changes
totalsG22 = [0.]####Add changes
totalsG23 = [0.]####Add changes
totalsG24 = [0.]####Add changes
#totals   = []#**#Change comment out

def connectto_ipports(n):
    if n == 60021:
        print(n)
        socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket21.connect(('192.168.2.21', 60021))
        list21str=''
        list21str=str(lists21)
        socket21.send(bytes(list21str, 'utf-8'))#Send list to cluster
        msg21 = b''
        msg21 = socket21.recv(16)
        socket21.close()
        print(msg21.decode("utf-8"))
    elif n == 60022:
        print(n)
        socket22 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket22.connect(('192.168.2.22', 60022))
        list22str=''
        list22str=str(lists22)
        socket22.send(bytes(list22str, 'utf-8'))#Send list to cluster
        msg22 = b''
        msg22 = socket22.recv(16)
        socket22.close()
        print(msg22.decode("utf-8"))
    elif n == 60023:
        print(n)
        socket23 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket23.connect(('192.168.2.23', 60023))
        list23str=''
        list23str=str(lists23)
        socket23.send(bytes(list23str, 'utf-8'))#Send list to cluster
        msg23 = b''
        msg23 = socket23.recv(16)
        socket23.close()
        print(msg23.decode("utf-8"))
    elif n == 60024:
        print(n)
        socket24 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket24.connect(('192.168.2.24', 60024))
        list24str=''
        list24str=str(lists24)
        socket24.send(bytes(list24str, 'utf-8'))#Send list to cluster
        msg24 = b''
        msg24 = socket24.recv(16)
        socket24.close()
        print(msg24.decode("utf-8"))

def accept_ports(n):
    if n == 60021:
        print(n)
        socket21 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket21.bind(('192.168.2.20', 60021))
        socket21.listen(5)
        clientsocket21, address21 = socket21.accept()
        total21en = b''
        total21en = clientsocket21.recv(1016)
        total21 = float(total21en.decode("utf-8"))
        totalsG21.append(total21)###Addition of correction
        del totalsG21[0]###Addition of correction
        #totals.append(total21)  # commented out: relied on a wrong understanding of the GIL
        print(total21)
        clientsocket21.send(bytes("Thank you ......", 'utf-8'))
        clientsocket21.close()
        socket21.close()
    elif n == 60022:
        print(n)
        socket22 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket22.bind(('192.168.2.20', 60022))
        socket22.listen(5)
        clientsocket22, address22 = socket22.accept()
        total22en = b''
        total22en = clientsocket22.recv(1016)
        total22 = float(total22en.decode("utf-8"))
        totalsG22.append(total22)###Addition of correction
        del totalsG22[0]###Addition of correction
        #totals.append(total22)  # commented out: relied on a wrong understanding of the GIL
        print(total22)
        clientsocket22.send(bytes("Thank you ......", 'utf-8'))
        clientsocket22.close()
        socket22.close()
    elif n == 60023:
        print(n)
        socket23 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket23.bind(('192.168.2.20', 60023))
        socket23.listen(5)
        clientsocket23, address23 = socket23.accept()
        total23en = b''
        total23en = clientsocket23.recv(1016)
        total23 = float(total23en.decode("utf-8"))
        totalsG23.append(total23)###Addition of correction
        del totalsG23[0]###Addition of correction
        #totals.append(total23)  # commented out: relied on a wrong understanding of the GIL
        print(total23)
        clientsocket23.send(bytes("Thank you ......", 'utf-8'))
        clientsocket23.close()
        socket23.close()
    elif n == 60024:
        print(n)
        socket24 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket24.bind(('192.168.2.20', 60024))
        socket24.listen(5)
        clientsocket24, address24 = socket24.accept()
        total24en = b''
        total24en = clientsocket24.recv(1016)
        total24 = float(total24en.decode("utf-8"))
        totalsG24.append(total24)###Addition of correction
        del totalsG24[0]###Addition of correction
        #totals.append(total24)  # commented out: relied on a wrong understanding of the GIL
        print(total24)
        clientsocket24.send(bytes("Thank you ......", 'utf-8'))
        clientsocket24.close()
        socket24.close()

threads = []

# Thread Client Start
for n in range(60021, 60025):  # one thread per cluster (ports 60021-60024)
    thread = threading.Thread(target=connectto_ipports, args=(n,)) #
    thread.start()
    threads.append(thread)
# Wait Threads
for thread in threads:
    thread.join()
# Result Display
print("All Clusters Calculation Start")

threads.clear()

# Accept Thread Server Start
for n in range(60021, 60025):  # one accept thread per cluster (ports 60021-60024)
    thread = threading.Thread(target=accept_ports, args=(n,)) #
    thread.start()
    threads.append(thread)
# Wait Threads
for thread in threads:
    thread.join()
total += totalsG21[0]####Add changes
total += totalsG22[0]####Add changes
total += totalsG23[0]####Add changes
total += totalsG24[0]####Add changes
#for x in totals:#####Change comment out
#    total += x  #####Change comment out

# Result Display
print("Calculation Result:")
print(total)

time.sleep(10) # for display

# total20.py fixed IP address 192.168.2.20 (Execute py last and start calculation)
# cluster21.py fixed IP address 192.168.2.21 (Run py first and wait)
# cluster22.py fixed IP address 192.168.2.22 (Run py first and wait)
# cluster23.py fixed IP address 192.168.2.23 (Run py first and wait)
# cluster24.py fixed IP address 192.168.2.24 (Run py first and wait)
##All OS is Raspberry Pi OS with Desktop: 2020-12-02-raspios-buster-armhf.img
```

Thank you for reading to the end.

I am posting this in agreement with Qiita's purpose, so feel free to rewrite or modify the published program; there are no copyright restrictions.

However, the Raspberry Pi 4B needs a particularly large heatsink on the CPU. This program communicates over the LAN only rarely, so the LAN chip does not get hot. But when a calculation runs for a long time, the CPU draws a considerable amount of power, as its intense heat shows. The small black chip behind the USB Type-C power connector also gets hot, so airflow from a fan is needed as well.
