From 12be305afed98830484fba04aefd9289d650a907 Mon Sep 17 00:00:00 2001 From: nvikramraj Date: Fri, 6 Feb 2026 15:44:59 -0600 Subject: [PATCH] added support for multi stream for ros topics --- README.md | 35 ++++++++-------- config/parameters.yaml | 24 +++++------ include/image2rtsp.hpp | 26 ++++++++---- src/image2rtsp.cpp | 91 ++++++++++++++++++++++++++++++------------ src/video.cpp | 22 +++++----- 5 files changed, 126 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index a921d63..6ecfa65 100644 --- a/README.md +++ b/README.md @@ -33,13 +33,18 @@ sudo apt-get install libgstreamer-plugins-base1.0-dev libgstreamer-plugins-good1 ```bashrc gedit ~/ros2_ws/src/image2rtsp/config/parameters.yaml ``` + # Example ROS2 Image topic stream - # If the source is a ros2 topic (default case) - compressed: False - topic: "color/image_raw" +Multi topic stream has been tested on ROS2 Jazzy with Intel RealSense D555 connected through Ethernet. +``` +/image2rtsp: + ros__parameters: + + # If the source is a ros2 topic (default case) + compressed: [False,False,False] + topic: ["/topic1","/topic2","/topic3"] default_pipeline: | - ( appsrc name=imagesrc do-timestamp=true min-latency=0 - max-latency=0 max-bytes=1000 is-live=true ! + ( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true ! videoconvert ! videoscale ! video/x-raw, framerate=30/1, width=640, height=480 ! @@ -49,29 +54,27 @@ sudo apt-get install libgstreamer-plugins-base1.0-dev libgstreamer-plugins-good1 # Notice: The framerate setting does not affect the RTSP stream — it entirely depends on the ros2 topic frequency. # It is included in the pipeline and code for structural reasons. You can likely remove it from the pipeline without impacting the package's behavior. - - # If camera serves as a source camera: False camera_pipeline: | ( v4l2src device=/dev/video0 ! - videoconvert ! - videoscale ! - video/x-raw, framerate=30/1, width=640, height=480 ! - x264enc tune=zerolatency bitrate=500 key-int-max=30 ! - video/x-h264, profile=baseline ! - rtph264pay name=pay0 pt=96 ) + videoconvert ! + videoscale ! + video/x-raw, framerate=30/1, width=640, height=480 ! + x264enc tune=zerolatency bitrate=500 key-int-max=30 ! + video/x-h264, profile=baseline ! + rtph264pay name=pay0 pt=96 ) # Notice: Here the framerate might be set to the camera framerate, otherwise "503 Service Unavailable" error will appear. # RTSP setup - mountpoint: "/back" + mountpoint: ["/1","/2","/3"] port: "8554" - local_only: True # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine) + local_only: False # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine) # False = rtsp://0.0.0.0:portAndMountpoint (The stream is accessible from the outside) # For example, to access the stream running on the machine with IP = 192.168.20.20, # use rtsp://192.186.20.20:portAndMountpoint - +``` - Save your configuration and navigate to `ros2_ws` colcon root, source and build the package: ```bashrc cd ~/ros2_ws/ diff --git a/config/parameters.yaml b/config/parameters.yaml index 2ca4cb0..8d653c8 100644 --- a/config/parameters.yaml +++ b/config/parameters.yaml @@ -2,8 +2,8 @@ ros__parameters: # If the source is a ros2 topic (default case) - compressed: False - topic: "color/image_raw" + compressed: [False,False,False] + topic: ["/topic1","/topic2","/topic3"] default_pipeline: | ( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true ! videoconvert ! @@ -15,25 +15,23 @@ # Notice: The framerate setting does not affect the RTSP stream — it entirely depends on the ros2 topic frequency. # It is included in the pipeline and code for structural reasons. You can likely remove it from the pipeline without impacting the package's behavior. - - # If camera serves as a source camera: False camera_pipeline: | ( v4l2src device=/dev/video0 ! - videoconvert ! - videoscale ! - video/x-raw, framerate=30/1, width=640, height=480 ! - x264enc tune=zerolatency bitrate=500 key-int-max=30 ! - video/x-h264, profile=baseline ! - rtph264pay name=pay0 pt=96 ) + videoconvert ! + videoscale ! + video/x-raw, framerate=30/1, width=640, height=480 ! + x264enc tune=zerolatency bitrate=500 key-int-max=30 ! + video/x-h264, profile=baseline ! + rtph264pay name=pay0 pt=96 ) # Notice: Here the framerate might be set to the camera framerate, otherwise "503 Service Unavailable" error will appear. # RTSP setup - mountpoint: "/back" + mountpoint: ["/1","/2","/3"] port: "8554" - local_only: True # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine) + local_only: False # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine) # False = rtsp://0.0.0.0:portAndMountpoint (The stream is accessible from the outside) # For example, to access the stream running on the machine with IP = 192.168.20.20, - # use rtsp://192.186.20.20:portAndMountpoint + # use rtsp://192.186.20.20:portAndMountpoint \ No newline at end of file diff --git a/include/image2rtsp.hpp b/include/image2rtsp.hpp index b5b7693..f3f70d8 100644 --- a/include/image2rtsp.hpp +++ b/include/image2rtsp.hpp @@ -5,20 +5,34 @@ #include #include #include +#include #include "sensor_msgs/msg/image.hpp" #include "sensor_msgs/msg/compressed_image.hpp" #include using namespace std; + +struct Stream { + std::string topic; + std::string mountpoint; + bool compressed = false; + GstAppSrc* appsrc = nullptr; + rclcpp::Subscription::SharedPtr sub; + rclcpp::Subscription::SharedPtr sub_compressed; +}; + + class Image2rtsp : public rclcpp::Node{ public: Image2rtsp(); GstRTSPServer *rtsp_server; private: - string topic; - string mountpoint; + vector streams_; + vector topics = {"/topic1","/topic2","/topic3"}; + vector mountpoints = {"/1","/2","/3"}; + vector is_compressed = {false,false,false}; string port; string pipeline; string default_pipeline; @@ -26,18 +40,14 @@ private: uint framerate; bool local_only; bool camera; - bool compressed; - GstAppSrc *appsrc; void video_mainloop_start(); void rtsp_server_add_url(const char *url, const char *sPipeline, GstElement **appsrc); - void topic_callback(const sensor_msgs::msg::Image::SharedPtr msg); - void compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg); + void topic_callback(const sensor_msgs::msg::Image::SharedPtr msg, Stream* stream); + void compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg, Stream* stream); uint extract_framerate(const std::string& pipeline, uint default_framerate); GstRTSPServer *rtsp_server_create(const string &port, const bool local_only); GstCaps *gst_caps_new_from_image(const sensor_msgs::msg::Image::SharedPtr &msg); - rclcpp::Subscription::SharedPtr subscription_; - rclcpp::Subscription::SharedPtr subscription_compressed_; }; static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, GstElement **appsrc); diff --git a/src/image2rtsp.cpp b/src/image2rtsp.cpp index 58d101f..407d22e 100644 --- a/src/image2rtsp.cpp +++ b/src/image2rtsp.cpp @@ -9,12 +9,12 @@ using std::placeholders::_1; Image2rtsp::Image2rtsp() : Node("image2rtsp"){ // Declare and get the parameters - this->declare_parameter("topic", "/color/image_raw"); - this->declare_parameter("mountpoint", "/back"); + this->declare_parameter("topic", topics); + this->declare_parameter("mountpoint", mountpoints); this->declare_parameter("port", "8554"); this->declare_parameter("local_only", true); this->declare_parameter("camera", false); - this->declare_parameter("compressed", false); + this->declare_parameter("compressed", is_compressed); this->declare_parameter("default_pipeline", R"( ( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true ! @@ -36,40 +36,79 @@ Image2rtsp::Image2rtsp() : Node("image2rtsp"){ rtph264pay name=pay0 pt=96 ) )"); - topic = this->get_parameter("topic").as_string(); - mountpoint = this->get_parameter("mountpoint").as_string(); - port = this->get_parameter("port").as_string(); - local_only = this->get_parameter("local_only").as_bool(); - camera = this->get_parameter("camera").as_bool(); - compressed = this->get_parameter("compressed").as_bool(); - default_pipeline = this->get_parameter("default_pipeline").as_string(); - camera_pipeline = this->get_parameter("camera_pipeline").as_string(); + topics = this->get_parameter("topic").as_string_array(); + mountpoints = this->get_parameter("mountpoint").as_string_array(); + port = this->get_parameter("port").as_string(); + local_only = this->get_parameter("local_only").as_bool(); + camera = this->get_parameter("camera").as_bool(); + is_compressed = this->get_parameter("compressed").as_bool_array(); + default_pipeline = this->get_parameter("default_pipeline").as_string(); + camera_pipeline = this->get_parameter("camera_pipeline").as_string(); - // Start the subscription - if (camera == false){ - if (compressed == false){ - subscription_ = this->create_subscription(topic, 10, std::bind(&Image2rtsp::topic_callback, this, _1)); - RCLCPP_INFO(this->get_logger(), "Subscribing to sensor_msgs::msg::Image"); - } - else { - subscription_compressed_ = this->create_subscription(topic, 10, std::bind(&Image2rtsp::compressed_topic_callback, this, _1)); - RCLCPP_INFO(this->get_logger(), "Subscribing to sensor_msgs::msg::CompressedImage"); + // Failsafe + if (topics.size() != mountpoints.size() || topics.size() != is_compressed.size()) { + throw std::runtime_error("topic / mountpoint / compressed parameter sizes do not match"); + } + + // Iterating through all parameters and creating a struct for all streams + streams_.clear(); + for (size_t i = 0; i < topics.size(); i++) { + + Stream s; + s.topic = topics[i]; + s.mountpoint = mountpoints[i]; + s.appsrc = NULL; + s.compressed = is_compressed[i]; + streams_.push_back(s); + } + + if (!camera) { + for (Stream &s : streams_) { + + if (!s.compressed) { + s.sub = this->create_subscription( + s.topic, + 10, + [this, &s](sensor_msgs::msg::Image::SharedPtr msg) { + this->topic_callback(msg, &s); + }); + + RCLCPP_INFO(this->get_logger(), + "Subscribed to Image topic: %s", + s.topic.c_str()); + } + else { + s.sub_compressed = + this->create_subscription( + s.topic, + 10, + [this, &s](sensor_msgs::msg::CompressedImage::SharedPtr msg) { + this->compressed_topic_callback(msg, &s); + }); + + RCLCPP_INFO(this->get_logger(), + "Subscribed to CompressedImage topic: %s", + s.topic.c_str()); + } } } else { RCLCPP_INFO(this->get_logger(), "Trying to access camera device"); } - // Start the RTSP server - video_mainloop_start(); - rtsp_server = rtsp_server_create(port, local_only); - appsrc = NULL; + // Start the RTSP server pipeline = camera ? camera_pipeline : default_pipeline; framerate = extract_framerate(pipeline, 30); - rtsp_server_add_url(mountpoint.c_str(), pipeline.c_str(), camera ? nullptr : (GstElement **)&appsrc); - RCLCPP_INFO(this->get_logger(), "Stream available at rtsp://%s:%s%s", gst_rtsp_server_get_address(rtsp_server), port.c_str(), mountpoint.c_str()); + video_mainloop_start(); + rtsp_server = rtsp_server_create(port, local_only); + for (Stream &s : streams_) { + s.appsrc = NULL; + rtsp_server_add_url(s.mountpoint.c_str(), pipeline.c_str(), camera ? nullptr : (GstElement **)&s.appsrc); + RCLCPP_INFO(this->get_logger(), "Stream available at rtsp://%s:%s%s", gst_rtsp_server_get_address(rtsp_server), port.c_str(), s.mountpoint.c_str()); + + } } uint Image2rtsp::extract_framerate(const std::string& pipeline, uint default_framerate = 30) { diff --git a/src/video.cpp b/src/video.cpp index bde96c5..e41afaf 100644 --- a/src/video.cpp +++ b/src/video.cpp @@ -66,6 +66,7 @@ void Image2rtsp::rtsp_server_add_url(const char *url, const char *sPipeline, Gst g_object_unref(mounts); } + static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, GstElement **appsrc){ if(appsrc){ GstElement *pipeline = gst_rtsp_media_get_element(media); @@ -154,23 +155,26 @@ static gboolean session_cleanup(Image2rtsp *node, rclcpp::Logger logger, gboolea return TRUE; } -void Image2rtsp::topic_callback(const sensor_msgs::msg::Image::SharedPtr msg){ +void Image2rtsp::topic_callback(const sensor_msgs::msg::Image::SharedPtr msg, Stream* s){ GstBuffer *buf; GstCaps *caps; // image properties. see return of Image2rtsp::gst_caps_new_from_image char *gst_type, *gst_format = (char *)""; - if (appsrc != NULL){ + if (s && s->appsrc){ // Set caps from message caps = gst_caps_new_from_image(msg); - gst_app_src_set_caps(appsrc, caps); + gst_app_src_set_caps(s->appsrc , caps); buf = gst_buffer_new_allocate(nullptr, msg->data.size(), nullptr); gst_buffer_fill(buf, 0, msg->data.data(), msg->data.size()); GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_LIVE); - gst_app_src_push_buffer(appsrc, buf); + gst_app_src_push_buffer(s->appsrc , buf); + // unref caps + // gst_caps_unref(caps); } } -void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg){ - if (appsrc == NULL) return; + +void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg, Stream* s){ + if (s && s->appsrc) return; // Decompress the image cv::Mat img = cv::imdecode(cv::Mat(msg->data), cv::IMREAD_UNCHANGED); if (img.empty()) { @@ -197,7 +201,7 @@ void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedIma nullptr); // Set caps on appsrc - gst_app_src_set_caps(appsrc, caps); + gst_app_src_set_caps(s->appsrc, caps); gst_caps_unref(caps); // Create a GstBuffer and fill it with the image data @@ -206,5 +210,5 @@ void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedIma GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_LIVE); // Push the buffer to GStreamer - gst_app_src_push_buffer(appsrc, buf); -} \ No newline at end of file + gst_app_src_push_buffer(s->appsrc, buf); +}