Je voulais communiquer en série avec Raspberry Pi, j'ai donc essayé la communication en utilisant la conversion série USB. J'ai fait référence à "J'ai essayé la communication série avec Raspberry Pi". Un timeout de réception a été ajouté au traitement de réception en supposant que le traitement sera effectué dans l'ordre émission de commande → réception de commande.
$ pipenv install pyserial
J'ai utilisé le câble de conversion série USB FT232. S'il est équipé de la puce FT232, il semble être reconnu sans ajout de pilote.
Vérifiez la destination de la connexion avec la commande suivante.
$ ls -la /dev/ttyUSB*
S'il s'agit de la première unité, la destination de la connexion sera «/ dev / ttyUSB0».
Remarque: Lorsque vous communiquez avec Rasberry Pi sur un PC (Windows / Mac / Linux), utilisez un câble croisé. Comme Rasberry Pi est également un PC, il existe un câble croisé entre les PC et la connexion avec les appareils de communication est directe (souvent).
sample.py
# -*- coding: utf-8 -*-
import serial
import time
import threading
"""
Classe de communication série
"""
class SampleComm:
#Initialisation
def __init__(self):
#Drapeau ouvert
self.isPortOpen = False
#données reçues
self.recvData = bytearray()
#Génération d'événements
self.event = threading.Event()
#En attente de réception des données(Avec timeout[sec])
def recv(self, timeout=3):
#Obtenez du temps pour le timeout
time_start = time.time()
time_end = time_start
#Effacer l'événement de thread en attente
self.event.clear()
#Effacer les données reçues
self.recvData.clear()
#Résultat reçu Vrai:Succès Faux:Échec(temps libre)
result = False
#En attente de réception des données
while not self.event.is_set():
#Vérification du délai d'attente
time_end = time.time()
if (time_end - time_start > timeout):
#La transmission / réception de données s'est arrêtée et a échoué(temps libre)À
result = False
self.stop()
print("timeout:{0}sec".format(timeout))
break
#Lire les données reçues
buff = self.comm.read()
#Jugement des données reçues
if len(buff) > 0:
#Ajouter les données reçues
self.recvData.extend(buff)
# (Temporaire)Succès si \ n a été reçu
if (self.recvData.find(b'\n')) >= 0:
#Arrêtez d'envoyer et de recevoir des données et réussissez
result = True
self.stop()
break
#Renvoie le résultat
return (result, self.recvData)
#Transmission de données
def send(self, data):
self.comm.write(data)
#Arrêter d'envoyer et de recevoir des données
def stop(self):
self.event.set()
#Cyril port ouvert
def open(self, tty, baud='115200'):
try:
self.comm = serial.Serial(tty, baud, timeout=0.1)
self.isPortOpen = True
except Exception as e:
self.isPortOpen = False
return self.isPortOpen
#Port série fermé(Fermer explicitement)
def close(self):
self.stop()
if (self.isPortOpen):
self.comm.close()
self.isPortOpen = False
if __name__ == "__main__":
#Série ouverte
comm = SampleComm()
comm.open('/dev/ttyUSB0', '115200')
#Transmission de données
comm.send('test'.encode())
#Réception de données(temps libre=10sec)
result, data = comm.recv(10)
print(result)
print(data)
#Fermer la série
comm.close();
Dans pySerial version 2.5 et supérieure, l'argument de write () est bytearray (). Si vous souhaitez envoyer une chaîne, effectuez l'encode () approprié. str.encode () vaut par défaut "utf-8".
#Série ouverte
comm = SampleComm()
comm.open('/dev/ttyUSB0', '115200')
#Transmission de données
comm.send('sample'.encode())
#Réception de données(temps libre=10sec)
result, data = comm.recv(10)
print(result)
print(data)
#Fermer la série
comm.close();
Vous pouvez également vérifier l'opération en exécutant sample.py.
$ python sample.py
La communication avec l'appareil est souvent traitée dans l'ordre de transmission de commande → réception de commande, et je l'ai faite parce que je voulais exécuter le processus de réception avec un timeout. Il est nécessaire de modifier le jugement d'achèvement de la réception, le cas échéant. J'espère que cela sera utile pour des cas similaires dans la communication série.
Recommended Posts