[PYTHON] ROS Lecture 113 Führen Sie Aufgaben mit smach aus

Umgebung

Dieser Artikel funktioniert in der folgenden Umgebung.

Artikel Wert
CPU Core i5-8250U
Ubuntu 16.04
ROS Kinetic

Informationen zur Installation finden Sie unter ROS-Kurs 02-Installation. Das Programm in diesem Artikel wurde ebenfalls auf github hochgeladen. Weitere Informationen finden Sie im ROS Lecture 11 Git Repository.

Überblick

smach ist eine Python-Bibliothek zum Ausführen von Aufgaben auf hoher Ebene basierend auf Statusübergängen. Es gibt auch praktische Funktionen wie die Anzeige des Statusübergangs auf der GUI. Ich werde erklären, wie man diese Bibliothek benutzt.

Installation

Installation von Smach


sudo apt install ros-kinetic-smach ros-kinetic-smach-viewer

Beispiel 1 Grundlegende Verwendung

Ich werde die grundlegende Verwendung erklären.

Quellcode

Die Basis von Smach ist State. Jeder Staat führt die Verarbeitung durch (excute ()) und erzeugt ein Ergebnis (results). StateMachine selbst erzeugt ein Ergebnis ("Ergebnisse"). StateMachine enthält jedoch einen Status, und das Ergebnis dieses Status stellt eine Verbindung zum Übergang in einen anderen Status her oder gibt das Ergebnis aus. Wenn "execute ()" der Zustandsmaschine ausgeführt wird, wird "execute ()" des intern registrierten Zustands der Reihe nach ausgeführt.

py_lecture/scripts/smach_simple1.py


#!/usr/bin/env python

import rospy
import smach

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

def main():
    rospy.init_node('smach_somple1')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
        smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

    outcome = sm_top.execute()

if __name__ == '__main__':
    main()

Lauf

1. Terminal


roscore

Zweites Terminal


rosrun py_lecture smach_simple1.py

Wenn Sie es ausführen, können Sie sehen, dass sich die Zustände in der unten gezeigten Reihenfolge ändern.

Ausgabe


[INFO] [1585988828.285383]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[  INFO ] : State machine terminating 'STATE1':'exit':'succeeded'

Beispiel 2 Hierarchie und Selbstbeobachtung der Zustandsmaschine

StateMachine kann eine hierarchische Struktur haben. Sie können den Status von State Machine auch über die GUI überprüfen, indem Sie eine Funktion namens Introspection verwenden.

Quellcode

Im vorherigen Beispiel hat StateMachine nur State hinzugefügt, Sie können jedoch auch StateMachine hinzufügen.

py_lecture/scripts/smach_simple2.py


#!/usr/bin/env python

import rospy
import smach
import smach_ros

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

class State3(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

# main
def main():
    rospy.init_node('smach_somple2')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})

        sm_sub = smach.StateMachine(outcomes=['done'])
        with sm_sub:
            smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
            smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

        smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})

    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

Lauf

1. Terminal


roscore

Zweites Terminal


rosrun py_lecture smach_simple1.py

Drittes Terminal


 rosrun smach_viewer smach_viewer.py 

Der folgende Bildschirm wird angezeigt.

smach2.gif

Beispiel 3 Ausführen komplexer Aufgaben

Es ist notwendig, andere ROS-Knoten vom ROS-Knoten des Smach anzuweisen, realistische Aufgaben auszuführen. Hier wird actionlib verwendet, um einen Befehl an move_base zu senden. Wenn die eigentliche Aufgabe ausgeführt wird, ist es außerdem erforderlich, unterwegs eine Stornierung durch eine externe Anweisung senden zu können.

Quellcode

py_lecture/scripts/smach_task.py


#!/usr/bin/env python

import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty

# define state Foo
class MoveBaseState(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'

def child_term_cb(outcome_map):
    print("child_term_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return True
    if outcome_map['MONITOR2']:
        return True
    return False

def out_cb(outcome_map):
    print("out_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return 'done'
    else:
        return 'exit'

def monitor_cb(ud, msg):
    print("monitor_cb")
    return False

def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['done', 'exit'])
    
    # Open the container
    with sm_top:
        goal1=MoveBaseGoal()
        goal1.target_pose.header.frame_id = "dtw_robot1/map"
        goal1.target_pose.pose.position.x = 0.5
        goal1.target_pose.pose.orientation.w = 1.0
        smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})

        task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
        with task2_concurrence:
            goal2=MoveBaseGoal()
            goal2.target_pose.header.frame_id = "dtw_robot1/map"
            goal2.target_pose.pose.position.x = -0.5
            goal2.target_pose.pose.orientation.w = 1.0
            smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
            smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
        smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'}) 



    # Execute SMACH plan
    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()


if __name__ == '__main__':
    main()

Lauf

1. Terminal


roslaunch nav_lecture move_base.launch 

Zweites Terminal


rosrun smach_viewer smach_viewer.py 

Drittes Terminal


rosrun py_lecture smach_task.py

Wenn Sie die obigen Aufgaben normal ausführen, werden die beiden Aufgaben TASK1 und TASK2 wie unten gezeigt nacheinander ausgeführt.

smach3_1.gif

Wenn Sie den folgenden Befehl eingeben, während TASK2 ausgeführt wird, wird TASK2 gestoppt.

4. Terminal


rostopic pub -1 /sm_stop std_msgs/Empty "{}" 

smach3_2.gif

Referenz

smach tutorial

Link zur Tabellenseite

Link zum Inhaltsverzeichnis des ROS-Kurses

Recommended Posts

ROS Lecture 113 Führen Sie Aufgaben mit smach aus
Sichern Sie ROS mit AppArmor
Führen Sie logische Operationen mit Perceptron aus
Führen Sie mit PyTorch eine geschichtete Aufteilung durch
ROS Kurs 105 Betriebstio mit ROS