RidgeRun Metadata/Metadata Integration with Messaging Systems/MQTT

Introduction
Message Queuing Telemetry Transport (MQTT) is a lightweight messaging protocol based on the publish/subscribe model, designed for networks with constrained resources, high latency, or unreliable connections.
| Feature | Description |
|---|---|
| Publish/Subscribe model | Clients can publish messages to specific topics, while other clients can subscribe to those topics to receive messages asynchronously. |
| Low bandwidth usage | Extremely lightweight protocol, making it ideal for embedded systems and mobile networks. |
| Quality of Service (QoS) | • QoS 0: At most once (no acknowledgment). • QoS 1: At least once (requires acknowledgment). • QoS 2: Exactly once (handshake mechanism). |
| Runs over TCP/IP | Operates over TCP connections, which provide ordered, reliable byte transport; end-to-end delivery guarantees are set by the QoS level. |
| Retained messages | Broker stores the last published message on a topic and sends it to new subscribers immediately. |
| Last Will and Testament (LWT) | Broker sends a pre-defined message if a client disconnects unexpectedly. |
MQTT is particularly well-suited for scenarios where bandwidth is limited or costly, and where devices operate under intermittent or unstable connectivity conditions. Its lightweight nature enables reliable message delivery without introducing significant overhead, making it ideal for constrained environments. Additionally, MQTT is highly scalable and can support communication across thousands of devices, which is essential for modern distributed systems.
Typical use cases include IoT sensor telemetry—such as temperature, GPS, and pressure readings—remote monitoring of field devices like drones, robots, and security cameras, and smart home applications involving lighting, alarms, or automation.
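To make the publish/subscribe model more concrete, the following minimal sketch shows how a subscription filter could be matched against a published topic using the two MQTT wildcards (`+` for a single level, `#` for all remaining levels). The function name `topic_matches` and the topic `rr/metadata/klv` are illustrative assumptions; a real broker performs this matching internally, and this sketch ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT subscription `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level; it matches
            # the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" accepts any single level.
            return False
    # Without "#", both sides must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


if __name__ == "__main__":
    for flt in ("rr/metadata/sei", "rr/+/sei", "rr/metadata/#", "rr/metadata"):
        print(flt, "->", topic_matches(flt, "rr/metadata/sei"))
```

A subscriber that uses `rr/metadata/#` would therefore receive messages from every metadata topic under that prefix, while `rr/+/sei` selects only the `sei` leaf of each second-level branch.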
Metadata and MQTT
Metadata allows MQTT messages to be enriched with additional information that facilitates interpretation, traceability, and automated processing. In systems where telemetry, video, and sensor data converge — such as in defense, drones, automotive, or video surveillance applications — metadata plays a critical role.
| Metadata Usage | Benefit |
|---|---|
| Data contextualization | Helps understand the origin, purpose, or format of the message (e.g., {"sensor": "GPS", "unit": "m/s", "source": "drone-07"}). |
| Intelligent filtering | Brokers or subscribers can apply more specific subscription rules or route messages based on tags. |
| Multimodal synchronization | Enables alignment of sensor, video, and telemetry data using timestamps or common IDs in the metadata. |
| Audit and traceability | Adding information like device ID, location, or firmware version allows tracking and logging of data origins. |
| Interoperability | Facilitates integration between different systems by defining a common metadata schema (e.g., JSON or CBOR). |
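As a rough illustration of the contextualization and traceability rows above, the sketch below wraps a sensor reading in a JSON envelope before it would be published. The field names (`schema`, `device_id`, `firmware`, `ts_ns`, `data`), the schema string, and the firmware value are assumptions made for this example, not a schema defined by RidgeRun or by MQTT.

```python
import json
import time


def build_envelope(payload: dict, device_id: str, firmware: str, ts_ns=None) -> bytes:
    """Wrap `payload` with context fields and return compact UTF-8 JSON."""
    envelope = {
        "schema": "example.metadata.v1",  # hypothetical schema identifier
        "device_id": device_id,           # who produced the data
        "firmware": firmware,             # useful for audits
        "ts_ns": time.time_ns() if ts_ns is None else ts_ns,
        "data": payload,                  # the original reading, untouched
    }
    # Compact separators keep the MQTT payload small; sorted keys make the
    # output deterministic, which simplifies logging and comparison.
    return json.dumps(envelope, separators=(",", ":"), sort_keys=True).encode("utf-8")


if __name__ == "__main__":
    reading = {"sensor": "GPS", "unit": "m/s", "source": "drone-07"}
    print(build_envelope(reading, device_id="drone-07", firmware="1.0.0").decode())
```

Keeping the original reading under a single `data` key means consumers can evolve the envelope fields without touching sensor-specific parsing.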
Metadata can be transported within a media stream using communication protocols such as RTMP, SRT, UDP, or MPEG-TS, and then extracted for publication via MQTT. This enables seamless integration between edge devices and cloud-based systems.
RidgeRun enables advanced metadata workflows through a set of specialized GStreamer elements designed to handle in-band metadata, supporting structured formats from standards such as SEI, OBU, and MPEG-TS/KLV. MQTT complements these workflows by carrying the extracted metadata to cloud services, dashboards, or AI systems, enabling real-time processing, monitoring, and automation.
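The timestamp-based alignment mentioned in the table above can be sketched as a nearest-neighbor lookup: given a video frame timestamp, find the metadata sample closest in time and reject it if the gap is too large. The function name `nearest_metadata`, the `max_skew_ns` parameter, and the sample values are assumptions for illustration, and the sketch assumes both timestamps come from the same clock and that the metadata timestamps are sorted in ascending order.

```python
import bisect


def nearest_metadata(frame_ts_ns: int, meta_ts_ns: list, max_skew_ns: int):
    """Return the index of the metadata sample closest to the frame, or None.

    `meta_ts_ns` must be sorted in ascending order.
    """
    if not meta_ts_ns:
        return None
    # Position where the frame timestamp would be inserted to keep order.
    i = bisect.bisect_left(meta_ts_ns, frame_ts_ns)
    # Only the neighbors on each side of that position can be the closest.
    candidates = [j for j in (i - 1, i) if 0 <= j < len(meta_ts_ns)]
    best = min(candidates, key=lambda j: abs(meta_ts_ns[j] - frame_ts_ns))
    if abs(meta_ts_ns[best] - frame_ts_ns) > max_skew_ns:
        return None  # closest sample is still too far away to be trusted
    return best


if __name__ == "__main__":
    samples = [0, 33_000_000, 66_000_000]  # roughly one sample per 30 fps frame
    print(nearest_metadata(30_000_000, samples, max_skew_ns=10_000_000))
```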
Examples:
🔹 SEI Metadata: Extracted from an H.264/H.265 stream using seiextract, then published via MQTT to analytics engines or user dashboards.
🔹 RTMP Metadata: Injected using flvmux as ScriptData, extracted downstream, converted to JSON, and forwarded over MQTT.
🔹 MPEG-TS + KLV: MISB-compliant metadata inserted as KLV (Key-Length-Value) can be extracted, parsed into structured formats (e.g. JSON), and published as MQTT messages.
🔹 OBU Metadata: Often used to transport AI inference results like bounding boxes or classification labels, which can be extracted and sent via MQTT for real-time applications (e.g., alerting, visualization, decision-making).
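For the KLV case, a heavily simplified sketch of the "parse into structured formats" step is shown below. It assumes the payload is a flat sequence of items with a one-byte tag, a BER-encoded length, and a raw value; real MISB streams add a 16-byte universal key, multi-byte tags, and per-tag value decoding, none of which is handled here. The function names and the sample bytes are hypothetical.

```python
import json


def read_ber_length(data: bytes, pos: int):
    """Decode a BER length starting at `pos`; return (length, next_pos)."""
    if pos >= len(data):
        raise ValueError("missing length field")
    first = data[pos]
    pos += 1
    if first < 0x80:
        return first, pos            # short form: the byte is the length
    count = first & 0x7F             # long form: number of length bytes
    if count == 0:
        raise ValueError("indefinite length is not supported")
    if pos + count > len(data):
        raise ValueError("truncated length field")
    return int.from_bytes(data[pos:pos + count], "big"), pos + count


def parse_items(data: bytes) -> dict:
    """Split a flat tag/length/value sequence into {tag: value_bytes}."""
    items = {}
    pos = 0
    while pos < len(data):
        tag = data[pos]              # assumption: tags fit in one byte
        length, pos = read_ber_length(data, pos + 1)
        if pos + length > len(data):
            raise ValueError("truncated value")
        items[tag] = data[pos:pos + length]
        pos += length
    return items


if __name__ == "__main__":
    # Tag 1 with a short-form length, tag 2 with a long-form length (0x81 0x03).
    sample = bytes([1, 2, 0x12, 0x34, 2, 0x81, 3]) + b"abc"
    as_json = {str(tag): value.hex() for tag, value in parse_items(sample).items()}
    print(json.dumps(as_json))
```

Hex-encoding the values keeps the result JSON-safe, so the dictionary could be passed directly to an MQTT publish call as a UTF-8 payload.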
Example
This example demonstrates metadata over MQTT by simulating a real-world scenario: a drone embeds metadata in its video stream as SEI messages and transmits the stream over RTP/UDP; a bridge extracts the metadata and publishes it to an MQTT broker, where any subscribed client can consume it.
The scenario has three parts:
- The GStreamer pipeline that generates and sends the metadata.
- The bridge that receives the metadata over UDP and publishes it to the MQTT broker.
- The client application that subscribes to the broker and consumes the metadata.
Run example
Step 1 – Start the MQTT Broker (Mosquitto in Docker)
In the first terminal, create a simple configuration file for Mosquitto:
printf "listener 1883 0.0.0.0\nallow_anonymous true\npersistence false\n" > mosquitto.conf
Run the broker:
docker run --rm -p 1883:1883 \
  -v "$PWD/mosquitto.conf:/mosquitto/config/mosquitto.conf" \
  eclipse-mosquitto:2
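Before moving on, it can help to confirm that the broker is accepting TCP connections. The following sketch only checks that the port is open; it does not perform an MQTT handshake. The function name `broker_reachable` and the default timeout are assumptions for this example, while the host and port match the configuration above.

```python
import socket


def broker_reachable(host: str = "127.0.0.1", port: int = 1883, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_reachable())
```

If this prints `False`, check that the container is still running and that port 1883 is not already in use by another process.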
Step 2 – Run the Metadata Injection Script (GStreamer + SEI + UDP)
In a second terminal, run the Python script that uses GStreamer to inject a JSON payload as SEI metadata into each buffer of an H.264 stream and sends the stream over RTP/UDP.
Use the following command:
python3 sender_inject_udp_mqtt.py --host 127.0.0.1 --port 5000 --stream-id cam01
Python script:
#!/usr/bin/env python3
import argparse
import json
import signal

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib

Gst.init(None)


def main():
    ap = argparse.ArgumentParser(description="Send H264 over RTP/UDP with per-buffer SEI JSON")
    ap.add_argument('--host', default='127.0.0.1', help='Receiver host')
    ap.add_argument('--port', type=int, default=5000, help='Receiver UDP port')
    ap.add_argument('--stream-id', default='cam01')
    ap.add_argument('--fps', default='30/1', help='e.g. 30/1')
    # Use -1 to mean infinite.
    ap.add_argument('--num-buffers', type=int, default=-1,
                    help='-1 = infinite (omit property); >=0 sets num-buffers')
    args = ap.parse_args()

    # Only set num-buffers on videotestsrc when a finite count was requested.
    nbuf = "" if args.num_buffers < 0 else f" num-buffers={args.num_buffers}"
    pipeline = Gst.parse_launch(
        f"videotestsrc is-live=true do-timestamp=true {nbuf} ! "
        f"video/x-raw,framerate={args.fps} ! "
        f"x264enc tune=zerolatency key-int-max=30 ! "
        f"h264parse ! video/x-h264,stream-format=byte-stream,alignment=au ! "
        f"seiinject name=inj ! "
        f"rtph264pay config-interval=1 ! "
        f"udpsink host={args.host} port={args.port} sync=false async=false"
    )
    inj = pipeline.get_by_name('inj')
    seq = 0

    # Pad probe: set a fresh JSON payload on seiinject before each buffer.
    def on_inject(_pad, info):
        nonlocal seq
        buf = info.get_buffer()
        if buf and buf.pts != Gst.CLOCK_TIME_NONE:
            ts_ns = int(buf.pts)
        else:
            # Fall back to the pipeline clock, or to the monotonic clock
            # (microseconds, converted to nanoseconds).
            clk = inj.get_clock()
            ts_ns = int(clk.get_time()) if clk else int(GLib.get_monotonic_time() * 1000)
        payload = {
            "stream_id": args.stream_id,
            "seq": seq,
            "ts_ns": ts_ns,
            "objects": [{
                "id": f"obj-{(seq % 9999) + 1}",
                "label": ["person", "car", "dog", "bicycle"][seq % 4],
                "confidence": round(0.5 + (seq % 50) / 100.0, 2),
                "bbox": {"x": 0.1 + (seq % 5) * 0.1, "y": 0.1, "w": 0.3, "h": 0.3}
            }],
            "source": "seiinject"
        }
        inj.set_property("metadata", json.dumps(payload))
        seq += 1
        return Gst.PadProbeReturn.OK

    inj.get_static_pad("sink").add_probe(Gst.PadProbeType.BUFFER, on_inject)

    # Run
    pipeline.set_state(Gst.State.PLAYING)
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    loop = GLib.MainLoop()

    # On SIGINT/SIGTERM, send EOS so the pipeline shuts down cleanly.
    def stop(*_):
        print("[Sender] Shutting down...")
        pipeline.send_event(Gst.Event.new_eos())

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    def on_msg(_bus, msg):
        if msg.type == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            print("[Sender][ERROR]", err, "\n debug:", (dbg or "").strip())
            pipeline.set_state(Gst.State.NULL)
            loop.quit()
        elif msg.type == Gst.MessageType.EOS:
            print("[Sender] EOS")
            pipeline.set_state(Gst.State.NULL)
            loop.quit()

    bus.connect("message", on_msg)
    print(f"[Sender] RTP/H264 → udp://{args.host}:{args.port} , stream_id={args.stream_id}, "
          f"num-buffers={'∞' if args.num_buffers < 0 else args.num_buffers}")
    loop.run()


if __name__ == '__main__':
    main()
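To see what the injected metadata looks like without running GStreamer, the payload construction from the sender can be reproduced as a standalone function. This is only a sketch that mirrors the dictionary built in the probe callback above; the function name `build_payload` and the `LABELS` constant are introduced here for illustration.

```python
import json

LABELS = ["person", "car", "dog", "bicycle"]


def build_payload(stream_id: str, seq: int, ts_ns: int) -> dict:
    """Build the simulated detection payload for sequence number `seq`."""
    return {
        "stream_id": stream_id,
        "seq": seq,
        "ts_ns": ts_ns,
        "objects": [{
            "id": f"obj-{(seq % 9999) + 1}",
            "label": LABELS[seq % 4],                           # cycles through 4 labels
            "confidence": round(0.5 + (seq % 50) / 100.0, 2),   # 0.50 .. 0.99
            "bbox": {"x": 0.1 + (seq % 5) * 0.1, "y": 0.1, "w": 0.3, "h": 0.3},
        }],
        "source": "seiinject",
    }


if __name__ == "__main__":
    print(json.dumps(build_payload("cam01", 0, 0)))
```

Because `bbox.x` is computed with floating-point arithmetic, some values carry a small rounding residue rather than an exact decimal.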
Step 3 – Receive the Stream, Extract Metadata, and Publish to MQTT
In a third terminal, run the script that listens for the incoming video stream, extracts SEI metadata using seiextract, and publishes the metadata to a specified MQTT topic.
Use the following command:
python3 bridge_extract_to_bus_mqtt.py --port 5000 \
  --mqtt-host 127.0.0.1 --mqtt-port 1883 --mqtt-topic rr/metadata/sei --verbose
Python script:
#!/usr/bin/env python3
import argparse
import json
import signal
import sys

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
gi.require_version('GstSei', '1.0')
from gi.repository import Gst, GLib, GstSei

try:
    import paho.mqtt.client as mqtt
except ImportError:
    print("ERROR: paho-mqtt is not installed. Install it with: pip install paho-mqtt", file=sys.stderr)
    raise

Gst.init(None)


# Convert GBytes (used in SEI buffers) to native Python bytes
def bytes_from_gbytes(gb):
    try:
        return bytes(gb)
    except Exception:
        data = gb.get_data()
        mv = data[0] if isinstance(data, tuple) else data
        return bytes(mv)


def main():
    ap = argparse.ArgumentParser(description="Receive RTP/H264, extract SEI (JSON), and publish to MQTT")
    ap.add_argument('--port', type=int, default=5000, help='UDP listening port')
    # MQTT-specific options
    ap.add_argument('--mqtt-host', default='127.0.0.1')
    ap.add_argument('--mqtt-port', type=int, default=1883)
    ap.add_argument('--mqtt-topic', default='rr/metadata/sei')
    ap.add_argument('--client-id', default='rr-sei-bridge')
    ap.add_argument('--qos', type=int, choices=[0, 1, 2], default=0)
    ap.add_argument('--verbose', action='store_true')
    args = ap.parse_args()

    # Set up MQTT client and connection.
    # Written against the paho-mqtt 1.x callback signatures. paho-mqtt 2.x adds a
    # callback_api_version argument to mqtt.Client; there, pass
    # mqtt.CallbackAPIVersion.VERSION1 as the first argument to keep these callbacks.
    client = mqtt.Client(client_id=args.client_id, protocol=mqtt.MQTTv311, transport="tcp")
    client.enable_logger()

    def on_connect(c, u, flags, rc):
        print(f"[Bridge][MQTT] connected rc={rc} to {args.mqtt_host}:{args.mqtt_port}")

    client.on_connect = on_connect
    client.connect(args.mqtt_host, args.mqtt_port, keepalive=30)
    client.loop_start()
    print(f"[Bridge] MQTT -> {args.mqtt_host}:{args.mqtt_port} topic={args.mqtt_topic} qos={args.qos}")

    # Pipeline: udpsrc → jitterbuffer → depay → parse → caps(AU) → seiextract
    caps = "application/x-rtp,media=video,encoding-name=H264,clock-rate=90000,payload=96"
    pipe = Gst.parse_launch(
        f"udpsrc port={args.port} caps=\"{caps}\" ! "
        f"rtpjitterbuffer ! rtph264depay ! "
        f"h264parse ! video/x-h264,stream-format=byte-stream,alignment=au ! "
        f"seiextract name=ext ! fakesink"
    )
    ext = pipe.get_by_name('ext')

    def publish_mqtt(d: dict):
        payload = json.dumps(d, separators=(',', ':')).encode()
        r = client.publish(args.mqtt_topic, payload, qos=args.qos)
        if r.rc != mqtt.MQTT_ERR_SUCCESS:
            print("[Bridge][MQTT] publish failed rc=", r.rc, file=sys.stderr)

    # Pad probe callback that extracts SEI metadata from video buffers
    def on_extract(_pad, info, _ud):
        buf = info.get_buffer()
        if not buf:
            return Gst.PadProbeReturn.OK
        gbytes = GstSei.buffer_get_meta_bytes(buf)
        if not gbytes:
            return Gst.PadProbeReturn.OK
        raw = bytes_from_gbytes(gbytes)
        s = raw.decode('utf-8', errors='ignore').rstrip('\x00').strip()
        # Parse JSON if possible, otherwise wrap as raw string
        try:
            obj = json.loads(s)
        except Exception:
            obj = {"raw": s}
        # Valid JSON that is not an object (e.g. a list) is wrapped as well,
        # so the field accesses below are always safe.
        if not isinstance(obj, dict):
            obj = {"raw": s}
        # Add timestamp from local clock and sender timestamp
        clk = ext.get_clock()
        now_ns = int(clk.get_time()) if clk else 0
        ts_ns = int(obj.get("ts_ns", 0))
        obj["bridge_ts_ns"] = now_ns
        obj["sender_ts_ns"] = ts_ns
        if args.verbose:
            preview = s[:120] + ("..." if len(s) > 120 else "")
            print("[Bridge][EXTRACT]", preview)
        publish_mqtt(obj)
        if args.verbose:
            print(f"[Bridge][MQTT] sent seq={obj.get('seq')} sender_ts_ns={ts_ns}")
        return Gst.PadProbeReturn.OK

    probe_id = ext.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, on_extract, None)

    # Run loop
    pipe.set_state(Gst.State.PLAYING)
    bus = pipe.get_bus()
    bus.add_signal_watch()
    loop = GLib.MainLoop()

    def stop(*_):
        try:
            print("[Bridge] Shutting down...")
            ext.get_static_pad("src").remove_probe(probe_id)
            pipe.set_state(Gst.State.NULL)
            client.loop_stop()
            client.disconnect()
            loop.quit()
        except Exception:
            pass

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    def on_msg(_bus, msg):
        if msg.type in (Gst.MessageType.ERROR, Gst.MessageType.EOS):
            if msg.type == Gst.MessageType.ERROR:
                err, dbg = msg.parse_error()
                print("[Bridge][ERROR]", err, "\n debug:", (dbg or "").strip(), file=sys.stderr)
            pipe.set_state(Gst.State.NULL)
            client.loop_stop()
            client.disconnect()
            loop.quit()

    bus.connect("message", on_msg)
    print(f"[Bridge] listening udp://0.0.0.0:{args.port}")
    loop.run()


if __name__ == '__main__':
    main()
Expected output (excerpt, with --verbose):
[Bridge] listening udp://0.0.0.0:5000
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 120, "ts_ns": 8739572248968, "objects": [{"id": "obj-121", "label": "person", "confidence"...
[Bridge][MQTT] sent seq=120 sender_ts_ns=8739572248968
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 121, "ts_ns": 8739605565072, "objects": [{"id": "obj-122", "label": "car", "confidence": 0...
[Bridge][MQTT] sent seq=121 sender_ts_ns=8739605565072
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 122, "ts_ns": 8739641614308, "objects": [{"id": "obj-123", "label": "dog", "confidence": 0...
[Bridge][MQTT] sent seq=122 sender_ts_ns=8739641614308
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 123, "ts_ns": 8739675440602, "objects": [{"id": "obj-124", "label": "bicycle", "confidence...
[Bridge][MQTT] sent seq=123 sender_ts_ns=8739675440602
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 124, "ts_ns": 8739705807441, "objects": [{"id": "obj-125", "label": "person", "confidence"...
[Bridge][MQTT] sent seq=124 sender_ts_ns=8739705807441
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 125, "ts_ns": 8739739074098, "objects": [{"id": "obj-126", "label": "car", "confidence": 0...
[Bridge][MQTT] sent seq=125 sender_ts_ns=8739739074098
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 126, "ts_ns": 8739773187326, "objects": [{"id": "obj-127", "label": "dog", "confidence": 0...
[Bridge][MQTT] sent seq=126 sender_ts_ns=8739773187326
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 127, "ts_ns": 8739806423217, "objects": [{"id": "obj-128", "label": "bicycle", "confidence...
[Bridge][MQTT] sent seq=127 sender_ts_ns=8739806423217
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 128, "ts_ns": 8739842294749, "objects": [{"id": "obj-129", "label": "person", "confidence"...
[Bridge][MQTT] sent seq=128 sender_ts_ns=8739842294749
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 129, "ts_ns": 8739876305643, "objects": [{"id": "obj-130", "label": "car", "confidence": 0...
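The decoding step inside the bridge's probe callback can be isolated into a small function, which makes it easy to test without GStreamer or a broker. The sketch below follows the same approach as the bridge (decode as UTF-8, strip trailing NUL bytes, parse as JSON, fall back to a raw string). The function name `decode_sei_payload` is an assumption for this example.

```python
import json


def decode_sei_payload(raw: bytes) -> dict:
    """Turn raw SEI payload bytes into a dictionary ready to publish."""
    # SEI payloads may carry trailing NUL padding; drop it before parsing.
    text = raw.decode("utf-8", errors="ignore").rstrip("\x00").strip()
    try:
        obj = json.loads(text)
    except ValueError:
        # Not JSON at all: keep the text so nothing is silently lost.
        return {"raw": text}
    if not isinstance(obj, dict):
        # Valid JSON but not an object (list, number, ...): wrap it too.
        return {"raw": text}
    return obj


if __name__ == "__main__":
    print(decode_sei_payload(b'{"stream_id": "cam01", "seq": 120}\x00\x00'))
    print(decode_sei_payload(b"not json"))
```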
Step 4 – Subscribe to the Metadata Topic
In a fourth terminal, run a simple MQTT client script that subscribes to the metadata topic and prints the received SEI metadata to the console.
Use the following command:
python3 mqtt_print.py --host 127.0.0.1 --port 1883 --topic rr/metadata/sei
Python script:
#!/usr/bin/env python3
import argparse
import json
import signal
import sys

import paho.mqtt.client as mqtt


def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--host', default='127.0.0.1')
    ap.add_argument('--port', type=int, default=1883)
    ap.add_argument('--topic', default='rr/metadata/sei')
    ap.add_argument('--client-id', default='rr-sei-receiver')
    args = ap.parse_args()

    # Callback when client connects to the MQTT broker
    def on_connect(client, _ud, _flags, rc):
        print(f"[MQTT] connected rc={rc}; subscribing {args.topic}")
        client.subscribe(args.topic, qos=0)

    def on_disconnect(_c, _u, rc):
        print("[MQTT] disconnected rc=", rc)

    # Callback when subscription is successful
    def on_subscribe(_c, _u, mid, granted_qos):
        print(f"[MQTT] subscribed mid={mid} qos={granted_qos}")

    # Callback for each received message: print parsed JSON, or the raw text
    def on_message(_client, _ud, msg):
        try:
            data = json.loads(msg.payload.decode('utf-8'))
        except Exception:
            data = {"raw": msg.payload.decode('utf-8', errors='ignore')}
        print("[MQTT]", data)

    def on_log(_c, _u, level, buf):
        print("[MQTT][LOG]", buf)

    # Create and configure MQTT client
    c = mqtt.Client(client_id=args.client_id, protocol=mqtt.MQTTv311, transport="tcp")
    c.enable_logger()
    c.on_connect = on_connect
    c.on_disconnect = on_disconnect
    c.on_subscribe = on_subscribe
    c.on_message = on_message
    c.on_log = on_log
    c.connect(args.host, args.port, 30)

    def stop(*_):
        print("[MQTT] Shutting down...")
        try:
            c.disconnect()
        finally:
            sys.exit(0)

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)
    c.loop_forever()


if __name__ == '__main__':
    main()
Expected output:
[MQTT][LOG] Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k30) client_id=b'rr-sei-receiver'
[MQTT][LOG] Received CONNACK (0, 0)
[MQTT] connected rc=0; subscribing rr/metadata/sei
[MQTT][LOG] Sending SUBSCRIBE (d0, m1) [(b'rr/metadata/sei', 0)]
[MQTT][LOG] Received SUBACK
[MQTT] subscribed mid=1 qos=(0,)
[MQTT][LOG] Received PUBLISH (d0, q0, r0, m0), 'rr/metadata/sei', ... (243 bytes)
[MQTT] {'stream_id': 'cam01', 'seq': 2525, 'ts_ns': 3600084166666666, 'objects': [{'id': 'obj-2526', 'label': 'car', 'confidence': 0.75, 'bbox': {'x': 0.1, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 19506672392923, 'sender_ts_ns': 3600084166666666}
[MQTT][LOG] Received PUBLISH (d0, q0, r0, m0), 'rr/metadata/sei', ... (243 bytes)
[MQTT] {'stream_id': 'cam01', 'seq': 2526, 'ts_ns': 3600084200000000, 'objects': [{'id': 'obj-2527', 'label': 'dog', 'confidence': 0.76, 'bbox': {'x': 0.2, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 19506705847587, 'sender_ts_ns': 3600084200000000}
[MQTT][LOG] Received PUBLISH (d0, q0, r0, m0), 'rr/metadata/sei', ... (263 bytes)
[MQTT] {'stream_id': 'cam01', 'seq': 2527, 'ts_ns': 3600084233333333, 'objects': [{'id': 'obj-2528', 'label': 'bicycle', 'confidence': 0.77, 'bbox': {'x': 0.30000000000000004, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 19506743646515, 'sender_ts_ns': 3600084233333333}
[MQTT][LOG] Received PUBLISH (d0, q0, r0, m0), 'rr/metadata/sei', ... (246 bytes)
[MQTT] {'stream_id': 'cam01', 'seq': 2528, 'ts_ns': 3600084266666666, 'objects': [{'id': 'obj-2529', 'label': 'person', 'confidence': 0.78, 'bbox': {'x': 0.4, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 19506775863740, 'sender_ts_ns': 3600084266666666}
[MQTT][LOG] Received PUBLISH (d0, q0, r0, m0), 'rr/metadata/sei', ... (243 bytes)
[MQTT] {'stream_id': 'cam01', 'seq': 2529, 'ts_ns': 3600084300000000, 'objects': [{'id': 'obj-2530', 'label': 'car', 'confidence': 0.79, 'bbox': {'x': 0.5, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 19506805569339, 'sender_ts_ns': 3600084300000000}
[MQTT][LOG] Received PUBLISH (d0, q0, r0, m0), 'rr/metadata/sei', ... (242 bytes)
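Once messages arrive, a consumer would typically act on them rather than only print them. As one possible illustration, the sketch below selects detections whose confidence reaches a threshold, which could feed an alerting step. The function name `high_confidence_objects` and the default threshold of 0.7 are assumptions; the sample message reuses fields from the output above.

```python
def high_confidence_objects(message: dict, threshold: float = 0.7) -> list:
    """Return the detections in `message` with confidence >= threshold."""
    selected = []
    for obj in message.get("objects", []):
        # Skip malformed entries instead of failing on unexpected payloads.
        if isinstance(obj, dict) and obj.get("confidence", 0.0) >= threshold:
            selected.append(obj)
    return selected


if __name__ == "__main__":
    sample = {
        "stream_id": "cam01",
        "seq": 2525,
        "objects": [{"id": "obj-2526", "label": "car", "confidence": 0.75,
                     "bbox": {"x": 0.1, "y": 0.1, "w": 0.3, "h": 0.3}}],
        "source": "seiinject",
    }
    for det in high_confidence_objects(sample):
        print(f"ALERT {sample['stream_id']}: {det['label']} ({det['confidence']})")
```

In the subscriber script, a function like this could be called from `on_message` after the JSON payload has been parsed.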