NVIDIA Deepstream Gst-nvmsgbroker
|
Introduction to Gst-nvmsgbroker
This wiki serves as an example on how to use the gst-nvmsgbroker plugin provided in DeepStream 6.0: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_plugin_gst-nvmsgbroker.html
As stated in the documentation, the plugin will receive GstBuffers with NvDsMeta meta attached in the GstMeta and use the NvDsPayload metadata attached to them on each frame metadata inside the user metadata list. It will then use the nvds_msgapi_* interface to send the messages inside the payload.
The plugin allows an easy configuration in which only one configuration file needs to be specified and a path to the protocol adapter library.
Below you can find a diagram of a possible pipeline to be used with the broker element:
And below is the flow of configuration and messages sending into the broker by using the nvmsgbroker element:
Protocol adapters
The documentation lists different adapters that can be used:
- Kafka Protocol Adapter
- Azure MQTT Protocol Adapter
- AMQP Protocol Adapter
- REDIS Protocol Adapter
All libraries for the adapters are located in the following path:
/opt/nvidia/deepstream/deepstream-6.0/lib
AMQP protocol adapter example
Below is an example for using this plugin with the AMQP adapter with all steps required to read messages.
Dependencies for AMQP
DS adapter dependencies
The AMQP underlaying protocol adapter implementation uses the librabbitmq.so library, this is build from rabbitmq-c (v0.8.0). To build and install it:
git clone -b v0.8.0 --recursive https://github.com/alanxz/rabbitmq-c.git cd rabbitmq-c && mkdir build && cd build cmake .. && cmake --build . sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/ sudo cp ./librabbitmq/librabbitmq.so.4 /opt/nvidia/deepstream/deepstream-6.0/lib/
Broker dependencies
You will also need a message broker running compatible with AMQP 0-9-1, you can install one by looking at here: https://www.rabbitmq.com/install-debian.html
Or you just can execute this to install the package for the system:
sudo apt-get install rabbitmq-server
Element creation
To create a gst-nvmsgbroker:
GstElement *nvmsgbroker1 = NULL; nvmsgbroker1 = gst_element_factory_make ("nvmsgbroker", "nvmsgbroker1");
Element configuration
1. Configure element sync:
g_object_set (G_OBJECT (nvmsgbroker1), "sync", 0, "async", false, NULL);
2. Configure the protocol library on the element:
g_object_set (G_OBJECT (nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.0/lib/libnvds_amqp_proto.so", NULL);
This will automatically select AMQP as the element message protocol adapter.
3. Configure the element adapter:
g_object_set (G_OBJECT (nvmsgbroker1), "config", "/opt/nvidia/deepstream/deepstream-6.0/sources/libs/amqp_protocol_adaptor/cfg_amqp.txt", NULL);
In this case, the following file will be used:
[message-broker] hostname = localhost port = 5672 username = guest password = guest exchange = amq.topic topic = topicname #share-connection = 1
This will configure the element to send messages with this configuration for the broker.
Reading messages
To read the messages, follow a series of steps:
1. Enable the management plugin for rabbitmq:
sudo rabbitmq-plugins enable rabbitmq_management
2. Declare a new queue named myqueue for the user we are using (guest):
sudo rabbitmqadmin -u guest -p guest -V / declare queue name=myqueue durable=false auto_delete=true
3. Declare a binding to forward the messages received in the specified exchange and topic, into the queue.
sudo rabbitmqadmin -u guest -p guest -V / declare binding source=amq.topic destination=myqueue routing_key=topicname
4. Read the messages back from the queue:
rabbitmqadmin get queue=myqueue count=1 requeue=false
Adding NvDsPayload to NvDsMeta
Assuming that you have a batch_meta (NvDsBatchMeta*) and a frame_meta (NvDsFrameMeta*), you can add the payload like this to these:
/* Acquire new meta and add it to the frame */ NvDsUserMeta* new_user_meta = nvds_acquire_user_meta_from_pool(batch_meta); new_user_meta->base_meta.meta_type = NVDS_PAYLOAD_META; nvds_add_user_meta_to_frame(frame_meta, new_user_meta); /* Create payload */ NvDsPayload* payload = g_new(NvDsPayload, 1); payload->payloadSize = <PAYLOAD_SIZE>; gchar* payload_message = (gchar*) g_malloc0(<PAYLOAD_SIZE>); /* Copy the data to the payload_message ptr */ g_stpcpy(payload_message, <DATA_PTR>); payload->payload = payload_message; new_user_meta->user_meta_data = (void*) payload;
You may need to find a way to free this memory if necessary for your use case.
For direct inquiries, please refer to the contact information available on our Contact page. Alternatively, you may complete and submit the form provided at the same link. We will respond to your request at our earliest opportunity.
Links to RidgeRun Resources and RidgeRun Artificial Intelligence Solutions can be found in the footer below.