[PYTHON] Conférence ROS 113 Effectuer des tâches avec smach

environnement

Cet article fonctionne dans l'environnement suivant.

article valeur
CPU Core i5-8250U
Ubuntu 16.04
ROS Kinetic

Pour l'installation, reportez-vous à ROS Course 02 Installation. Le programme de cet article a également été téléchargé sur github. Veuillez vous référer à ROS Lecture 11 git Repository.

Aperçu

smach est une bibliothèque python pour exécuter des tâches de haut niveau basées sur des transitions d'état. Il existe également des fonctions pratiques telles que l'affichage de la transition d'état sur l'interface graphique. Je vais vous expliquer comment utiliser cette bibliothèque.

Installation

installation de smach


sudo apt install ros-kinetic-smach ros-kinetic-smach-viewer

Exemple 1 Utilisation de base

J'expliquerai l'utilisation de base.

Code source

La base de smach est l'État. Chaque état effectue un traitement (ʻexcute () ) et produit un résultat (ʻoutcomes). State Machine elle-même produit un résultat (ʻoutcomes). Cependant, StateMachine a un état à l'intérieur, et le résultat de cet état se connecte à un autre état ou produit le résultat. Lorsque ʻexecute () de StateMachine est exécuté, ʻexecute ()` de l'État enregistré en interne est exécuté dans l'ordre.

py_lecture/scripts/smach_simple1.py


#!/usr/bin/env python

import rospy
import smach

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

def main():
    rospy.init_node('smach_somple1')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'succeeded'})
        smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

    outcome = sm_top.execute()

if __name__ == '__main__':
    main()

Courir

1er terminal


roscore

Deuxième terminal


rosrun py_lecture smach_simple1.py

Lorsque vous l'exécutez, vous pouvez voir que les états changent dans l'ordre comme indiqué ci-dessous.

production


[INFO] [1585988828.285383]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988830.287671]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988832.292678]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988834.297569]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988836.301891]: Executing state STATE1
[  INFO ] : State machine transitioning 'STATE1':'done'-->'STATE2'
[INFO] [1585988838.308503]: Executing state STATE2
[  INFO ] : State machine transitioning 'STATE2':'done'-->'STATE1'
[INFO] [1585988840.314152]: Executing state STATE1
[  INFO ] : State machine terminating 'STATE1':'exit':'succeeded'

Exemple 2 Hiérarchie et introspection de machine d'état

StateMachine peut avoir une structure hiérarchique. Vous pouvez également vérifier l'état de State Machine avec l'interface graphique en utilisant une fonction appelée Introspection.

Code source

Dans l'exemple précédent, StateMachine n'a ajouté que State, mais vous pouvez également ajouter StateMachine.

py_lecture/scripts/smach_simple2.py


#!/usr/bin/env python

import rospy
import smach
import smach_ros

class State1(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done','exit'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE1')
        rospy.sleep(2.0)
        if self.counter < 3:
            self.counter += 1
            return 'done'
        else:
            return 'exit'

class State2(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

class State3(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['done'])

    def execute(self, userdata):
        rospy.loginfo('Executing state STATE2')
        rospy.sleep(2.0)
        return 'done'

# main
def main():
    rospy.init_node('smach_somple2')

    sm_top = smach.StateMachine(outcomes=['succeeded'])
    with sm_top:
        smach.StateMachine.add('STATE3', State3(), transitions={'done':'SUB'})

        sm_sub = smach.StateMachine(outcomes=['done'])
        with sm_sub:
            smach.StateMachine.add('STATE1', State1(), transitions={'done':'STATE2', 'exit':'done'})
            smach.StateMachine.add('STATE2', State2(), transitions={'done':'STATE1'})

        smach.StateMachine.add('SUB', sm_sub, transitions={'done':'succeeded'})

    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()

if __name__ == '__main__':
    main()

Courir

1er terminal


roscore

Deuxième terminal


rosrun py_lecture smach_simple1.py

Troisième terminal


 rosrun smach_viewer smach_viewer.py 

L'écran suivant s'affiche.

smach2.gif

Exemple 3 Exécution de tâches complexes

Il est nécessaire d'instruire d'autres nœuds ROS à partir du nœud ROS du smach pour effectuer des tâches réalistes. Ici, actionlib est utilisé pour envoyer une commande à move_base. De plus, lorsque la tâche proprement dite est exécutée, il est nécessaire de pouvoir envoyer une annulation par une instruction externe en cours de route.

Code source

py_lecture/scripts/smach_task.py


#!/usr/bin/env python

import rospy
import smach
import smach_ros
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from std_msgs.msg import Empty

# define state Foo
class MoveBaseState(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'


# define state Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome1'

def child_term_cb(outcome_map):
    print("child_term_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return True
    if outcome_map['MONITOR2']:
        return True
    return False

def out_cb(outcome_map):
    print("out_cb")
    if outcome_map['MOVE2'] == 'succeeded':
        return 'done'
    else:
        return 'exit'

def monitor_cb(ud, msg):
    print("monitor_cb")
    return False

def main():
    rospy.init_node('smach_example_state_machine')

    # Create the top level SMACH state machine
    sm_top = smach.StateMachine(outcomes=['done', 'exit'])
    
    # Open the container
    with sm_top:
        goal1=MoveBaseGoal()
        goal1.target_pose.header.frame_id = "dtw_robot1/map"
        goal1.target_pose.pose.position.x = 0.5
        goal1.target_pose.pose.orientation.w = 1.0
        smach.StateMachine.add('MOVE1', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal1), transitions={'succeeded':'TASK2', 'preempted':'done', 'aborted':'done'})

        task2_concurrence = smach.Concurrence(outcomes=['done', 'exit'], default_outcome='done', child_termination_cb = child_term_cb, outcome_cb = out_cb)
        with task2_concurrence:
            goal2=MoveBaseGoal()
            goal2.target_pose.header.frame_id = "dtw_robot1/map"
            goal2.target_pose.pose.position.x = -0.5
            goal2.target_pose.pose.orientation.w = 1.0
            smach.Concurrence.add('MOVE2', smach_ros.SimpleActionState('/dtw_robot1/move_base', MoveBaseAction, goal=goal2))
            smach.Concurrence.add('MONITOR2', smach_ros.MonitorState("/sm_stop", Empty, monitor_cb))
        smach.StateMachine.add('TASK2', task2_concurrence, transitions={'done':'done', 'exit':'exit'}) 



    # Execute SMACH plan
    sis = smach_ros.IntrospectionServer('smach_server', sm_top, '/SM_ROOT')
    sis.start()
    outcome = sm_top.execute()
    rospy.spin()
    sis.stop()


if __name__ == '__main__':
    main()

Courir

1er terminal


roslaunch nav_lecture move_base.launch 

Deuxième terminal


rosrun smach_viewer smach_viewer.py 

Troisième terminal


rosrun py_lecture smach_task.py

Si vous exécutez normalement ce qui précède, les deux tâches TASK1 et TASK2 seront exécutées dans l'ordre indiqué ci-dessous.

smach3_1.gif

Si vous tapez la commande suivante pendant l'exécution de TASK2, TASK2 sera arrêté.

4ème terminal


rostopic pub -1 /sm_stop std_msgs/Empty "{}" 

smach3_2.gif

référence

smach tutorial

Lien vers la page du tableau

Lien vers la table des matières du cours ROS

Recommended Posts

Conférence ROS 113 Effectuer des tâches avec smach
Cours ROS 108 Utilisation de la base de données (mongo) avec ROS
ROS sécurisé avec AppArmor
Effectuer des opérations logiques à l'aide de Perceptron
Effectuer un fractionnement stratifié avec PyTorch
Cours ROS 105 Fonctionnement toio avec ROS