added support for multi stream for ros topics

This commit is contained in:
nvikramraj 2026-02-06 15:44:59 -06:00
parent 2ba0f2e9f6
commit 12be305afe
5 changed files with 126 additions and 72 deletions

View File

@ -33,13 +33,18 @@ sudo apt-get install libgstreamer-plugins-base1.0-dev libgstreamer-plugins-good1
```bashrc ```bashrc
gedit ~/ros2_ws/src/image2rtsp/config/parameters.yaml gedit ~/ros2_ws/src/image2rtsp/config/parameters.yaml
``` ```
# Example ROS2 Image topic stream # Example ROS2 Image topic stream
# If the source is a ros2 topic (default case) Multi topic stream has been tested on ROS2 Jazzy with Intel RealSense D555 connected through Ethernet.
compressed: False ```
topic: "color/image_raw" /image2rtsp:
ros__parameters:
# If the source is a ros2 topic (default case)
compressed: [False,False,False]
topic: ["/topic1","/topic2","/topic3"]
default_pipeline: | default_pipeline: |
( appsrc name=imagesrc do-timestamp=true min-latency=0 ( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true !
max-latency=0 max-bytes=1000 is-live=true !
videoconvert ! videoconvert !
videoscale ! videoscale !
video/x-raw, framerate=30/1, width=640, height=480 ! video/x-raw, framerate=30/1, width=640, height=480 !
@ -49,29 +54,27 @@ sudo apt-get install libgstreamer-plugins-base1.0-dev libgstreamer-plugins-good1
# Notice: The framerate setting does not affect the RTSP stream — it entirely depends on the ros2 topic frequency. # Notice: The framerate setting does not affect the RTSP stream — it entirely depends on the ros2 topic frequency.
# It is included in the pipeline and code for structural reasons. You can likely remove it from the pipeline without impacting the package's behavior. # It is included in the pipeline and code for structural reasons. You can likely remove it from the pipeline without impacting the package's behavior.
# If camera serves as a source # If camera serves as a source
camera: False camera: False
camera_pipeline: | camera_pipeline: |
( v4l2src device=/dev/video0 ! ( v4l2src device=/dev/video0 !
videoconvert ! videoconvert !
videoscale ! videoscale !
video/x-raw, framerate=30/1, width=640, height=480 ! video/x-raw, framerate=30/1, width=640, height=480 !
x264enc tune=zerolatency bitrate=500 key-int-max=30 ! x264enc tune=zerolatency bitrate=500 key-int-max=30 !
video/x-h264, profile=baseline ! video/x-h264, profile=baseline !
rtph264pay name=pay0 pt=96 ) rtph264pay name=pay0 pt=96 )
# Notice: Here the framerate might be set to the camera framerate, otherwise "503 Service Unavailable" error will appear. # Notice: Here the framerate might be set to the camera framerate, otherwise "503 Service Unavailable" error will appear.
# RTSP setup # RTSP setup
mountpoint: "/back" mountpoint: ["/1","/2","/3"]
port: "8554" port: "8554"
local_only: True # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine) local_only: False # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine)
# False = rtsp://0.0.0.0:portAndMountpoint (The stream is accessible from the outside) # False = rtsp://0.0.0.0:portAndMountpoint (The stream is accessible from the outside)
# For example, to access the stream running on the machine with IP = 192.168.20.20, # For example, to access the stream running on the machine with IP = 192.168.20.20,
# use rtsp://192.186.20.20:portAndMountpoint # use rtsp://192.186.20.20:portAndMountpoint
```
- Save your configuration and navigate to `ros2_ws` colcon root, source and build the package: - Save your configuration and navigate to `ros2_ws` colcon root, source and build the package:
```bashrc ```bashrc
cd ~/ros2_ws/ cd ~/ros2_ws/

View File

@ -2,8 +2,8 @@
ros__parameters: ros__parameters:
# If the source is a ros2 topic (default case) # If the source is a ros2 topic (default case)
compressed: False compressed: [False,False,False]
topic: "color/image_raw" topic: ["/topic1","/topic2","/topic3"]
default_pipeline: | default_pipeline: |
( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true ! ( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true !
videoconvert ! videoconvert !
@ -15,25 +15,23 @@
# Notice: The framerate setting does not affect the RTSP stream — it entirely depends on the ros2 topic frequency. # Notice: The framerate setting does not affect the RTSP stream — it entirely depends on the ros2 topic frequency.
# It is included in the pipeline and code for structural reasons. You can likely remove it from the pipeline without impacting the package's behavior. # It is included in the pipeline and code for structural reasons. You can likely remove it from the pipeline without impacting the package's behavior.
# If camera serves as a source # If camera serves as a source
camera: False camera: False
camera_pipeline: | camera_pipeline: |
( v4l2src device=/dev/video0 ! ( v4l2src device=/dev/video0 !
videoconvert ! videoconvert !
videoscale ! videoscale !
video/x-raw, framerate=30/1, width=640, height=480 ! video/x-raw, framerate=30/1, width=640, height=480 !
x264enc tune=zerolatency bitrate=500 key-int-max=30 ! x264enc tune=zerolatency bitrate=500 key-int-max=30 !
video/x-h264, profile=baseline ! video/x-h264, profile=baseline !
rtph264pay name=pay0 pt=96 ) rtph264pay name=pay0 pt=96 )
# Notice: Here the framerate might be set to the camera framerate, otherwise "503 Service Unavailable" error will appear. # Notice: Here the framerate might be set to the camera framerate, otherwise "503 Service Unavailable" error will appear.
# RTSP setup # RTSP setup
mountpoint: "/back" mountpoint: ["/1","/2","/3"]
port: "8554" port: "8554"
local_only: True # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine) local_only: False # True = rtsp://127.0.0.1:portAndMountpoint (The stream is accessible only from the local machine)
# False = rtsp://0.0.0.0:portAndMountpoint (The stream is accessible from the outside) # False = rtsp://0.0.0.0:portAndMountpoint (The stream is accessible from the outside)
# For example, to access the stream running on the machine with IP = 192.168.20.20, # For example, to access the stream running on the machine with IP = 192.168.20.20,
# use rtsp://192.186.20.20:portAndMountpoint # use rtsp://192.186.20.20:portAndMountpoint

View File

@ -5,20 +5,34 @@
#include <gst/rtsp-server/rtsp-server.h> #include <gst/rtsp-server/rtsp-server.h>
#include <rclcpp/rclcpp.hpp> #include <rclcpp/rclcpp.hpp>
#include <string> #include <string>
#include <vector>
#include "sensor_msgs/msg/image.hpp" #include "sensor_msgs/msg/image.hpp"
#include "sensor_msgs/msg/compressed_image.hpp" #include "sensor_msgs/msg/compressed_image.hpp"
#include <opencv2/opencv.hpp> #include <opencv2/opencv.hpp>
using namespace std; using namespace std;
struct Stream {
std::string topic;
std::string mountpoint;
bool compressed = false;
GstAppSrc* appsrc = nullptr;
rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr sub;
rclcpp::Subscription<sensor_msgs::msg::CompressedImage>::SharedPtr sub_compressed;
};
class Image2rtsp : public rclcpp::Node{ class Image2rtsp : public rclcpp::Node{
public: public:
Image2rtsp(); Image2rtsp();
GstRTSPServer *rtsp_server; GstRTSPServer *rtsp_server;
private: private:
string topic; vector<Stream> streams_;
string mountpoint; vector<string> topics = {"/topic1","/topic2","/topic3"};
vector<string> mountpoints = {"/1","/2","/3"};
vector<bool> is_compressed = {false,false,false};
string port; string port;
string pipeline; string pipeline;
string default_pipeline; string default_pipeline;
@ -26,18 +40,14 @@ private:
uint framerate; uint framerate;
bool local_only; bool local_only;
bool camera; bool camera;
bool compressed;
GstAppSrc *appsrc;
void video_mainloop_start(); void video_mainloop_start();
void rtsp_server_add_url(const char *url, const char *sPipeline, GstElement **appsrc); void rtsp_server_add_url(const char *url, const char *sPipeline, GstElement **appsrc);
void topic_callback(const sensor_msgs::msg::Image::SharedPtr msg); void topic_callback(const sensor_msgs::msg::Image::SharedPtr msg, Stream* stream);
void compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg); void compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg, Stream* stream);
uint extract_framerate(const std::string& pipeline, uint default_framerate); uint extract_framerate(const std::string& pipeline, uint default_framerate);
GstRTSPServer *rtsp_server_create(const string &port, const bool local_only); GstRTSPServer *rtsp_server_create(const string &port, const bool local_only);
GstCaps *gst_caps_new_from_image(const sensor_msgs::msg::Image::SharedPtr &msg); GstCaps *gst_caps_new_from_image(const sensor_msgs::msg::Image::SharedPtr &msg);
rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr subscription_;
rclcpp::Subscription<sensor_msgs::msg::CompressedImage>::SharedPtr subscription_compressed_;
}; };
static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, GstElement **appsrc); static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, GstElement **appsrc);

View File

@ -9,12 +9,12 @@ using std::placeholders::_1;
Image2rtsp::Image2rtsp() : Node("image2rtsp"){ Image2rtsp::Image2rtsp() : Node("image2rtsp"){
// Declare and get the parameters // Declare and get the parameters
this->declare_parameter("topic", "/color/image_raw"); this->declare_parameter("topic", topics);
this->declare_parameter("mountpoint", "/back"); this->declare_parameter("mountpoint", mountpoints);
this->declare_parameter("port", "8554"); this->declare_parameter("port", "8554");
this->declare_parameter("local_only", true); this->declare_parameter("local_only", true);
this->declare_parameter("camera", false); this->declare_parameter("camera", false);
this->declare_parameter("compressed", false); this->declare_parameter("compressed", is_compressed);
this->declare_parameter("default_pipeline", R"( this->declare_parameter("default_pipeline", R"(
( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true ! ( appsrc name=imagesrc do-timestamp=true min-latency=0 max-latency=0 max-bytes=1000 is-live=true !
@ -36,40 +36,79 @@ Image2rtsp::Image2rtsp() : Node("image2rtsp"){
rtph264pay name=pay0 pt=96 ) rtph264pay name=pay0 pt=96 )
)"); )");
topic = this->get_parameter("topic").as_string(); topics = this->get_parameter("topic").as_string_array();
mountpoint = this->get_parameter("mountpoint").as_string(); mountpoints = this->get_parameter("mountpoint").as_string_array();
port = this->get_parameter("port").as_string(); port = this->get_parameter("port").as_string();
local_only = this->get_parameter("local_only").as_bool(); local_only = this->get_parameter("local_only").as_bool();
camera = this->get_parameter("camera").as_bool(); camera = this->get_parameter("camera").as_bool();
compressed = this->get_parameter("compressed").as_bool(); is_compressed = this->get_parameter("compressed").as_bool_array();
default_pipeline = this->get_parameter("default_pipeline").as_string(); default_pipeline = this->get_parameter("default_pipeline").as_string();
camera_pipeline = this->get_parameter("camera_pipeline").as_string(); camera_pipeline = this->get_parameter("camera_pipeline").as_string();
// Start the subscription // Failsafe
if (camera == false){ if (topics.size() != mountpoints.size() || topics.size() != is_compressed.size()) {
if (compressed == false){ throw std::runtime_error("topic / mountpoint / compressed parameter sizes do not match");
subscription_ = this->create_subscription<sensor_msgs::msg::Image>(topic, 10, std::bind(&Image2rtsp::topic_callback, this, _1)); }
RCLCPP_INFO(this->get_logger(), "Subscribing to sensor_msgs::msg::Image");
} // Iterating through all parameters and creating a struct for all streams
else { streams_.clear();
subscription_compressed_ = this->create_subscription<sensor_msgs::msg::CompressedImage>(topic, 10, std::bind(&Image2rtsp::compressed_topic_callback, this, _1)); for (size_t i = 0; i < topics.size(); i++) {
RCLCPP_INFO(this->get_logger(), "Subscribing to sensor_msgs::msg::CompressedImage");
Stream s;
s.topic = topics[i];
s.mountpoint = mountpoints[i];
s.appsrc = NULL;
s.compressed = is_compressed[i];
streams_.push_back(s);
}
if (!camera) {
for (Stream &s : streams_) {
if (!s.compressed) {
s.sub = this->create_subscription<sensor_msgs::msg::Image>(
s.topic,
10,
[this, &s](sensor_msgs::msg::Image::SharedPtr msg) {
this->topic_callback(msg, &s);
});
RCLCPP_INFO(this->get_logger(),
"Subscribed to Image topic: %s",
s.topic.c_str());
}
else {
s.sub_compressed =
this->create_subscription<sensor_msgs::msg::CompressedImage>(
s.topic,
10,
[this, &s](sensor_msgs::msg::CompressedImage::SharedPtr msg) {
this->compressed_topic_callback(msg, &s);
});
RCLCPP_INFO(this->get_logger(),
"Subscribed to CompressedImage topic: %s",
s.topic.c_str());
}
} }
} }
else { else {
RCLCPP_INFO(this->get_logger(), "Trying to access camera device"); RCLCPP_INFO(this->get_logger(), "Trying to access camera device");
} }
// Start the RTSP server
video_mainloop_start();
rtsp_server = rtsp_server_create(port, local_only);
appsrc = NULL;
// Start the RTSP server
pipeline = camera ? camera_pipeline : default_pipeline; pipeline = camera ? camera_pipeline : default_pipeline;
framerate = extract_framerate(pipeline, 30); framerate = extract_framerate(pipeline, 30);
rtsp_server_add_url(mountpoint.c_str(), pipeline.c_str(), camera ? nullptr : (GstElement **)&appsrc);
RCLCPP_INFO(this->get_logger(), "Stream available at rtsp://%s:%s%s", gst_rtsp_server_get_address(rtsp_server), port.c_str(), mountpoint.c_str()); video_mainloop_start();
rtsp_server = rtsp_server_create(port, local_only);
for (Stream &s : streams_) {
s.appsrc = NULL;
rtsp_server_add_url(s.mountpoint.c_str(), pipeline.c_str(), camera ? nullptr : (GstElement **)&s.appsrc);
RCLCPP_INFO(this->get_logger(), "Stream available at rtsp://%s:%s%s", gst_rtsp_server_get_address(rtsp_server), port.c_str(), s.mountpoint.c_str());
}
} }
uint Image2rtsp::extract_framerate(const std::string& pipeline, uint default_framerate = 30) { uint Image2rtsp::extract_framerate(const std::string& pipeline, uint default_framerate = 30) {

View File

@ -66,6 +66,7 @@ void Image2rtsp::rtsp_server_add_url(const char *url, const char *sPipeline, Gst
g_object_unref(mounts); g_object_unref(mounts);
} }
static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, GstElement **appsrc){ static void media_configure(GstRTSPMediaFactory *factory, GstRTSPMedia *media, GstElement **appsrc){
if(appsrc){ if(appsrc){
GstElement *pipeline = gst_rtsp_media_get_element(media); GstElement *pipeline = gst_rtsp_media_get_element(media);
@ -154,23 +155,26 @@ static gboolean session_cleanup(Image2rtsp *node, rclcpp::Logger logger, gboolea
return TRUE; return TRUE;
} }
void Image2rtsp::topic_callback(const sensor_msgs::msg::Image::SharedPtr msg){ void Image2rtsp::topic_callback(const sensor_msgs::msg::Image::SharedPtr msg, Stream* s){
GstBuffer *buf; GstBuffer *buf;
GstCaps *caps; // image properties. see return of Image2rtsp::gst_caps_new_from_image GstCaps *caps; // image properties. see return of Image2rtsp::gst_caps_new_from_image
char *gst_type, *gst_format = (char *)""; char *gst_type, *gst_format = (char *)"";
if (appsrc != NULL){ if (s && s->appsrc){
// Set caps from message // Set caps from message
caps = gst_caps_new_from_image(msg); caps = gst_caps_new_from_image(msg);
gst_app_src_set_caps(appsrc, caps); gst_app_src_set_caps(s->appsrc , caps);
buf = gst_buffer_new_allocate(nullptr, msg->data.size(), nullptr); buf = gst_buffer_new_allocate(nullptr, msg->data.size(), nullptr);
gst_buffer_fill(buf, 0, msg->data.data(), msg->data.size()); gst_buffer_fill(buf, 0, msg->data.data(), msg->data.size());
GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_LIVE); GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_LIVE);
gst_app_src_push_buffer(appsrc, buf); gst_app_src_push_buffer(s->appsrc , buf);
// unref caps
// gst_caps_unref(caps);
} }
} }
void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg){
if (appsrc == NULL) return; void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedImage::SharedPtr msg, Stream* s){
if (s && s->appsrc) return;
// Decompress the image // Decompress the image
cv::Mat img = cv::imdecode(cv::Mat(msg->data), cv::IMREAD_UNCHANGED); cv::Mat img = cv::imdecode(cv::Mat(msg->data), cv::IMREAD_UNCHANGED);
if (img.empty()) { if (img.empty()) {
@ -197,7 +201,7 @@ void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedIma
nullptr); nullptr);
// Set caps on appsrc // Set caps on appsrc
gst_app_src_set_caps(appsrc, caps); gst_app_src_set_caps(s->appsrc, caps);
gst_caps_unref(caps); gst_caps_unref(caps);
// Create a GstBuffer and fill it with the image data // Create a GstBuffer and fill it with the image data
@ -206,5 +210,5 @@ void Image2rtsp::compressed_topic_callback(const sensor_msgs::msg::CompressedIma
GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_LIVE); GST_BUFFER_FLAG_SET(buf, GST_BUFFER_FLAG_LIVE);
// Push the buffer to GStreamer // Push the buffer to GStreamer
gst_app_src_push_buffer(appsrc, buf); gst_app_src_push_buffer(s->appsrc, buf);
} }