diff --git a/behaviortree_ros2/CMakeLists.txt b/behaviortree_ros2/CMakeLists.txt index edf1440..ad2d108 100644 --- a/behaviortree_ros2/CMakeLists.txt +++ b/behaviortree_ros2/CMakeLists.txt @@ -37,6 +37,9 @@ target_include_directories(${PROJECT_NAME} PUBLIC target_link_libraries(${PROJECT_NAME} bt_executor_parameters) +if (BUILD_TESTING) + add_subdirectory(test) +endif() ###################################################### # INSTALL diff --git a/behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp b/behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp index 211f41c..a76dedd 100644 --- a/behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp +++ b/behaviortree_ros2/include/behaviortree_ros2/bt_topic_sub_node.hpp @@ -14,12 +14,14 @@ #pragma once +#include +#include #include +#include +#include #include #include #include -#include "behaviortree_cpp/condition_node.h" -#include "behaviortree_cpp/bt_factory.h" #include "behaviortree_ros2/ros_node_params.hpp" #include @@ -42,7 +44,7 @@ namespace BT * 2. Otherwise, use the value in RosNodeParams::default_port_value */ template -class RosTopicSubNode : public BT::ConditionNode +class RosTopicSubNode : public BT::StatefulActionNode { public: // Type definitions @@ -51,7 +53,9 @@ class RosTopicSubNode : public BT::ConditionNode protected: struct SubscriberInstance { - SubscriberInstance(std::shared_ptr node, const std::string& topic_name); + SubscriberInstance(std::shared_ptr node, const std::string& topic_name, + const std::size_t history_depth, + const rclcpp::ReliabilityPolicy& reliability); std::shared_ptr subscriber; rclcpp::CallbackGroup::SharedPtr callback_group; @@ -131,9 +135,13 @@ class RosTopicSubNode : public BT::ConditionNode return providedBasicPorts({}); } - NodeStatus tick() override final; + NodeStatus onStart() override final; - /** Callback invoked in the tick. You must return either SUCCESS of FAILURE + NodeStatus onRunning() override final; + + void onHalted() override final; + + /** Callback invoked in the tick. You must return SUCCESS, FAILURE, or RUNNING. * * @param last_msg the latest message received, since the last tick. * Will be empty if no new message received. @@ -157,6 +165,8 @@ class RosTopicSubNode : public BT::ConditionNode private: bool createSubscriber(const std::string& topic_name); + + NodeStatus checkStatus(const NodeStatus& status) const; }; //---------------------------------------------------------------- @@ -164,7 +174,8 @@ class RosTopicSubNode : public BT::ConditionNode //---------------------------------------------------------------- template inline RosTopicSubNode::SubscriberInstance::SubscriberInstance( - std::shared_ptr node, const std::string& topic_name) + std::shared_ptr node, const std::string& topic_name, + const std::size_t history_depth, const rclcpp::ReliabilityPolicy& reliability) { // create a callback group for this particular instance callback_group = @@ -180,58 +191,24 @@ inline RosTopicSubNode::SubscriberInstance::SubscriberInstance( last_msg = msg; broadcaster(msg); }; - subscriber = node->create_subscription(topic_name, 1, callback, option); + subscriber = node->create_subscription( + topic_name, rclcpp::QoS(history_depth).reliability(reliability), callback, option); } template inline RosTopicSubNode::RosTopicSubNode(const std::string& instance_name, const NodeConfig& conf, const RosNodeParams& params) - : BT::ConditionNode(instance_name, conf), node_(params.nh) + : BT::StatefulActionNode(instance_name, conf), node_(params.nh) { - // check port remapping - auto portIt = config().input_ports.find("topic_name"); - if(portIt != config().input_ports.end()) + // Check if default_port_value was used to provide a topic name. + if(!params.default_port_value.empty()) { - const std::string& bb_topic_name = portIt->second; - - if(bb_topic_name.empty() || bb_topic_name == "__default__placeholder__") - { - if(params.default_port_value.empty()) - { - throw std::logic_error("Both [topic_name] in the InputPort and the RosNodeParams " - "are empty."); - } - else - { - createSubscriber(params.default_port_value); - } - } - else if(!isBlackboardPointer(bb_topic_name)) - { - // If the content of the port "topic_name" is not - // a pointer to the blackboard, but a static string, we can - // create the client in the constructor. - createSubscriber(bb_topic_name); - } - else - { - // do nothing - // createSubscriber will be invoked in the first tick(). - } - } - else - { - if(params.default_port_value.empty()) - { - throw std::logic_error("Both [topic_name] in the InputPort and the RosNodeParams " - "are empty."); - } - else - { - createSubscriber(params.default_port_value); - } + topic_name_ = params.default_port_value; } + + // If no value was provided through the params, assume that the topic name will be set + // through a port when the node is first ticked. } template @@ -243,25 +220,53 @@ inline bool RosTopicSubNode::createSubscriber(const std::string& topic_name) } if(sub_instance_) { - throw RuntimeError("Can't call createSubscriber more than once"); + throw RuntimeError("Subscriber already exists"); } - // find SubscriberInstance in the registry - std::unique_lock lk(registryMutex()); - auto node = node_.lock(); if(!node) { throw RuntimeError("The ROS node went out of scope. RosNodeParams doesn't take the " "ownership of the node."); } + + const auto publisher_info = node->get_publishers_info_by_topic(topic_name); + if(publisher_info.empty()) + { + RCLCPP_INFO(logger(), + "No publisher found on topic [%s]. Deferring creation of subscriber " + "until publisher exists.", + topic_name_.c_str()); + return false; + } + + rclcpp::ReliabilityPolicy publisher_reliability = + publisher_info.at(0).qos_profile().reliability(); + for(std::size_t i = 1; i < publisher_info.size(); i++) + { + if(publisher_reliability != publisher_info.at(i).qos_profile().reliability()) + { + RCLCPP_WARN_ONCE(logger(), + "Multiple publishers were found on topic [%s] with different QoS " + "reliability policies. The subscriber will use the reliability " + "setting from the first publisher it found, but this may result " + "in undesirable behavior.", + topic_name_.c_str()); + break; + } + } + subscriber_key_ = std::string(node->get_fully_qualified_name()) + "/" + topic_name; + // find SubscriberInstance in the registry + std::unique_lock lk(registryMutex()); + auto& registry = getRegistry(); auto it = registry.find(subscriber_key_); if(it == registry.end() || it->second.expired()) { - sub_instance_ = std::make_shared(node, topic_name); + sub_instance_ = + std::make_shared(node, topic_name, 1, publisher_reliability); registry.insert({ subscriber_key_, sub_instance_ }); RCLCPP_INFO(logger(), "Node [%s] created Subscriber to topic [%s]", name().c_str(), @@ -282,40 +287,57 @@ inline bool RosTopicSubNode::createSubscriber(const std::string& topic_name) signal_connection_ = sub_instance_->broadcaster.connect( [this](const std::shared_ptr msg) { last_msg_ = msg; }); - topic_name_ = topic_name; return true; } +template +inline NodeStatus RosTopicSubNode::checkStatus(const NodeStatus& status) const +{ + if(!isStatusActive(status)) + { + throw std::logic_error("RosTopicSubNode: the callback must return SUCCESS, FAILURE, " + "or RUNNING"); + } + return status; +}; template -inline NodeStatus RosTopicSubNode::tick() +inline NodeStatus RosTopicSubNode::onStart() { - // First, check if the subscriber_ is valid and that the name of the - // topic_name in the port didn't change. - // otherwise, create a new subscriber - std::string topic_name; - getInput("topic_name", topic_name); - - if(!topic_name.empty() && topic_name != "__default__placeholder__" && - topic_name != topic_name_) + // If the topic name was provided through an input port, is a valid topic name, and is different + // from the stored topic name, update the stored topic name to the new string. + if(const auto topic_name = getInput("topic_name"); + topic_name.has_value() && !topic_name->empty() && + topic_name.value() != "__default__placeholder__" && topic_name != topic_name_) { - sub_instance_.reset(); + topic_name_ = topic_name.value(); } - if(!sub_instance_) + // If we fail to create a subscriber, exit early and try again on the next tick + if(!createSubscriber(topic_name_)) { - createSubscriber(topic_name); + return NodeStatus::RUNNING; } - auto CheckStatus = [](NodeStatus status) { - if(!isStatusCompleted(status)) - { - throw std::logic_error("RosTopicSubNode: the callback must return" - "either SUCCESS or FAILURE"); - } - return status; - }; + // Discard any previous message unless latching is enabled. + // This enforces that the message must have been received after the current call to onStart(), even if this node has been ticked to completion previously. + if(!latchLastMessage()) + { + last_msg_.reset(); + } + + // NOTE(schornakj): Something weird is happening here that prevents the subscriber from receiving a message if the publisher both initializes and publishes a message in between ticks. + // I think it has to do with the discovery process between the publisher and subscriber and how that relates to events being handled by the executor. + // This might depend on the middleware implemenation too. + sub_instance_->callback_group_executor.spin_some(); - auto status = CheckStatus(onTick(last_msg_)); + + // If no message was received, return RUNNING + if(last_msg_ == nullptr) + { + return NodeStatus::RUNNING; + } + + auto status = checkStatus(onTick(last_msg_)); if(!latchLastMessage()) { last_msg_.reset(); @@ -323,4 +345,35 @@ inline NodeStatus RosTopicSubNode::tick() return status; } +template +inline NodeStatus RosTopicSubNode::onRunning() +{ + // If the subscriber wasn't initialized during onStart(), attempt to create it here. + if(!sub_instance_) + { + // If the subscriber still cannot be created, return RUNNING and try again on the next tick. + if(!createSubscriber(topic_name_)) + { + return NodeStatus::RUNNING; + } + } + + sub_instance_->callback_group_executor.spin_some(); + + // If no message was received, return RUNNING + if(last_msg_ == nullptr) + { + return NodeStatus::RUNNING; + } + + // TODO(schornakj): handle timeout here + + auto status = checkStatus(onTick(last_msg_)); + + return status; +} + +template +inline void RosTopicSubNode::onHalted() +{} } // namespace BT diff --git a/behaviortree_ros2/test/CMakeLists.txt b/behaviortree_ros2/test/CMakeLists.txt new file mode 100644 index 0000000..dee5d2c --- /dev/null +++ b/behaviortree_ros2/test/CMakeLists.txt @@ -0,0 +1,7 @@ +find_package(ament_cmake_gmock REQUIRED) +find_package(std_msgs REQUIRED) +find_package(launch_testing_ament_cmake REQUIRED) + +ament_add_gmock(test_${PROJECT_NAME} test_bt_topic_sub_node.cpp) +target_link_libraries(test_${PROJECT_NAME} ${PROJECT_NAME}) +ament_target_dependencies(test_${PROJECT_NAME} std_msgs) diff --git a/behaviortree_ros2/test/test_bt_topic_sub_node.cpp b/behaviortree_ros2/test/test_bt_topic_sub_node.cpp new file mode 100644 index 0000000..c37db63 --- /dev/null +++ b/behaviortree_ros2/test/test_bt_topic_sub_node.cpp @@ -0,0 +1,240 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace +{ +constexpr auto kTopicName = "/my_topic"; +constexpr auto kHistoryDepth = 1; +constexpr auto kPortIdTopicName = "topic_name"; + +using Empty = std_msgs::msg::Empty; +} // namespace + +namespace BT +{ + +class SubNode : public RosTopicSubNode +{ +public: + SubNode(const std::string& instance_name, const BT::NodeConfig& conf, + const RosNodeParams& params) + : RosTopicSubNode(instance_name, conf, params) + {} + +private: + NodeStatus onTick(const std::shared_ptr& last_msg) override + { + if(last_msg == nullptr) + { + return NodeStatus::RUNNING; + } + return NodeStatus::SUCCESS; + } +}; + +class TestBtTopicSubNode : public ::testing::Test +{ +public: + void SetUp() override + { + rclcpp::init(0, nullptr); + + node_ = std::make_shared("ros_node"); + + BT::assignDefaultRemapping(config_); + config_.blackboard = Blackboard::create(); + } + + void TearDown() override + { + rclcpp::shutdown(); + } + + void createPublisher(const rclcpp::QoS& qos) + { + publisher_ = node_->create_publisher(kTopicName, qos); + } + + std::shared_ptr node_; + std::shared_ptr> publisher_; + + NodeConfig config_; +}; + +TEST_F(TestBtTopicSubNode, TopicAsParam) +{ + // GIVEN the blackboard does not contain the topic name + + // GIVEN the input params contain the topic name + RosNodeParams params; + params.nh = node_; + params.default_port_value = kTopicName; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // GIVEN we create publisher with default QoS settings after creating the BT node + createPublisher(rclcpp::QoS(kHistoryDepth)); + + // GIVEN the BT node is RUNNING before the publisher publishes a message + ASSERT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING)); + + // GIVEN the publisher has published a message + publisher_->publish(std_msgs::build()); + + // WHEN the BT node is ticked + // THEN it succeeds + EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::SUCCESS)); +} + +TEST_F(TestBtTopicSubNode, TopicFromStaticStringInPort) +{ + // GIVEN the input port directly contains the topic name + config_.input_ports[kPortIdTopicName] = kTopicName; + + // GIVEN the input params do not contain the topic name + RosNodeParams params; + params.nh = node_; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // GIVEN we create publisher with default QoS settings after creating the BT node + createPublisher(rclcpp::QoS(kHistoryDepth)); + + // GIVEN the BT node is RUNNING before the publisher publishes a message + ASSERT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING)); + + // GIVEN the publisher has published a message + publisher_->publish(std_msgs::build()); + + // WHEN the BT node is ticked + // THEN it succeeds + EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::SUCCESS)); +} + +TEST_F(TestBtTopicSubNode, TopicFromBlackboard) +{ + // GIVEN the input port is remapped to point to a value on the blackboard + config_.blackboard->set(kPortIdTopicName, kTopicName); + + // GIVEN the input params do not contain the topic name + RosNodeParams params; + params.nh = node_; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // GIVEN we create publisher with default QoS settings after creating the BT node + createPublisher(rclcpp::QoS(kHistoryDepth)); + + // GIVEN the BT node is RUNNING before the publisher publishes a message + ASSERT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING)); + + // GIVEN the publisher has published a message + publisher_->publish(std_msgs::build()); + + // WHEN the BT node is ticked + // THEN it succeeds + EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::SUCCESS)); +} + +TEST_F(TestBtTopicSubNode, QoSBestEffort) +{ + // GIVEN the blackboard does not contain the topic name + + // GIVEN the input params contain the topic name + RosNodeParams params; + params.nh = node_; + params.default_port_value = kTopicName; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // GIVEN we create publisher with BestEffort reliability QoS settings after creating the BT node + createPublisher(rclcpp::QoS(kHistoryDepth).best_effort()); + + // GIVEN the BT node is RUNNING before the publisher publishes a message + ASSERT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING)); + + // GIVEN the publisher has published a message + publisher_->publish(std_msgs::build()); + + // WHEN the BT node is ticked + // THEN it succeeds + EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::SUCCESS)); +} + +TEST_F(TestBtTopicSubNode, PublisherCreatedAfterFirstTick) +{ + // GIVEN the blackboard does not contain the topic name + + // GIVEN the input params contain the topic name + RosNodeParams params; + params.nh = node_; + params.default_port_value = kTopicName; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // GIVEN the BT node is RUNNING before the publisher is created + ASSERT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING)); + + // GIVEN we create publisher with Reliable reliability QoS settings after creating the BT node and after the node starts ticking + createPublisher(rclcpp::QoS(kHistoryDepth).reliable()); + + // TODO(Joe): Why does the node need to be ticked in between the publisher appearing and sending a message for the message to be received? Seems highly suspicious! + ASSERT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::RUNNING)); + + // GIVEN the publisher has published a message + publisher_->publish(std_msgs::build()); + // WHEN the BT node is ticked + // THEN it succeeds + EXPECT_THAT(bt_node.executeTick(), testing::Eq(NodeStatus::SUCCESS)); +} + +TEST_F(TestBtTopicSubNode, ExceptionIfNoTopic) +{ + // GIVEN the blackboard does not contain the topic name + + // GIVEN the input params do not contain the topic name + RosNodeParams params; + params.nh = node_; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // WHEN the BT node is ticked + // THEN it throws an exception + EXPECT_THROW(bt_node.executeTick(), BT::RuntimeError); +} + +TEST_F(TestBtTopicSubNode, ExceptionIfRosNodeReset) +{ + // GIVEN the blackboard does not contain the topic name + + // GIVEN the input params do not contain the topic name + RosNodeParams params; + params.nh = node_; + + // GIVEN we pass in the params and the blackboard when creating the BT node + SubNode bt_node("test_node", config_, params); + + // GIVEN the ROS node is reset prior to the first tick + node_.reset(); + + // WHEN the BT node is ticked + // THEN it throws an exception + EXPECT_THROW(bt_node.executeTick(), BT::RuntimeError); +} +} // namespace BT