Source code for py_trees_ros_tutorials.behaviours

#
# License: BSD
#   https://github.com/splintered-reality/py_trees_ros_tutorials/raw/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################

"""
Behaviours for the tutorials.
"""

##############################################################################
# Imports
##############################################################################

import py_trees
import py_trees_ros
import rcl_interfaces.msg as rcl_msgs
import rcl_interfaces.srv as rcl_srvs
import rclpy
import std_msgs.msg as std_msgs

##############################################################################
# Behaviours
##############################################################################


[docs]class FlashLedStrip(py_trees.behaviour.Behaviour): """ This behaviour simply shoots a command off to the LEDStrip to flash a certain colour and returns :attr:`~py_trees.common.Status.RUNNING`. Note that this behaviour will never return with :attr:`~py_trees.common.Status.SUCCESS` but will send a clearing command to the LEDStrip if it is cancelled or interrupted by a higher priority behaviour. Publishers: * **/led_strip/command** (:class:`std_msgs.msg.String`) * colourised string command for the led strip ['red', 'green', 'blue'] Args: name: name of the behaviour topic_name : name of the battery state topic colour: colour to flash ['red', 'green', blue'] """ def __init__( self, name: str, topic_name: str="/led_strip/command", colour: str="red" ): super(FlashLedStrip, self).__init__(name=name) self.topic_name = topic_name self.colour = colour
[docs] def setup(self, **kwargs): """ Setup the publisher which will stream commands to the mock robot. Args: **kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree Raises: :class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs """ self.logger.debug("{}.setup()".format(self.qualified_name)) try: self.node = kwargs['node'] except KeyError as e: error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.qualified_name) raise KeyError(error_message) from e # 'direct cause' traceability self.publisher = self.node.create_publisher( msg_type=std_msgs.String, topic=self.topic_name, qos_profile=py_trees_ros.utilities.qos_profile_latched() ) self.feedback_message = "publisher created"
[docs] def update(self) -> py_trees.common.Status: """ Annoy the led strip to keep firing every time it ticks over (the led strip will clear itself if no command is forthcoming within a certain period of time). This behaviour will only finish if it is terminated or priority interrupted from above. Returns: Always returns :attr:`~py_trees.common.Status.RUNNING` """ self.logger.debug("%s.update()" % self.__class__.__name__) self.publisher.publish(std_msgs.String(data=self.colour)) self.feedback_message = "flashing {0}".format(self.colour) return py_trees.common.Status.RUNNING
[docs] def terminate(self, new_status: py_trees.common.Status): """ Shoot off a clearing command to the led strip. Args: new_status: the behaviour is transitioning to this new status """ self.logger.debug( "{}.terminate({})".format( self.qualified_name, "{}->{}".format(self.status, new_status) if self.status != new_status else "{}".format(new_status) ) ) self.publisher.publish(std_msgs.String(data="")) self.feedback_message = "cleared"
[docs]class ScanContext(py_trees.behaviour.Behaviour): """ Alludes to switching the context of the runtime system for a scanning action. Technically, it reaches out to the mock robots safety sensor dynamic parameter, switches it off in :meth:`initialise()` and maintains that for the the duration of the context before returning it to it's original value in :meth:`terminate()`. Args: name (:obj:`str`): name of the behaviour """ def __init__(self, name): super().__init__(name=name) self.cached_context = None
[docs] def setup(self, **kwargs): """ Setup the ros2 communications infrastructure. Args: **kwargs (:obj:`dict`): look for the 'node' object being passed down from the tree Raises: :class:`KeyError`: if a ros2 node isn't passed under the key 'node' in kwargs """ self.logger.debug("%s.setup()" % self.__class__.__name__) # ros2 node try: self.node = kwargs['node'] except KeyError as e: error_message = "didn't find 'node' in setup's kwargs [{}][{}]".format(self.qualified_name) raise KeyError(error_message) from e # 'direct cause' traceability # parameter service clients self.parameter_clients = { 'get_safety_sensors': self.node.create_client( rcl_srvs.GetParameters, '/safety_sensors/get_parameters' ), 'set_safety_sensors': self.node.create_client( rcl_srvs.SetParameters, '/safety_sensors/set_parameters' ) } for name, client in self.parameter_clients.items(): if not client.wait_for_service(timeout_sec=3.0): raise RuntimeError("client timed out waiting for server [{}]".format(name))
[docs] def initialise(self): """ Reset the cached context and trigger the chain of get/set parameter calls involved in changing the context. .. note:: Completing the chain of service calls here (with `rclpy.spin_until_future_complete(node, future)`) is not possible if this behaviour is encapsulated inside, e.g. a tree tick activated by a ros2 timer callback, since it is already part of a scheduled job in a spinning node. It will just deadlock. Prefer instead to chain a sequence of events that will be completed over a span of ticks instead of at best, blocking here and at worst, falling into deadlock. """ self.logger.debug("%s.initialise()" % self.__class__.__name__) self.cached_context = None # kickstart get/set parameter chain self._send_get_parameter_request()
[docs] def update(self) -> py_trees.common.Status: """ Complete the chain of calls begun in :meth:`initialise()` and then maintain the context (i.e. :class:`py_trees.behaviour.Behaviour` and return :data:`~py_trees.common.Status.RUNNING`). """ self.logger.debug("%s.update()" % self.__class__.__name__) all_done = False # wait for get_parameter to return if self.cached_context is None: if self._process_get_parameter_response(): self._send_set_parameter_request(value=True) return py_trees.common.Status.RUNNING # wait for set parameter to return if not all_done: if self._process_set_parameter_response(): all_done = True return py_trees.common.Status.RUNNING # just spin around, wait for an interrupt to trigger terminate return py_trees.common.Status.RUNNING
[docs] def terminate(self, new_status: py_trees.common.Status): """ Reset the parameters back to their original (cached) values. Args: new_status: the behaviour is transitioning to this new status """ self.logger.debug("%s.terminate(%s)" % (self.__class__.__name__, "%s->%s" % (self.status, new_status) if self.status != new_status else "%s" % new_status)) if ( new_status == py_trees.common.Status.INVALID and self.cached_context is not None ): self._send_set_parameter_request(value=self.cached_context)
# don't worry about the response, no chance to catch it anyway def _send_get_parameter_request(self): request = rcl_srvs.GetParameters.Request() # noqa request.names.append("enabled") self.get_parameter_future = self.parameter_clients['get_safety_sensors'].call_async(request) def _process_get_parameter_response(self) -> bool: if not self.get_parameter_future.done(): return False if self.get_parameter_future.result() is None: self.feedback_message = "failed to retrieve the safety sensors context" self.node.get_logger().error(self.feedback_message) # self.node.get_logger().info('Service call failed %r' % (future.exception(),)) raise RuntimeError(self.feedback_message) if len(self.get_parameter_future.result().values) > 1: self.feedback_message = "expected one parameter value, got multiple [{}]".format("/safety_sensors/enabled") raise RuntimeError(self.feedback_message) value = self.get_parameter_future.result().values[0] if value.type != rcl_msgs.ParameterType.PARAMETER_BOOL: # noqa self.feedback_message = "expected parameter type bool, got [{}]{}]".format(value.type, "/safety_sensors/enabled") self.node.get_logger().error(self.feedback_message) raise RuntimeError(self.feedback_message) self.cached_context = value.bool_value return True def _send_set_parameter_request(self, value: bool): request = rcl_srvs.SetParameters.Request() # noqa parameter = rcl_msgs.Parameter() parameter.name = "enabled" parameter.value.type = rcl_msgs.ParameterType.PARAMETER_BOOL # noqa parameter.value.bool_value = value request.parameters.append(parameter) self.set_parameter_future = self.parameter_clients['set_safety_sensors'].call_async(request) def _process_set_parameter_response(self) -> bool: if not self.get_parameter_future.done(): return False if self.set_parameter_future.result() is not None: self.feedback_message = "reconfigured the safety sensors context" else: self.feedback_message = "failed to reconfigure the safety sensors context" self.node.get_logger().error(self.feedback_message) # self.node.get_logger().info('service call failed %r' % (future.exception(),)) return True