[Einführung in Python3, Tag 22] Kapitel 11 Parallele Verarbeitung und Vernetzung (11.1 bis 11.3)

11.1 Parallelverarbeitung

Wenn Sie auf etwas auf Ihrem Computer warten, gibt es zwei Gründe:

Ein Begriff, der sich auf die Parallelverarbeitung bezieht.

11.1.1 Warteschlange

--Cue-Elemente werden an einem Ende hinzugefügt und am anderen Ende entfernt.

11.1.2 Prozess

dishes.py



import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print("Washing",dish, "dish")
        output.put(dish)

#get()Entfernt das Element aus der Warteschlange und gibt es zurück.
#task_fertig ist bekommen()Wenn es danach aufgerufen wird, teilt es der Warteschlange mit, dass der Vorgang abgeschlossen ist.
#join()Blockiert, bis alle Elemente in der Warteschlange abgerufen wurden.
def dryer(input):
    while True:
        dish=input.get()
        print("Drying",dish,"dish")
        input.task_done()

dish_queue=mp.JoinableQueue()
dryer_proc=mp.Process(target=dryer, args=(dish_queue,))
dryer_proc.daemon=True
dryer_proc.start()

dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join()

Ausführungsergebnis



python dishes.py
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish
Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert dish

11.1.3 Thread

threads.py



import threading

def do_this(what):
    whoami(what)

#threading.current_thread()Erstellt ein Objekt, das dem Thread der Funktionsaufrufverarbeitung entspricht.
def whoami(what):
    print("Thread %s says: %s" % (threading.current_thread(),what))

#Wenn Sie ein Thread-Objekt erstellen, wird der Thread gestartet()Rufen Sie die Methode auf, um die Aktivität zu starten.
#Ziel ist ein aufrufbares Objekt, das von der Ausführungsmethode aufgerufen wird.
#args ist das Argument tapple beim Aufrufen von target.
if __name__=="__main__":
    whoami("I am the main program")
    for n in range(4):
        p=threading.Thread(target=do_this, args=("I am function %s" % n,))
        p.start()

Ausführungsergebnis



python threads.py
Thread <_MainThread(MainThread, started 4530769344)> says: I am the main program
Thread <Thread(Thread-1, started 123145448275968)> says: I am function 0
Thread <Thread(Thread-2, started 123145453531136)> says: I am function 1
Thread <Thread(Thread-3, started 123145448275968)> says: I am function 2
Thread <Thread(Thread-4, started 123145453531136)> says: I am function 3

thread_dishes.py



import threading, dish_queue
import time

def washer(dishes, dish_queue):
    for dish in dishes:
        print("Washing",dish)
        time.sleep(5)
        dish_queue.put(dish) #Warteschlangenelement.

def dryer(dish_queue):
    while True:
        dish=dish_queue.get() #Entfernt ein Element aus der Warteschlange und gibt es zurück.
        print("Drying", dish)
        time.sleep(10)
        dish_queue.task_done() #get()Nach der Aufgabe_done()Wird aufgerufen, um anzuzeigen, dass die Verarbeitung für die abgerufene Aufgabe abgeschlossen ist.

dish_queue =queue.Queue() #FIFO-Warteschlangenkonstruktor.
for n in range(2):
    dryer_thread=threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()

dishes=["salad","bread","entree","dessert"]
washer(dishes, dish_queue)
dish_queue.join() #Blockieren, bis alle Elemente in der Warteschlange abgerufen und verarbeitet wurden. Entsperren Sie, wenn die Aufgabe abgeschlossen ist.

11.1.4 Grüner Faden und Gevent

--gevent schreibt viele der Python-Standardobjekte wie Sockets neu, sodass sie den Gevent-Mechanismus verwenden, ohne ihn zu blockieren.

gevent_test.py


import gevent
from gevent import monkey
monkey.patch_socket()

hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

Ausführungsergebnis


python gevent_test.py
66.6.44.4
104.27.172.75
104.18.63.71

gevent_monkey.py


import gevent
from gevent import monkey; monkey.patch_all()
import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.taxidermy.net']
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

Ausführungsergebnis


python gevent_monkey.py
66.6.44.4
104.27.173.75
104.18.62.71

11.1.5 twisted

knock_sever.py



from twisted.internet import protocol, reactor

class Knock(protocol.Protocol):
    def dataReceived(self, data):
        print(Client, data)
        if data.startswith("Knock, knock"):
            response = "Who is there?"
        else:
            response = data + " who?"
        print(Server, response)
        self.transport.write(response)

class KnockFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Knock()

reactor.listenTCP(8000, KnockFactory())
reactor.run()

knock_client.py


from twisted.internet import reactor, protocol

class KnockClient(protocol.Protocol):
    def connectionMade(self):
        self.transport.write("Knock, knock")

    def dataReceived(self, data):
        if data.startswith("Who is there"):
            response = "Disappearing client"
            self.transport.write(response)
        else:
            self.transport.loseConnection()
            reactor.stop()

class KnockFactory(protocol.ClientFactory):
    protocol = KnockClient

def main():
    f = KnockFactory()
    reactor.connectTCP('localhost', 8000, f)
    reactor.run()

if __name__ == '__main__':
    main()

11.1.7 Redis

redis_washer.py



#"dishes"Wird auf dem Redis-Server generiert.
import redis
conn=redis.Redis()
print("Washer is starting")
dishes=["salad","bread","entree","dessert"]
for dish in dishes:
    msg=dish.encode("utf-8")
    conn.rpush("dishes",msg)#rpush fügt am Ende ein neues Element hinzu.
    print("Washed",dish)
conn.rpush("dishes","quit")
print("Washer is done")

redis_dryer.py


#"dishes"Warten Sie auf die mit gekennzeichneten Nachrichten und zeigen Sie eine Nachricht an, die angibt, dass jede getrocknet wurde.
import redis
conn=redis.Redis()
print("Dryer is starting")
while True:
    msg=conn.blpop("dishes")
    if not msg:
        break
    val=msg[1].decode("utf-8")
    if val=="quit":
        break
    print("Dried",val)
print("Dishes are dried")

Ausführungsergebnis


$ python redis_dryer.py &
[1] 43950

$ Dryer is starting

$ python redis_washer.py
Washer is starting
Washed salad
Dried salad
Washed bread
Dried bread
Washed entree
Dried entree
Washed dessert
Dried dessert
Washer is done
Dishes are dried
[1]+  Done                    python redis_dryer.py


redis_dryer2.py



#Erstellen Sie mehrere Trocknerprozesse.
#Dem Trocknerprozess wurde eine Timeout-Funktion hinzugefügt, anstatt nach einem Schutz zu suchen.
def dryer():
    import redis
    import os
    import time
    conn=redis.Redis()
    pid=os.getpid()
    timeout=20
    print("Dryer process % is starting" %pid)
    while True:
        msg=conn.blpop("dishes",timeout)#Extrahieren Sie das erste Element der Liste(LPOP)Rückkehr mit Schlüssel.
        if not msg:
            break
        val=msg[1].decode("utf-8")#BLPOP ist ein Array aus zwei Elementen, wobei das erste Element der Schlüssel und das zweite Element der Popup-Wert ist.
        #Daher msg[0]Nicht msg[1]Wird sein.
        if val=="quit":
            break
        print("&%s: dried %s" % (pid,val))
        time.sleep(0.1)
    print("Dryer process %s is done" %pid)

import multiprocessing
DRYERS=3
for num in range(DRYERS):
    p = multiprocessing.Process(target=dryer)
    p.start()

Ausführungsergebnis



python redis_dryer2.py &
[1] 44162

$ Dryer process  44179s starting
Dryer process  44178s starting
Dryer process  44180s starting
Dryer process 44180 is done
Dryer process 44178 is done
Dryer process 44179 is done

[1]+  Done                    python redis_dryer2.py


11.2 Netzwerk

11.2.1 Muster

11.2.2 Modell veröffentlichen / abonnieren

11.2.2.1 Redis

redis_pub.py


import redis
import random

conn = redis.Redis()
cats = ['siamese', 'persian', 'maine coon', 'norweigian forest']
hats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']
for msg in range(10):
    cat = random.choice(cats)
    hat = random.choice(hats)
    print('Publish: %s wears a %s' % (cat, hat))
    conn.publish(cat, hat)

redis_sub.py


import redis
conn=redis.Redis()

topics=["maine coon", "persian"]
sub=conn.pubsub()
sub.subscribe(topics)
for msg in sub.listen():
    if msg["type"]=="message":
        cat=msg["channel"]
        hat=msg["data"]
        print("Subscribe: %s wears a %s" % (cat, hat))

Ausführungsergebnis



$ python redis_pub.py
Publish: maine coon wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a stovepipe
Publish: siamese wears a bowler
Publish: maine coon wears a bowler
Publish: norweigian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: persian wears a bowler

$ python redis_sub.py
Subscribe: b'persian' wears a b'fedora'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'persian' wears a b'bowler'
Subscribe: b'maine coon' wears a b'bowler'


11.2.2.2 ZeroMQ

--ZeroMQ verfügt nicht über einen zentralen Server, sodass einzelne Publisher an alle Abonnenten schreiben.

zmq_pub.py


import zmq
import random
import time
host="*"
port=6789
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind('tcp://%s:%s' % (host, port))
cats=["siamese", "persian", "maine coon", "norwegian forest"]
hats=["stovepipe", "bowler", "tam-o-shanter", "fedora"]
time.sleep(1)
#UTF zu Themen- und Wertzeichenfolgen-Beachten Sie, dass wir 8 verwenden.
for msg in range(10):
    cat=random.choice(cats)
    cat_bytes=cat.encode("utf-8")
    hat=random.choice(hats)
    hat_bytes=hat.encode("utf-8")
    print("Publish: %s wears a %s" % (cat, hat))
    pub.send_multipart([cat_bytes, hat_bytes])

zmq_sub.py


import zmq
host="127.0.0.1"
port=6789
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect('tcp://%s:%s' % (host, port))
topics=["maine coon", "persian"]
for topic in topics:
    sub.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
while True:
    cat_bytes, hat_bytes=sub.recv_multipart()
    cat=cat_bytes.decode("utf-8")
    hat=hat_bytes.decode("utf-8")
    print("Subscribe: %s wears a %s" % (cat, hat))

Ausführungsergebnis



$ python zmq_pub.py
Publish: maine coon wears a fedora
Publish: maine coon wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: persian wears a stovepipe
Publish: persian wears a fedora
Publish: norwegian forest wears a fedora
Publish: norwegian forest wears a tam-o-shanter
Publish: persian wears a stovepipe
Publish: maine coon wears a bowler

$ python zmq_sub.py
Subscribe: maine coon wears a fedora
Subscribe: maine coon wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: persian wears a fedora
Subscribe: persian wears a stovepipe
Subscribe: maine coon wears a bowler

11.2.3 TCP/IP

-UDP: Wird zum Austausch von Kurzdaten verwendet. --TCP: Wird für Verbindungen mit einer längeren Lebensdauer als UDP verwendet.

11.2.4 Buchse

udp_server.py


from datetime import datetime
import socket

server_address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
#Die erste Methode erstellt einen Socket und die zweite Methode bindet an den Socket.(Achten Sie auf Daten, die an dieser IP-Adresse und an diesem Port ankommen.)
#AF_INET bedeutet, das Internet zu machen.
#SOCK_DGRAM verwendet UDP in dem Sinne, dass es Datagramme sendet und empfängt.
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

#recvfrom wartet auf das Eintreffen des Datagramms.
data, client=server.recvfrom(max_size)

print("At", datetime.now(), client , "said", data)
server.sendto(b"Are you talking to me?", client)
server.close()

udp_client.py


import socket
from datetime import datetime

server_address=("localhost",6789)
max_size=4096

print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b"Hey!", server_address)
data, server=client.recvfrom(max_size)
print("At", datetime.now(), server, "said", data)
client.close()

Ausführungsergebnis



$ python udp_server.py
Starting the server at 2020-02-01 09:51:33.707462
Waiting for a client to call.
At 2020-02-01 09:52:24.053328 ('127.0.0.1', 54667) said b'Hey!'

$ python udp_client.py
Starting the client at 2020-02-01 09:52:48.897087
At 2020-02-01 09:52:48.898221 ('127.0.0.1', 6789) said b'Are you talking to me?'

tcp_client.py


from datetime import datetime
import socket

address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
#SOCK zur Verwendung von TCP, einem Streaming-Protokoll_Verwenden Sie STREAM.
client=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#Stellen Sie eine Verbindung her, um den Stream einzurichten()Hinzufügen
client.connect(address)
#Der UDP-Server antwortet dem Client.sendto()Beachten Sie, dass ich angerufen habe.
client.sendall(b"Hey!")
data=client.recv(max_size)
print("At", datetime.now(), "someone replied", data)
client.close()

tcp_server.py


from datetime import datetime
import socket

address=("localhost",6789)
max_size=4096

print("Starting the server at", datetime.now())
print("Waiting for a client to call.")
server=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(5)

client, addr=server.accept()
data=client.recv(max_size)


print("At", datetime.now(), client , "said", data)
client.sendall(b"Are you talking to me?")
server.close()
server.close()

Ausführungsergebnis



$ python tcp_server.py
Starting the server at 2020-02-01 10:16:53.333266
Waiting for a client to call.
At 2020-02-01 10:16:57.520042 <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 6789), raddr=('127.0.0.1', 49223)> said b'Hey!'


$ python tcp_client.py
Starting the server at 2020-02-01 10:15:25.298030
At 2020-02-01 10:15:25.301961 someone replied b''


11.2.5 ZeroMQ

--ZeroMQ ist eine Bibliothek, wird aber manchmal als erweiterter Socket bezeichnet.

zmq_server.py


import zmq

host="127.0.0.1"
port=6789
#Context()Erstellt ein ZeroMQ-Objekt, das den Status verwaltet
context=zmq.Context()
#Der Server gibt eine Antwort-REP aus.
server=context.socket(zmq.REP)
#Ich möchte, dass der Server auf eine bestimmte IP-Adresse und einen bestimmten Pod wartet.
server.bind("tcp://%s:%s" %(host, port))
while True:
#wait for next request from recv()
    request_bytes=server.recv()
    request_str=request_bytes.decode("utf-8")
    print("That voice in my head says: %s" %request_str)
    reply_str="Stop saying:%s" %request_str
    reply_bytes=bytes(reply_str, "utf-8")
    server.send(reply_bytes)

zmq_client.py


import zmq

host="127.0.0.1"
port=6789

context=zmq.Context()
#Der Client sendet eine Anforderungsanforderung an den Server.
client=context.socket(zmq.REQ)
#bind()Nicht verbunden()verwenden.
client.connect("tcp://%s:%s" %(host, port))
for num in range(1,6):
#wait for next request from recv()
    request_str="message #%s" % num
    request_bytes=request_str.encode("utf-8")
    client.send(request_bytes)
    reply_bytes=client.recv()
    reply_str=reply_bytes.decode("utf-8")
    print("Sent %s, received %s" % (request_str, reply_str))

Ausführungsergebnis



$ python zmq_client.py
That voice in my head says: message #1
Sent message #1, received Stop saying:message #1
That voice in my head says: message #2
Sent message #2, received Stop saying:message #2
That voice in my head says: message #3
Sent message #3, received Stop saying:message #3
That voice in my head says: message #4
Sent message #4, received Stop saying:message #4
That voice in my head says: message #5
Sent message #5, received Stop saying:message #5


python zmq_server.py &
[2] 47417
$ Traceback (most recent call last):
  File "zmq_server.py", line 8, in <module>
    server.bind("tcp://%s:%s" %(host, port))
  File "zmq/backend/cython/socket.pyx", line 550, in zmq.backend.cython.socket.Socket.bind
  File "zmq/backend/cython/checkrc.pxd", line 25, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Address already in use

11.2.7 Internetdienste

11.2.7.1 DNS


>>> import socket
>>> socket.gethostbyname("www.crappytaxidermy.com")
`66.6.44.4`
>>> socket.gethostbyname_ex("www.crappytaxidermy.com")
(`crappytaxidermy.com`, [`www.crappytaxidermy.com`], [`66.6.44.4`])

>>> socket.getaddrinfo("www.crappytaxidermy.com",80)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_DGRAM: 2>, 17, ``, (`66.6.44.4`, 80)), (<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, ``, (`66.6.44.4`, 80))]
>>> socket.getaddrinfo("www.crappytaxidermy.com",80,socket.AF_INET,
... socket.SOCK_STREAM)
[(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, ``, (`66.6.44.4`, 80))]

>>> import socket
>>> socket.getservbyname("http")
80
>>> socket.getservbyport(80)
`http`


11.2.9 Fernverarbeitung

11.2.9.1 RPC

xmlrpc_server.py



from xmlrpc.server import SimpleXMLRPCServer

def double(num):
    return num*2

server=SimpleXMLRPCServer(("localhost",6666))
server.register_function(double,"double")
server.serve_forever()

xmlrpc_client.py


import xmlrpc.client

proxy=xmlrpc.client.ServerProxy("http://localhost:6666/")
num=7
result=proxy.double(num)
print("Double %s is %s" % (num, result))

Ausführungsergebnis



$ python xmlrpc_client.py
Double 7 is 14

$ python xmlrpc_server.py
127.0.0.1 - - [01/Feb/2020 14:54:50] "POST / HTTP/1.1" 200 -

msgpack_server.py


from msgpackrpc import Server, Address

class Services():
    def double(self, num):
        return num*2

server =Server(Services())
server.listen(Address("localhost",5555))
server.start()


msgpack_client.py


from msgpackrpc import Client,Address

client=Client(Address("localhost",5555))
num=8
result=client.call("double",num)
print("Double %s is %s" % (num, result))


Ausführungsergebnis


$ python msppack_client.py
Double 8 is 16

11.3 Zuordnung überprüfen

11-1 Implementieren Sie den aktuellen Dienst mithilfe eines einfachen Sockets. Wenn der Client die Zeichenfolgenzeit an den Server sendet, gibt er das aktuelle Datum und die aktuelle Uhrzeit in ISO-Zeichen zurück.

udp_time_server.py


import socket
from datetime import datetime

server_address=("localhost",6111)
max_size=4096

print("Starting the client at", datetime.now())
server=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

while True:
    data, client_addr=server.recvfrom(max_size)
    if data == b"time":
        now=str(datetime.utcnow())
        data=now.encode("utf-8")
        server.sendto(data, client_addr)
        print("Server sent", data)
server.close()


udp_time_client.py


import socket
from datetime import datetime
from time import sleep

address=("localhost",6111)
max_size=4096

print("Starting the client at", datetime.now())
client=socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
while True:
    sleep(5)
    client.sendto(b"time", address)
    data, server_addr=client.recvfrom(max_size)
    print("Client read", data)
client.close()

Ausführungsergebnis


$ python udp_time_server.py
Starting the client at 2020-02-01 17:11:51.527771
Server sent b'2020-02-01 08:11:59.365796'
Server sent b'2020-02-01 08:12:04.370495'
Server sent b'2020-02-01 08:12:09.371627'

$ python udp_time_client.py
Starting the client at 2020-02-01 17:10:03.510044
Client read b'2020-02-01 08:10:08.514726'
Client read b'2020-02-01 08:10:13.521450'
Client read b'2020-02-01 08:10:18.527667'
Client read b'2020-02-01 08:10:23.529492'
Client read b'2020-02-01 08:10:28.531994'
Client read b'2020-02-01 08:10:33.535134'
Client read b'2020-02-01 08:10:38.541067'


11-2 Machen wir dasselbe mit den REQ- und REP-Sockets von ZeroMQ.

zmq_time_server.py



import zmq
from datetime import datetime

host="127.0.0.1"
port=1111

context=zmq.Context()
server=context.socket(zmq.REP)
server.bind("tcp://%s:%s" % (host, port))
print("Server started at", datetime.utcnow())

while True:
    message=server.recv()
    if message == b"time":
        now=datetime.utcnow()
        reply=str(now)
        server.send(bytes(reply,"utf-8"))
        print("Server sent", reply)

zmq_time_client.py


import zmq
from datetime import datetime
from time import sleep

host="127.0.0.1"
port=1111

context=zmq.Context()
server=context.socket(zmq.REQ)
server.bind("tcp://%s:%s" % (host, port))
print("Client started at", datetime.utcnow())

while True:
    sleep(5)
    requst=b"time"
    client.send(request)
    reply=client.recv()
    print("Client sent", reply)

Ausführungsergebnis



$ python zmq_time_server.py
Server started at 2020-02-01 08:27:16.448842

11-3 Machen wir dasselbe mit XMLRPC.

xmlrpc_time_server.py


from xmlrpc.server import SimpleXMLRPCServer
from datetime import datetime

def current_time():
    now = str(datetime.now())
    print('Server sent %s', now)
    return now

server = SimpleXMLRPCServer(("localhost", 6789))
server.register_function(current_time, "current_time")
server.serve_forever()


cmlrpc_time_client.py


import xmlrpc.client
from datetime import datetime
from time import sleep

proxy = xmlrpc.client.ServerProxy("http://localhost:6789/")
while True:
    sleep(3)
    result = proxy.current_time()
    print("Current time is %s" % result)

Ausführungsergebnis



$ python xmlrpc_time_server.py
Server sent %s 2020-02-01 17:44:06.341654
127.0.0.1 - - [01/Feb/2020 17:44:06] "POST / HTTP/1.1" 200 -
Server sent %s 2020-02-01 17:44:09.346517
127.0.0.1 - - [01/Feb/2020 17:44:09] "POST / HTTP/1.1" 200 -
Server sent %s 2020-02-01 17:44:12.352605
127.0.0.1 - - [01/Feb/2020 17:44:12] "POST / HTTP/1.1" 200 -

$ python cmlrpc_time_client.py
Current time is 2020-02-01 17:44:06.341654
Current time is 2020-02-01 17:44:09.346517
Current time is 2020-02-01 17:44:12.352605

11-4

redis_choc_supply.py


import redis
import random
from time import sleep

conn=redis.Redis()
varieties=["T","C","C","N"]
conveyor="Chocolates"

while True:
    seconds=random.random()
    sleep(seconds)
    piece=random.choice(varieties)
    conn.rpush(conveyor, piece)

redis_lucy.py


import redis
from datetime import datetime
from time import sleep

conn=redis.Redis()
timeout=10
conveyor="Chocolates"
while True:
    sleep(0.5)
    msg=conn.blpop(conveyor, timeout)
    remaining=conn.llen(conveyor)
    if msg:
        piece=msg[1]
        print("Lucy got a", piece, "at", datetime.utcnow(),
        ", only", remaining, "left")

Ausführungsergebnis


$ python redis_lucy.py
Lucy got a b'T' at 2020-02-01 09:05:54.780153 , only 116 left
Lucy got a b'N' at 2020-02-01 09:05:55.282109 , only 117 left
Lucy got a b'T' at 2020-02-01 09:05:55.783487 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:56.284971 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:56.787798 , only 118 left
Lucy got a b'T' at 2020-02-01 09:05:57.289434 , only 117 left
Lucy got a b'N' at 2020-02-01 09:05:57.794357 , only 118 left
Lucy got a b'C' at 2020-02-01 09:05:58.295897 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:58.800536 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.303087 , only 119 left
Lucy got a b'C' at 2020-02-01 09:05:59.805465 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.308003 , only 119 left
Lucy got a b'C' at 2020-02-01 09:06:00.810408 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.312918 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:01.818497 , only 119 left
Lucy got a b'N' at 2020-02-01 09:06:02.324028 , only 120 left
Lucy got a b'C' at 2020-02-01 09:06:02.826697 , only 119 left
Lucy got a b'T' at 2020-02-01 09:06:03.329229 , only 120 left
Lucy got a b'T' at 2020-02-01 09:06:03.835205 , only 120 left


11-5 Verwenden Sie ZeroMQ, um die Wörter in Vers 7.3 einzeln zu veröffentlichen. Schreiben Sie außerdem einen ZeroMQ-Abonnenten, der Wörter anzeigt, die mit einem Vokal beginnen, und einen anderen Abonnenten, der Wörter mit fünf Zeichen anzeigt.

poem_pub.py


import string
import zmq
from time import sleep

host="127.0.0.1"
port=9999
ctx=zmq.Context()
pub=ctx.socket(zmq.PUB)
pub.bind("tcp://%s:%s" % (host, port))
sleep(1)

with open("mammonth.txt","rt") as poem:
    words=poem.read()
for word in words.split():
    word=word.strip(string.punctuation)
    data=word.encode("utf-8")
    if word.startswith(("a", "e", "i", "u","o","A","E","I","U","O")):
        print("vowels",data)
        pub.send_multipart([b"vowels", data])
    if len(word) ==5:
        print("five",data)
        pub.send_multipart([b"five", data])

poem_sub.py


import string
import zmq

host="127.0.0.1"
port=9999
ctx=zmq.Context()
sub=ctx.socket(zmq.SUB)
sub.connect("tcp://%s:%s" % (host,port))
sub.setsockopt(zmq.SUBSCRIBE, b"vowels")
sub.setsockopt(zmq.SUBSCRIBE, b"five")

while True:
    topic, word=sub.recv_multipart()
    print(topic,word)

Ausführungsergebnis


#Erste paar Zeilen
$ python poem_pub.py
five b'queen'
vowels b'of'
five b'Lying'
vowels b'at'
vowels b'ease'
vowels b'evening'
five b'flies'


$ python poem_sub.py
b'five' b'queen'
b'vowels' b'of'
b'five' b'Lying'
b'vowels' b'at'
b'vowels' b'ease'
b'vowels' b'evening'
b'five' b'flies'
b'five' b'seize'

Impressionen

Es war eine ziemlich grobe Bewertung. Es war ein Volumen, aber kürzlich habe ich mich gefragt, ob ich eine Cloud wie AWS oder Open Stack verwenden werde, ohne selbst einen Server einrichten zu müssen. Da das Verständnis von Parallelverarbeitung und Mehrfachverarbeitung nicht eindeutig ist, frage ich mich, ob ich bei der Verwendung lernen muss.

Verweise

"Einführung in Python3 von Bill Lubanovic (veröffentlicht von O'Reilly Japan)"

"Warteschlange --- synchrone Warteschlangenklasse" https://docs.python.org/ja/3/library/queue.html#queue.Queue.task_done

"Threading --- Thread-basierte Parallelverarbeitung" https://docs.python.org/ja/3/library/threading.html

"Multiprocessing --- Prozessbasierte Parallelverarbeitung¶" https://docs.python.org/ja/3/library/multiprocessing.html#pipes-and-queues

「Redis」 http://redis.shibu.jp/commandreference/lists.html

Recommended Posts

[Einführung in Python3, Tag 22] Kapitel 11 Parallele Verarbeitung und Vernetzung (11.1 bis 11.3)
[Einführung in Python3 Tag 12] Kapitel 6 Objekte und Klassen (6.3-6.15)
[Einführung in Python3 Tag 11] Kapitel 6 Objekte und Klassen (6.1-6.2)
[Einführung in Python3 Tag 1] Programmierung und Python
[Einführung in Python3 Tag 13] Kapitel 7 Zeichenfolgen (7.1-7.1.1.1)
[Einführung in Python3 Tag 14] Kapitel 7 Zeichenfolgen (7.1.1.1 bis 7.1.1.4)
[Einführung in Python3 Tag 15] Kapitel 7 Zeichenfolgen (7.1.2-7.1.2.2)
[Einführung in Python3 Tag 21] Kapitel 10 System (10.1 bis 10.5)
[Einführung in Python3, Tag 17] Kapitel 8 Datenziele (8.1-8.2.5)
[Einführung in Python3, Tag 17] Kapitel 8 Datenziele (8.3-8.3.6.1)
[Einführung in Python3 Tag 19] Kapitel 8 Datenziele (8.4-8.5)
[Einführung in Python3 Tag 18] Kapitel 8 Datenziele (8.3.6.2 bis 8.3.6.3)
[Einführung in Python3, Tag 23] Kapitel 12 Werden Sie Paisonista (12.1 bis 12.6)
[Einführung in Python3 Tag 20] Kapitel 9 Enträtseln des Webs (9.1-9.4)
[Einführung in Python3 Tag 8] Kapitel 4 Py Skin: Codestruktur (4.1-4.13)
[Einführung in Python3 Tag 3] Kapitel 2 Py-Komponenten: Numerische Werte, Zeichenfolgen, Variablen (2.2 bis 2.3.6)
[Einführung in Python3 Tag 2] Kapitel 2 Py-Komponenten: Numerische Werte, Zeichenfolgen, Variablen (2.1)
[Einführung in Python3 Tag 4] Kapitel 2 Py-Komponenten: Numerische Werte, Zeichenfolgen, Variablen (2.3.7 bis 2.4)
Einführung in die Überprüfung der Wirksamkeit Kapitel 1 in Python geschrieben
[Einführung in Python3 Tag 7] Kapitel 3 Py Tools: Listen, Taples, Wörterbücher, Mengen (3.3-3.8)
[Einführung in Python3 Tag 5] Kapitel 3 Py Tools: Listen, Taples, Wörterbücher, Sets (3.1-3.2.6)
[Einführung in Python3 Tag 10] Kapitel 5 Py's Cosmetic Box: Module, Pakete, Programme (5.4-5.7)
[Einführung in Python3 Tag 6] Kapitel 3 Py-Tool-Liste, Tapple, Wörterbuch, Set (3.2.7-3.2.19)
Einführung in die Überprüfung der Wirksamkeit Kapitel 3 in Python geschrieben
Einführung in die Python-Sprache
Einführung in OpenCV (Python) - (2)
[Einführung in die Udemy Python3 + -Anwendung] 64. Namespace und Gültigkeitsbereich
Einführung in die Überprüfung der Wirksamkeit Kapitel 2 in Python geschrieben
[Einführung in die Udemy Python3 + -Anwendung] 35. Vergleichsoperatoren und logische Operatoren
[Kapitel 5] Einführung in Python mit 100 Klopfen Sprachverarbeitung
[Kapitel 3] Einführung in Python mit 100 Klopfen Sprachverarbeitung
[Kapitel 2] Einführung in Python mit 100 Klopfen Sprachverarbeitung
[Einführung in die Udemy Python3 + -Anwendung] 68. Importanweisung und AS
[Technisches Buch] Einführung in die Datenanalyse mit Python -1 Kapitel Einführung-
[Kapitel 4] Einführung in Python mit 100 Klopfen Sprachverarbeitung
Einführung in Python Django (2) Win
Einführung in die serielle Kommunikation [Python]
[Einführung in Python] <Liste> [Bearbeiten: 22.02.2020]
Einführung in Python (Python-Version APG4b)
Eine Einführung in die Python-Programmierung
Einführung in Python For, While
[Einführung in cx_Oracle] (Teil 6) Zuordnung von DB- und Python-Datentypen
[Einführung in die Udemy Python3 + -Anwendung] 42. für Anweisung, break-Anweisung und continue-Anweisung
[Einführung in die Udemy Python3 + -Anwendung] 39. while-Anweisung, continue-Anweisung und break-Anweisung
[Einführung in die Udemy Python3 + -Anwendung] 36. Verwendung von In und Not
[Einführung in Data Scientists] Grundlagen von Python ♬ Funktionen und Klassen
[Einführung in Udemy Python3 + Application] 50. Positionsargumente, Schlüsselwortargumente und Standardargumente
Einführung in die Effektüberprüfung Schreiben der Kapitel 4 und 5 in Python
[Einführung in Python] Kombinieren Sie Nikkei-Durchschnitts- und NY Dow-CSV-Daten
[Einführung in die Udemy Python3 + -Anwendung] 58. Lambda
Python 3.6 unter Windows ... und zu Xamarin.
Einführung in die Python Numerical Calculation Library NumPy
Trainieren! !! Einführung in Python Type (Type Hints)
[Einführung in Python] <numpy ndarray> [edit: 2020/02/22]
[Einführung in die Udemy Python3 + -Anwendung] 57. Decorator
Einführung in Python Hands On Teil 1
[Einführung in Python] So analysieren Sie JSON
[Einführung in die Udemy Python3 + -Anwendung] 56. Abschluss
Einführung in Protobuf-c (C-Sprache ⇔ Python)
[Einführung in die Udemy Python3 + -Anwendung] 59. Generator
Hadoop-Einführung und MapReduce mit Python