If you follow the official Tello-Python repository, the flight-control program seems to work from Python without trouble, but I got stuck on receiving the camera video, so I am leaving a note here.
The repository says you can build and use its h264 decoder, but I tried various things in my Mac environment and could not get it to build. In a Python 3 environment in particular, the build apparently has to be modified.
(Addendum) Run "Tello_Video" in Python 3
Operate the toy drone Tello on a Mac and receive the camera video with OpenCV
In the article above, the stream is reformatted and then decoded with OpenCV. The article says that decoding the stream directly produces a lot of noise, but when I tried it as-is, it decoded normally in my environment. Later, after I had tried various h264 decoder builds and updated the OS and Xcode, OpenCV could no longer decode the stream.
I never understood the cause, but in the end I found that opencv-python alone was not enough and that the stream could be decoded after installing opencv-contrib-python. (Does it include something needed to decode h264?)
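One way to look into this is to check whether the installed OpenCV build reports FFmpeg support, since `cv2.VideoCapture` relies on a video backend to open a `udp://` stream. The following is only a minimal sketch: the helper name `has_ffmpeg` is hypothetical, and it assumes the report returned by `cv2.getBuildInformation()` contains a line of the form `FFMPEG: YES`, which may differ between OpenCV versions.

```python
def has_ffmpeg(build_info):
    """Return True if the build report lists FFMPEG as enabled.

    Assumes a line of the form 'FFMPEG:    YES (...)'; the exact layout
    of the report may vary between OpenCV versions.
    """
    for line in build_info.splitlines():
        key, _, value = line.strip().partition(':')
        if key == 'FFMPEG':
            return value.strip().upper().startswith('YES')
    return False


if __name__ == '__main__':
    try:
        import cv2
    except ImportError:
        print('OpenCV is not installed')
    else:
        print('OpenCV version:', cv2.__version__)
        print('FFMPEG enabled:', has_ffmpeg(cv2.getBuildInformation()))
```

Running this before and after switching packages may show whether the two installations actually differ in their video backends. It does not explain the failure by itself.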
I am not sure whether this works in other environments, but it is worth a try if the h264 decoder build fails. This seems to be the simplest approach if you just want to run everything from Python.
Below is Tello_Video/tello.py from Tello-Python, modified to use OpenCV.
import socket
import threading
import time

import cv2
import numpy as np


class Tello:
    """Wrapper class to interact with the Tello drone with opencv."""

    def __init__(self, local_ip='', local_port=8889, imperial=False, command_timeout=.3, tello_ip='192.168.10.1',
                 tello_port=8889):
        """
        Binds to the local IP/port and puts the Tello into command mode.
        :param local_ip (str): Local IP address to bind.
        :param local_port (int): Local port to bind.
        :param imperial (bool): If True, speed is MPH and distance is feet.
                                If False, speed is KPH and distance is meters.
        :param command_timeout (int|float): Number of seconds to wait for a response to a command.
        :param tello_ip (str): Tello IP.
        :param tello_port (int): Tello port.
        """
        self.abort_flag = False
        self.command_timeout = command_timeout
        self.imperial = imperial
        self.response = None
        self.frame = None  # numpy array BGR -- current camera output frame
        self.is_freeze = False  # freeze current camera output
        self.last_frame = None
        self.socket = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM)  # socket for sending cmd
        self.tello_address = (tello_ip, tello_port)
        self.last_height = 0
        self.socket.bind((local_ip, local_port))

        # thread for receiving cmd ack
        self.receive_thread = threading.Thread(target=self._receive_thread)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        # to receive video -- send cmd: command, streamon
        self.socket.sendto(b'command', self.tello_address)
        print('sent: command')
        self.socket.sendto(b'streamon', self.tello_address)
        print('sent: streamon')

        # for opencv
        self.VS_UDP_IP = '0.0.0.0'
        self.VS_UDP_PORT = 11111
        self.udp_video_address = 'udp://@' + self.VS_UDP_IP + ':' + str(self.VS_UDP_PORT)
        self.cap = cv2.VideoCapture(self.udp_video_address)

        # thread for receiving video
        self.receive_video_thread = threading.Thread(
            target=self._receive_video_thread)
        self.receive_video_thread.daemon = True
        self.receive_video_thread.start()

    def __del__(self):
        """Closes the local socket and releases the video capture."""
        self.socket.close()
        self.cap.release()

    def read(self):
        """Return the last frame from camera."""
        if self.is_freeze:
            return self.last_frame
        else:
            return self.frame

    def video_freeze(self, is_freeze=True):
        """Pause video output -- set is_freeze to True"""
        self.is_freeze = is_freeze
        if is_freeze:
            self.last_frame = self.frame

    def _receive_thread(self):
        """Listen to responses from the Tello.
        Runs as a thread, sets self.response to whatever the Tello last returned.
        """
        while True:
            try:
                self.response, ip = self.socket.recvfrom(3000)
                # print(self.response)
            except socket.error as exc:
                print("Caught exception socket.error : %s" % exc)

    def _receive_video_thread(self):
        """
        Listens for video streaming (raw h264) from the Tello with opencv.
        Runs as a thread, sets self.frame to the most recent frame Tello captured.
        """
        while True:
            ret, frame = self.cap.read()
            if ret:
                # Keep the previous frame when a read fails.
                self.frame = frame

    def send_command(self, command):
        """
        Send a command to the Tello and wait for a response.
        :param command: Command to send.
        :return (str): Response from Tello.
        """
        print(">> send cmd: {}".format(command))
        self.abort_flag = False
        timer = threading.Timer(self.command_timeout, self.set_abort_flag)
        self.socket.sendto(command.encode('utf-8'), self.tello_address)
        timer.start()
        while self.response is None:
            if self.abort_flag is True:
                break
        timer.cancel()
        if self.response is None:
            response = 'none_response'
        else:
            response = self.response.decode('utf-8')
        self.response = None
        return response

    def set_abort_flag(self):
        """
        Sets self.abort_flag to True.
        Used by the timer in Tello.send_command() to indicate that a response
        timeout has occurred.
        """
        self.abort_flag = True

    def takeoff(self):
        """
        Initiates take-off.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('takeoff')

    def set_speed(self, speed):
        """
        Sets speed.
        This method expects KPH or MPH. The Tello API expects speeds from
        1 to 100 centimeters/second.
        Metric: .1 to 3.6 KPH
        Imperial: .1 to 2.2 MPH
        Args:
            speed (int|float): Speed.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        speed = float(speed)
        if self.imperial is True:
            speed = int(round(speed * 44.704))  # MPH -> cm/s
        else:
            speed = int(round(speed * 27.7778))  # KPH -> cm/s
        return self.send_command('speed %s' % speed)

    def rotate_cw(self, degrees):
        """
        Rotates clockwise.
        Args:
            degrees (int): Degrees to rotate, 1 to 360.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('cw %s' % degrees)

    def rotate_ccw(self, degrees):
        """
        Rotates counter-clockwise.
        Args:
            degrees (int): Degrees to rotate, 1 to 360.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('ccw %s' % degrees)

    def flip(self, direction):
        """
        Flips.
        Args:
            direction (str): Direction to flip, 'l', 'r', 'f', 'b'.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('flip %s' % direction)

    def get_response(self):
        """
        Returns the last raw response of tello.
        Returns:
            bytes|None: Last response of tello, or None if there is none.
        """
        response = self.response
        return response

    def get_height(self):
        """Returns height(dm) of tello.
        Returns:
            int: Height(dm) of tello.
        """
        height = self.send_command('height?')
        height = str(height)
        # filter() returns an iterator in Python 3, so join the digits back into a string.
        height = ''.join(filter(str.isdigit, height))
        try:
            height = int(height)
            self.last_height = height
        except ValueError:
            # No digits in the response: fall back to the last known height.
            height = self.last_height
        return height

    def get_battery(self):
        """Returns percent battery life remaining.
        Returns:
            int: Percent battery life remaining.
        """
        battery = self.send_command('battery?')
        try:
            battery = int(battery)
        except ValueError:
            # Leave the raw response string as-is if it is not a number.
            pass
        return battery

    def get_flight_time(self):
        """Returns the number of seconds elapsed during flight.
        Returns:
            int: Seconds elapsed during flight.
        """
        flight_time = self.send_command('time?')
        try:
            flight_time = int(flight_time)
        except ValueError:
            pass
        return flight_time

    def get_speed(self):
        """Returns the current speed.
        Returns:
            float: Current speed in KPH or MPH.
        """
        speed = self.send_command('speed?')
        try:
            speed = float(speed)
            if self.imperial is True:
                speed = round((speed / 44.704), 1)  # cm/s -> MPH
            else:
                speed = round((speed / 27.7778), 1)  # cm/s -> KPH
        except ValueError:
            pass
        return speed

    def land(self):
        """Initiates landing.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.send_command('land')

    def move(self, direction, distance):
        """Moves in a direction for a distance.
        This method expects meters or feet. The Tello API expects distances
        from 20 to 500 centimeters.
        Metric: .2 to 5 meters
        Imperial: .7 to 16.4 feet
        Args:
            direction (str): Direction to move, 'forward', 'back', 'right', 'left', 'up' or 'down'.
            distance (int|float): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        distance = float(distance)
        if self.imperial is True:
            distance = int(round(distance * 30.48))  # feet -> cm
        else:
            distance = int(round(distance * 100))  # meters -> cm
        return self.send_command('%s %s' % (direction, distance))

    def move_backward(self, distance):
        """Moves backward for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('back', distance)

    def move_down(self, distance):
        """Moves down for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('down', distance)

    def move_forward(self, distance):
        """Moves forward for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('forward', distance)

    def move_left(self, distance):
        """Moves left for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('left', distance)

    def move_right(self, distance):
        """Moves right for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('right', distance)

    def move_up(self, distance):
        """Moves up for a distance.
        See comments for Tello.move().
        Args:
            distance (int): Distance to move.
        Returns:
            str: Response from Tello, 'OK' or 'FALSE'.
        """
        return self.move('up', distance)