#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# License: BSD
# https://github.com/splintered-reality/py_trees_ros_tutorials/raw/devel/LICENSE
#
##############################################################################
# Documentation
##############################################################################
"""
Mock a hardware LED strip.
"""
##############################################################################
# Imports
##############################################################################
import argparse
import functools
import math
import py_trees.console as console
import py_trees_ros
import rclpy
import std_msgs.msg as std_msgs
import sys
import threading
import uuid
##############################################################################
# Class
##############################################################################
class LEDStrip:
    """
    Emulates command/display of an led strip so that it flashes various colours.

    Node Name:
        * **led_strip**

    Publishers:
        * **~display** (:class:`std_msgs.msg.String`)

          * name of the colour currently being flashed, or an empty string
            once the flashing period has elapsed

    Subscribers:
        * **~command** (:class:`std_msgs.msg.String`)

          * send it a colour to express, it will flash this for the next 3 seconds
    """
    _pattern = '*'
    _pattern_width = 60  # total width of the pattern to be output
    _pattern_name_spacing = 4  # space between pattern and the name of the pattern

    def __init__(self):
        """
        Create the ``led_strip`` node along with its command subscriber
        and display publisher, and initialise the flashing state.
        """
        self.node = rclpy.create_node("led_strip")
        self.command_subscriber = self.node.create_subscription(
            msg_type=std_msgs.String,
            topic='~/command',
            callback=self.command_callback,
            qos_profile=py_trees_ros.utilities.qos_profile_unlatched()
        )
        self.display_publisher = self.node.create_publisher(
            msg_type=std_msgs.String,
            topic="~/display",
            qos_profile=py_trees_ros.utilities.qos_profile_latched()
        )
        self.duration_sec = 3.0  # how long a requested colour is flashed for
        self.last_text = ''  # rendered text of the most recent state
        self.last_uuid = None  # identifies the most recent state change
        self.lock = threading.Lock()  # guards the state shared by the callbacks
        self.flashing_timer = None  # timer that ends the current flashing period

    def _get_display_string(self, width: int, label: str = "Foo") -> str:
        """
        Display the current state of the led strip as a formatted
        string.

        Args:
            width: the width of the pattern
            label: display this in the centre of the pattern rather
                than the pattern name

        Returns:
            a leading newline followed by three lines: a full line of
            pattern, the label (underscores shown as spaces) flanked by
            pattern, and another full line of pattern
        """
        pattern = self._pattern
        spacing = self._pattern_name_spacing
        # top and bottom of print repeats the pattern as many times as possible
        # in the space specified
        top_bottom = pattern * int(width / len(pattern))
        # space for two halves of the pattern on either side of the pattern name
        mid_pattern_space = (width - len(label) - spacing * 2) / 2
        # pattern for the mid line (empty when the label leaves no room for it)
        mid = pattern * int(mid_pattern_space / len(pattern))
        # total length of the middle line with pattern, spacing and name
        mid_len = len(mid) * 2 + spacing * 2 + len(label)
        # patterns won't necessarily match up with the width, so need to deal
        # with extra space. Odd numbers of extra space handled by putting more
        # spaces on the right side
        extra_space = width - mid_len
        extra_left_space = int(math.floor(extra_space / 2.0))
        extra_right_space = int(math.ceil(extra_space / 2.0))
        # left and right parts of the mid line to go around the name
        left = mid + ' ' * (spacing + extra_left_space)
        right = ' ' * (spacing + extra_right_space) + mid
        middle = left + label.replace('_', ' ') + right
        return '\n'.join(['', top_bottom, middle, top_bottom])

    def generate_led_text(self, colour: str) -> str:
        """
        Generate a formatted string representation of the current state of the led strip.

        Args:
            colour: name of the colour to display; it is used both as the
                label and to select the console colour escape sequence

        Returns:
            an empty string if no colour was given, otherwise the blinking
            display string wrapped in console escape sequences
        """
        if not colour:
            return ""
        text = self._get_display_string(self._pattern_width, label=colour)
        # map colour names in message to console colour escape sequences
        console_colour_map = {
            'grey': console.dim + console.white,
            'red': console.red,
            'green': console.green,
            'yellow': console.yellow,
            'blue': console.blue,
            'purple': console.magenta,
            'white': console.white
        }
        # An unrecognised colour name is rendered without a colour escape
        # sequence instead of raising KeyError from inside the subscriber
        # callback.
        colour_code = console_colour_map.get(colour, "")
        return colour_code + console.blink + text + console.reset

    def command_callback(self, msg: "std_msgs.String"):
        """
        If the requested state is different from the existing state, log and
        publish it. In either case, restart the timer that ends the flashing
        effect.

        Args:
            msg (:class:`std_msgs.msg.String`): incoming command message
        """
        with self.lock:
            text = self.generate_led_text(msg.data)
            # don't bother publishing if nothing changed.
            if self.last_text != text:
                self.node.get_logger().info("{}".format(text))
                self.last_text = text
                self.last_uuid = uuid.uuid4()
                self.display_publisher.publish(std_msgs.String(data=msg.data))
            # NOTE(review): the timer is restarted for every command, not only
            # on a change of colour - confirm this nesting matches the
            # intended behaviour.
            if self.flashing_timer is not None:
                self.flashing_timer.cancel()
                self.node.destroy_timer(self.flashing_timer)
            # TODO: convert this to a one-shot once rclpy has the capability.
            # Until then the timer is periodic; cancel_flashing() cancels it
            # after it has acted, and the uuid check makes any stale trigger
            # a no-op.
            self.flashing_timer = self.node.create_timer(
                timer_period_sec=self.duration_sec,
                callback=functools.partial(
                    self.cancel_flashing,
                    this_uuid=self.last_uuid
                )
            )

    def cancel_flashing(self, this_uuid: uuid.UUID):
        """
        If the notification identified by the given uuid is still relevant (i.e.
        new command requests haven't come in) then publish an update with an
        empty display message.

        Args:
            this_uuid: the uuid of the notification to cancel
        """
        with self.lock:
            if self.last_uuid != this_uuid:
                # a newer state has superseded this notification
                return
            # We're still relevant, publish and make us irrelevant
            self.display_publisher.publish(std_msgs.String(data=""))
            self.last_text = ""
            self.last_uuid = uuid.uuid4()
            # Emulate a one-shot timer: nothing is left to do until the next
            # command creates a fresh timer, so stop the periodic triggering.
            if self.flashing_timer is not None:
                self.flashing_timer.cancel()

    def spin(self):
        """
        Spin, and finally shutdown ROS components.

        A keyboard interrupt ends the spin quietly. The node is destroyed
        however the spin ends; any other exception still propagates.
        """
        try:
            rclpy.spin(self.node)
        except KeyboardInterrupt:
            pass
        finally:
            self.node.destroy_node()
def main():
    """
    Entry point for the mock led strip.

    Parses the command line (with the ROS arguments removed), initialises
    rclpy, spins an :class:`LEDStrip` until it stops and then shuts rclpy
    down, even if creating or spinning the node raised.
    """
    parser = argparse.ArgumentParser(description='Mock an led strip')
    command_line_args = rclpy.utilities.remove_ros_args(args=sys.argv)[1:]
    # no options are defined; parsing only provides --help and rejects
    # unrecognised arguments
    parser.parse_args(command_line_args)
    rclpy.init(args=sys.argv)
    try:
        led_strip = LEDStrip()
        led_strip.spin()
    finally:
        # The context may already have been shut down by the time we get
        # here (e.g. in response to a signal), so only shut down a context
        # that is still valid.
        if rclpy.ok():
            rclpy.shutdown()