Source code for py_trees_ros_tutorials.mock.dock

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
#   https://github.com/splintered-reality/py_trees_ros_tutorials/raw/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
Mocks a docking controller
"""


##############################################################################
# Imports
##############################################################################

import argparse
import py_trees_ros.mock.actions
import py_trees_ros_interfaces.action as py_trees_actions
import rclpy
import sys

##############################################################################
# Class
##############################################################################


[docs]class Dock(py_trees_ros.mock.actions.GenericServer): """ Simple action server that docks/undocks depending on the instructions in the goal requests. Node Name: * **docking_controller** Action Servers: * **/dock** (:class:`py_trees_ros_interfaces.action.Dock`) * docking/undocking control Args: duration: mocked duration of a successful docking/undocking action """ def __init__(self, duration: float=2.0): super().__init__( node_name="docking_controller", action_name="dock", action_type=py_trees_actions.Dock, generate_feedback_message=self.generate_feedback_message, goal_received_callback=self.goal_received_callback, duration=duration )
[docs] def goal_received_callback(self, goal): """ Set the title of the action depending on whether a docking or undocking action was requestions ('Dock'/'UnDock') """ if goal.dock: self.title = "Dock" else: self.title = "UnDock"
[docs] def generate_feedback_message(self) -> py_trees_actions.Dock.Feedback: """ Create a feedback message that populates the percent completed. Returns: :class:`py_trees_actions.Dock_Feedback`: the populated feedback message """ msg = py_trees_actions.Dock.Feedback( percentage_completed=self.percent_completed ) return msg
[docs]def main(): """ Entry point for the mocked docking controller. """ parser = argparse.ArgumentParser(description='Mock a docking controller') command_line_args = rclpy.utilities.remove_ros_args(args=sys.argv)[1:] parser.parse_args(command_line_args) rclpy.init() # picks up sys.argv automagically internally docking = Dock() executor = rclpy.executors.MultiThreadedExecutor(num_threads=4) executor.add_node(docking.node) try: executor.spin() except KeyboardInterrupt: docking.abort() # caveat: often broken, whether with spin_once multiple times or this, the # usual mysterious: # The following exception was never retrieved: PyCapsule_GetPointer # called with invalid PyCapsule object executor.shutdown() # finishes all remaining work and exits docking.shutdown() rclpy.shutdown()