NVIDIA Deepstream Gst-nvmsgbroker

From RidgeRun Developer Wiki



Introduction to Gst-nvmsgbroker

This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvmsgbroker.html

As stated in the documentation, the plugin will receive GstBuffers with NvDsMeta meta attached in the GstMeta and use the NvDsPayload metadata attached to them on each frame metadata inside the user metadata list. It will then use the nvds_msgapi_* interface to send the messages inside the payload.

The plugin allows an easy configuration in which only one configuration file needs to be specified and a path to the protocol adapter library.

Below you can find a diagram of a possible pipeline to be used with the broker element:

nvmsgbroker pipeline


And below is the flow of configuration and messages sending into the broker by using the nvmsgbroker element:

NvMsgbroker flow


Protocol adapters

The documentation lists different adapters that can be used:

  • Kafka Protocol Adapter
  • Azure MQTT Protocol Adapter
  • AMQP Protocol Adapter
  • REDIS Protocol Adapter

All libraries for the adapters are located in the following path:

/opt/nvidia/deepstream/deepstream-6.0/lib

AMQP protocol adapter example

Below is an example for using this plugin with the AMQP adapter with all steps required to read messages.

Dependencies for AMQP

DS adapter dependencies

The AMQP underlaying protocol adapter implementation uses the librabbitmq.so library, this is build from rabbitmq-c (v0.8.0). To build and install it:

git clone -b v0.8.0  --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c && mkdir build && cd build
cmake .. && cmake --build .
sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/
sudo cp ./librabbitmq/librabbitmq.so.4 /opt/nvidia/deepstream/deepstream-6.0/lib/

Broker dependencies

You will also need a message broker running compatible with AMQP 0-9-1, you can install one by looking at here: https://www.rabbitmq.com/install-debian.html

Or you just can execute this to install the package for the system:

sudo apt-get install rabbitmq-server

Element creation

To create a gst-nvmsgbroker:

GstElement *nvmsgbroker1 = NULL;
nvmsgbroker1 = gst_element_factory_make ("nvmsgbroker", "nvmsgbroker1");

Element configuration

1. Configure element sync:

g_object_set (G_OBJECT (nvmsgbroker1), "sync", 0, "async", false, NULL);

2. Configure the protocol library on the element:

g_object_set (G_OBJECT (nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_amqp_proto.so", NULL);

This will automatically select AMQP as the element message protocol adapter.

3. Configure the element adapter:

g_object_set (G_OBJECT (nvmsgbroker1), "config", "/opt/nvidia/deepstream/deepstream-6.0/sources/libs/amqp_protocol_adaptor/cfg_amqp.txt", NULL);

In this case, the following file will be used:

[message-broker]
hostname = localhost
port = 5672
username = guest
password = guest
exchange = amq.topic
topic = topicname
#share-connection = 1

This will configure the element to send messages with this configuration for the broker.

Reading messages

To read the messages, follow a series of steps:

1. Enable the management plugin for rabbitmq:

sudo rabbitmq-plugins enable rabbitmq_management

2. Declare a new queue named myqueue for the user we are using (guest):

sudo rabbitmqadmin -u guest -p guest -V / declare queue name=myqueue durable=false auto_delete=true

3. Declare a binding to forward the messages received in the specified exchange and topic, into the queue.

sudo rabbitmqadmin -u guest -p guest -V / declare binding source=amq.topic destination=myqueue routing_key=topicname

4. Read the messages back from the queue:

rabbitmqadmin get queue=myqueue count=1 requeue=false

Adding NvDsPayload to NvDsMeta

Assuming that you have a batch_meta (NvDsBatchMeta*) and a frame_meta (NvDsFrameMeta*), you can add the payload like this to these:

/* Acquire new meta and add it to the frame */
NvDsUserMeta* new_user_meta = nvds_acquire_user_meta_from_pool(batch_meta);
new_user_meta->base_meta.meta_type = NVDS_PAYLOAD_META;
nvds_add_user_meta_to_frame(frame_meta, new_user_meta);

/* Create payload */
NvDsPayload* payload = g_new(NvDsPayload, 1);

payload->payloadSize = <PAYLOAD_SIZE>;
gchar* payload_message = (gchar*) g_malloc0(<PAYLOAD_SIZE>);

/* Copy the data to the payload_message ptr */
g_stpcpy(payload_message, <DATA_PTR>);

payload->payload = payload_message;
new_user_meta->user_meta_data = (void*) payload;

You may need to find a way to free this memory if necessary for your use case.



For direct inquiries, please refer to the contact information available on our Contact page. Alternatively, you may complete and submit the form provided at the same link. We will respond to your request at our earliest opportunity.


Links to RidgeRun Resources and RidgeRun Artificial Intelligence Solutions can be found in the footer below.