NVIDIA Deepstream Gst-nvmsgbroker: Difference between revisions

From RidgeRun Developer Wiki
mNo edit summary
mNo edit summary
 
(11 intermediate revisions by 2 users not shown)
Line 1: Line 1:
== Introduction ==
<seo title="Gst-nvmsgbroker | Deepstream 6.0 | RidgeRun" titlemode="replace" keywords="GStreamer, Linux SDK, Linux BSP,  Embedded Linux, Device Drivers, NVIDIA, Jetson, TX1, TX2, Jetson AGX Xavier, Jetson TX1, Jetson TX2, Embedded Linux driver development, Linux Software development, Embedded Linux SDK, Embedded Linux Application development, GStreamer Multimedia Framework, Xavier, AI, Deep Learning, gst-nvmsgbroker, NvDsMeta, NvDsPayload, metadata, GstMeta, deepstream-6.0, AMQP protocol, AMQP, Kafka, Azure MQTT, REDIS Protocol, nvmsgbroker1, Kafka adapter, Azure MQTT adapter, AMQP adapter, REDIS adapter" description="This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0."></seo>
 
{{NVIDIA Pref Partner logo and RR Contact}}
 
== Introduction to Gst-nvmsgbroker==


This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvmsgbroker.html
This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvmsgbroker.html
Line 5: Line 9:
As stated in the documentation, the plugin will receive GstBuffers with NvDsMeta meta attached in the GstMeta and use the NvDsPayload metadata attached to them on each frame metadata inside the user metadata list. It will then use the nvds_msgapi_* interface to send the messages inside the payload.  
As stated in the documentation, the plugin will receive GstBuffers with NvDsMeta meta attached in the GstMeta and use the NvDsPayload metadata attached to them on each frame metadata inside the user metadata list. It will then use the nvds_msgapi_* interface to send the messages inside the payload.  


The plugin allows an easy configuration in which only one configuration file needs to be specified and a path the the protocol adapter library.
The plugin allows an easy configuration in which only one configuration file needs to be specified and a path to the protocol adapter library.


Below you can find a diagram of a possible pipeline to be used with the broker element:
<br>
<br>
[[File:Nvmsgbroker pipeline.png|500px|thumb|center|nvmsgbroker pipeline]]
<br>
And below is the flow of configuration and messages sending into the broker by using the nvmsgbroker element:
<br>
<br>
[[File:NvMsgbroker flow.png|400px|thumb|center|NvMsgbroker flow]]
<br>
== Protocol adapters ==
== Protocol adapters ==


Line 29: Line 43:
The AMQP underlaying protocol adapter implementation uses the librabbitmq.so library, this is build from rabbitmq-c (v0.8.0). To build and install it:
The AMQP underlaying protocol adapter implementation uses the librabbitmq.so library, this is build from rabbitmq-c (v0.8.0). To build and install it:


<pre>
<syntaxhighlight lang="bash">
git clone -b v0.8.0  --recursive https://github.com/alanxz/rabbitmq-c.git
git clone -b v0.8.0  --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c && mkdir build && cd build
cd rabbitmq-c && mkdir build && cd build
Line 35: Line 49:
sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/
sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/
sudo cp ./librabbitmq/librabbitmq.so.4 /opt/nvidia/deepstream/deepstream-6.0/lib/
sudo cp ./librabbitmq/librabbitmq.so.4 /opt/nvidia/deepstream/deepstream-6.0/lib/
</pre>
</syntaxhighlight>


==== Broker dependencies ====
==== Broker dependencies ====
Line 43: Line 57:
Or you just can execute this to install the package for the system:  
Or you just can execute this to install the package for the system:  


<pre>
<syntaxhighlight lang="bash">
sudo apt-get install rabbitmq-server
sudo apt-get install rabbitmq-server
</pre>
</syntaxhighlight>


=== Element creation ===
=== Element creation ===
Line 51: Line 65:
To create a gst-nvmsgbroker:
To create a gst-nvmsgbroker:


<pre>
<syntaxhighlight lang="c++">
GstElement *nvmsgbroker1 = NULL;
GstElement *nvmsgbroker1 = NULL;
nvmsgbroker1 = gst_element_factory_make ("nvmsgbroker", "nvmsgbroker1");
nvmsgbroker1 = gst_element_factory_make ("nvmsgbroker", "nvmsgbroker1");
</pre>
</syntaxhighlight>
 


=== Element configuration ===
=== Element configuration ===


1. Configure element sync:  
1. Configure element sync:  
<pre>
<syntaxhighlight lang="c++">
g_object_set (G_OBJECT (nvmsgbroker1), "sync", 0, "async", false, NULL);
g_object_set (G_OBJECT (nvmsgbroker1), "sync", 0, "async", false, NULL);
</pre>
</syntaxhighlight>


2. Configure the protocol library on the element:
2. Configure the protocol library on the element:
<pre>
<syntaxhighlight lang="c++">
g_object_set (G_OBJECT (nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_amqp_proto.so", NULL);
g_object_set (G_OBJECT (nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_amqp_proto.so", NULL);
</pre>
</syntaxhighlight>


This will automatically select AMQP as the element message protocol adapter.
This will automatically select AMQP as the element message protocol adapter.


3. Configure the element adapter:
3. Configure the element adapter:
<pre>
<syntaxhighlight lang="c++">
g_object_set (G_OBJECT (nvmsgbroker1), "config", "/opt/nvidia/deepstream/deepstream-6.0/sources/libs/amqp_protocol_adaptor/cfg_amqp.txt", NULL);
g_object_set (G_OBJECT (nvmsgbroker1), "config", "/opt/nvidia/deepstream/deepstream-6.0/sources/libs/amqp_protocol_adaptor/cfg_amqp.txt", NULL);
</pre>
</syntaxhighlight>


In this case the following file will be used:
In this case, the following file will be used:
<pre>
<pre>
[message-broker]
[message-broker]
Line 95: Line 108:


1. Enable the management plugin for rabbitmq:
1. Enable the management plugin for rabbitmq:
<pre>
<syntaxhighlight lang="bash">
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmq-plugins enable rabbitmq_management
</pre>
</syntaxhighlight>


2. Declare a new queue named myqueue for the user we are using (guest):
2. Declare a new queue named myqueue for the user we are using (guest):
<pre>
<syntaxhighlight lang="bash">
sudo rabbitmqadmin -u guest -p guest -V / declare queue name=myqueue durable=false auto_delete=true
sudo rabbitmqadmin -u guest -p guest -V / declare queue name=myqueue durable=false auto_delete=true
</pre>
</syntaxhighlight>


3. Declare a binding to forward the messages received in the specified exchange and topic, into the queue.
3. Declare a binding to forward the messages received in the specified exchange and topic, into the queue.
<pre>
<syntaxhighlight lang="bash">
sudo rabbitmqadmin -u guest -p guest -V / declare binding source=amq.topic destination=myqueue routing_key=topicname
sudo rabbitmqadmin -u guest -p guest -V / declare binding source=amq.topic destination=myqueue routing_key=topicname
</pre>
</syntaxhighlight>


4. Read the messages back from the queue:
4. Read the messages back from the queue:
<pre>
<syntaxhighlight lang="bash">
rabbitmqadmin get queue=myqueue count=1 requeue=false
rabbitmqadmin get queue=myqueue count=1 requeue=false
</pre>
</syntaxhighlight>
 
== Adding NvDsPayload to NvDsMeta ==
 
Assuming that you have a batch_meta (NvDsBatchMeta*) and a frame_meta (NvDsFrameMeta*), you can add the payload like this to these:
<syntaxhighlight lang="c++">
/* Acquire new meta and add it to the frame */
NvDsUserMeta* new_user_meta = nvds_acquire_user_meta_from_pool(batch_meta);
new_user_meta->base_meta.meta_type = NVDS_PAYLOAD_META;
nvds_add_user_meta_to_frame(frame_meta, new_user_meta);
 
/* Create payload */
NvDsPayload* payload = g_new(NvDsPayload, 1);
 
payload->payloadSize = <PAYLOAD_SIZE>;
gchar* payload_message = (gchar*) g_malloc0(<PAYLOAD_SIZE>);
 
/* Copy the data to the payload_message ptr */
g_stpcpy(payload_message, <DATA_PTR>);
 
payload->payload = payload_message;
new_user_meta->user_meta_data = (void*) payload;
 
</syntaxhighlight>
 
You may need to find a way to free this memory if necessary for your use case.
 
{{ContactUs}}
 
 
[[Category:GStreamer]][[Category:Jetson]][[Category:JetsonNano]][[Category:JetsonTX2]][[Category:NVIDIA Xavier]]

Latest revision as of 19:12, 26 June 2024


Introduction to Gst-nvmsgbroker

This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvmsgbroker.html

As stated in the documentation, the plugin will receive GstBuffers with NvDsMeta meta attached in the GstMeta and use the NvDsPayload metadata attached to them on each frame metadata inside the user metadata list. It will then use the nvds_msgapi_* interface to send the messages inside the payload.

The plugin allows an easy configuration in which only one configuration file needs to be specified and a path to the protocol adapter library.

Below you can find a diagram of a possible pipeline to be used with the broker element:

nvmsgbroker pipeline


And below is the flow of configuration and messages sending into the broker by using the nvmsgbroker element:

NvMsgbroker flow


Protocol adapters

The documentation lists different adapters that can be used:

  • Kafka Protocol Adapter
  • Azure MQTT Protocol Adapter
  • AMQP Protocol Adapter
  • REDIS Protocol Adapter

All libraries for the adapters are located in the following path:

/opt/nvidia/deepstream/deepstream-6.0/lib

AMQP protocol adapter example

Below is an example for using this plugin with the AMQP adapter with all steps required to read messages.

Dependencies for AMQP

DS adapter dependencies

The AMQP underlaying protocol adapter implementation uses the librabbitmq.so library, this is build from rabbitmq-c (v0.8.0). To build and install it:

git clone -b v0.8.0  --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c && mkdir build && cd build
cmake .. && cmake --build .
sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/
sudo cp ./librabbitmq/librabbitmq.so.4 /opt/nvidia/deepstream/deepstream-6.0/lib/

Broker dependencies

You will also need a message broker running compatible with AMQP 0-9-1, you can install one by looking at here: https://www.rabbitmq.com/install-debian.html

Or you just can execute this to install the package for the system:

sudo apt-get install rabbitmq-server

Element creation

To create a gst-nvmsgbroker:

GstElement *nvmsgbroker1 = NULL;
nvmsgbroker1 = gst_element_factory_make ("nvmsgbroker", "nvmsgbroker1");

Element configuration

1. Configure element sync:

g_object_set (G_OBJECT (nvmsgbroker1), "sync", 0, "async", false, NULL);

2. Configure the protocol library on the element:

g_object_set (G_OBJECT (nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_amqp_proto.so", NULL);

This will automatically select AMQP as the element message protocol adapter.

3. Configure the element adapter:

g_object_set (G_OBJECT (nvmsgbroker1), "config", "/opt/nvidia/deepstream/deepstream-6.0/sources/libs/amqp_protocol_adaptor/cfg_amqp.txt", NULL);

In this case, the following file will be used:

[message-broker]
hostname = localhost
port = 5672
username = guest
password = guest
exchange = amq.topic
topic = topicname
#share-connection = 1

This will configure the element to send messages with this configuration for the broker.

Reading messages

To read the messages, follow a series of steps:

1. Enable the management plugin for rabbitmq:

sudo rabbitmq-plugins enable rabbitmq_management

2. Declare a new queue named myqueue for the user we are using (guest):

sudo rabbitmqadmin -u guest -p guest -V / declare queue name=myqueue durable=false auto_delete=true

3. Declare a binding to forward the messages received in the specified exchange and topic, into the queue.

sudo rabbitmqadmin -u guest -p guest -V / declare binding source=amq.topic destination=myqueue routing_key=topicname

4. Read the messages back from the queue:

rabbitmqadmin get queue=myqueue count=1 requeue=false

Adding NvDsPayload to NvDsMeta

Assuming that you have a batch_meta (NvDsBatchMeta*) and a frame_meta (NvDsFrameMeta*), you can add the payload like this to these:

/* Acquire new meta and add it to the frame */
NvDsUserMeta* new_user_meta = nvds_acquire_user_meta_from_pool(batch_meta);
new_user_meta->base_meta.meta_type = NVDS_PAYLOAD_META;
nvds_add_user_meta_to_frame(frame_meta, new_user_meta);

/* Create payload */
NvDsPayload* payload = g_new(NvDsPayload, 1);

payload->payloadSize = <PAYLOAD_SIZE>;
gchar* payload_message = (gchar*) g_malloc0(<PAYLOAD_SIZE>);

/* Copy the data to the payload_message ptr */
g_stpcpy(payload_message, <DATA_PTR>);

payload->payload = payload_message;
new_user_meta->user_meta_data = (void*) payload;

You may need to find a way to free this memory if necessary for your use case.


RidgeRun Resources

Quick Start Client Engagement Process RidgeRun Blog Homepage
Technical and Sales Support RidgeRun Online Store RidgeRun Videos Contact Us
RidgeRun.ai: Artificial Intelligence | Generative AI | Machine Learning

Contact Us

Visit our Main Website for the RidgeRun Products and Online Store. RidgeRun Engineering information is available at RidgeRun Engineering Services, RidgeRun Professional Services, RidgeRun Subscription Model and Client Engagement Process wiki pages. Please email to support@ridgerun.com for technical questions and contactus@ridgerun.com for other queries. Contact details for sponsoring the RidgeRun GStreamer projects are available in Sponsor Projects page.