RidgeRun Metadata/Use Cases/UAV metadata streaming

Use Case: Surveillance or Reconnaissance Drones
In defense, security, and monitoring applications, drones (UAVs) equipped with cameras are commonly used to capture real-time video. However, the value of this video increases significantly when it is enriched with metadata that provides contextual information such as GPS coordinates, altitude, camera orientation, precise timestamps, and more.
One of the most widely adopted standards for transmitting this type of information is MISB (Motion Imagery Standards Board). MISB defines how to structure and transport metadata relevant to visual intelligence. This metadata can be encapsulated and transmitted alongside the video using the MPEG Transport Stream (MPEG-TS) container, which supports multiplexing of video, audio, and metadata into a single synchronized stream.
Applicable RidgeRun Products
MPEG-TS
By leveraging RidgeRun's MPEG-TS metadata framework, developers can easily integrate synchronized, real-time metadata into video pipelines used in UAV applications. The solution simplifies the injection and extraction process using the metasrc and metasink elements, and supports both asynchronous and synchronous metadata conforming to standards like MISB ST 0601.
This enables a powerful and standards-compliant method of enriching video streams with mission-critical data—ideal for drones engaged in surveillance, reconnaissance, or tactical operations.
Code Example
This script creates a real-time GStreamer pipeline that streams H.264 video and KLV metadata (MISB ST 0601) over RTP/UDP to 127.0.0.1:5000. It generates and updates a KLV packet containing a single tag 7, representing Platform Roll Angle, with values oscillate between -10 and 10 degrees to simulate realistic data. The packet is serialized and dynamically injected into the stream using the metasrc element via the metadata-binary property.
This setup is ideal for simulating a live video feed with embedded, time-synchronized telemetry, making it suitable for testing systems that process MPEG-TS with KLV metadata. The example also includes a receiver pipeline using udpsrc to capture the RTP stream, extract the KLV metadata allowing real-time verification of the injected values.
This project uses LibMISB, a library developed by RidgeRun for encoding and decoding KLV metadata based on MISB standards. It simplifies the integration of real-time telemetry into video streams by handling the serialization process automatically — particularly useful for standards like MISB ST 0601 (e.g., Platform Roll Angle, tag 7).
MISB Standards: LibMISB
Sender
Creates a GStreamer pipeline that generates a test H.264 video stream and periodically injects a single MISB ST0601 tag (Tag 7) via metasrc. The stream is muxed into MPEG‑TS and sent over RTP/UDP to 127.0.0.1:5000
#!/usr/bin/env python3
import sys, signal, gi, random
gi.require_version('Gst', '1.0')
gi.require_version('GObject', '2.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib
from rrmedia.native.gsthelper import set_gbytearray_property
# MISB
from libmisb import LibMisb, Metadata, metadata_item, LogLevel
from libmisb.formatter import JsonFormatter
# --- MISB Config ---
ST0601_KEY = "060E2B34020B01010E01030101000000" # ST 0601 (UAS Local Set)
TAG = "7" # Platform Roll Angle
ROLL_MIN = -10.0 # degrees
ROLL_MAX = 10.0 # degrees
INTERVAL_MS = 500 # update every 500 ms
def build_items(lst):
out = []
for entry in lst:
mi = metadata_item()
mi.tag = entry["tag"]
mi.value = entry["value"]
mi.sub_items = []
mi.local_set = True
out.append(mi)
return out
def encode_single_tag(lm, key_hex: str, tag: str, value_str: str) -> bytes:
meta = Metadata()
meta.set_key(key_hex)
meta.set_items(build_items([{"tag": tag, "value": value_str}]))
pkt, status = lm.encode(meta)
if status != 0:
raise RuntimeError(f"MISB encode failed: {status}")
return pkt
# MISB Encoder
lm = LibMisb()
lm.set_formatter(JsonFormatter())
lm.set_log_level(LogLevel.INFO)
PIPELINE_DESCRIPTION = (
"mpegtsmux name=mux ! rtpmp2tpay pt=33 ! udpsink host=127.0.0.1 port=5000 "
"videotestsrc is-live=true pattern=smpte ! "
"video/x-raw,width=640,height=360,framerate=30/1 ! "
"x264enc tune=zerolatency key-int-max=30 bitrate=2000 speed-preset=ultrafast ! "
"h264parse ! mux. "
"metasrc name=meta ! mux."
)
class TsMetaSender:
def __init__(self):
self.pipeline = None
self.meta = None
self.loop = None
self.tick_id = None
self.bus = None
# Start with near-level roll
self.roll = random.uniform(-2, 2)
def create_pipeline(self):
self.loop = GLib.MainLoop()
try:
self.pipeline = Gst.parse_launch(PIPELINE_DESCRIPTION)
except GLib.Error as e:
print(f"Unable to build pipeline: {e.message}", file=sys.stderr)
return False
self.meta = self.pipeline.get_by_name("meta")
if not self.meta:
print("Could not get metasrc element", file=sys.stderr)
return False
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.on_bus_message)
signal.signal(signal.SIGINT, lambda *_: self.quit())
self.tick_id = GLib.timeout_add(INTERVAL_MS, self._tick_update_klv)
return True
def _tick_update_klv(self):
try:
# Smooth roll angle simulation within ±10°
delta = random.uniform(-1.5, 1.5) # gentle change per tick
if (self.roll <= ROLL_MIN and delta < 0) or (self.roll >= ROLL_MAX and delta > 0):
delta = -delta # reflect at the edge
self.roll = max(ROLL_MIN, min(ROLL_MAX, self.roll + delta))
# Small noise for realism
noisy_value = max(ROLL_MIN, min(ROLL_MAX, self.roll + random.uniform(-0.2, 0.2)))
pkt = encode_single_tag(lm, ST0601_KEY, TAG, f"{noisy_value:.2f}")
set_gbytearray_property(self.meta, "metadata-binary", pkt)
print(f"[tick] tag{TAG} (Platform Roll Angle) = {noisy_value:.2f}° "
f"KLV={pkt.hex()[:64]}... (len={len(pkt)})")
except Exception as e:
print(f"Update error: {e}", file=sys.stderr)
return True
def on_bus_message(self, _bus, msg):
if msg.type == Gst.MessageType.EOS:
print("End of stream"); self.quit()
elif msg.type == Gst.MessageType.ERROR:
err, dbg = msg.parse_error()
print(f"Error: {err} {dbg or ''}", file=sys.stderr)
self.quit()
def start(self):
self.pipeline.set_state(Gst.State.PLAYING)
print("Running… MPEG-TS + KLV (Platform Roll Angle tag 7) over RTP/UDP :5000")
self.loop.run()
def quit(self):
if self.tick_id is not None:
GLib.source_remove(self.tick_id)
self.tick_id = None
if self.bus:
try:
self.bus.remove_signal_watch()
except Exception:
pass
if self.loop and self.loop.is_running():
self.loop.quit()
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
Gst.init(None)
app = TsMetaSender()
if not app.create_pipeline():
sys.exit(1)
app.start()
Receiver
Open a second terminal and run the receiver. It listens on UDP/5000, depayloads MPEG‑TS, extracts KLV (meta/x-klv), parses it with misbparser and prints it via metasink.
gst-launch-1.0 -e -v udpsrc port=5000 caps="application/x-rtp,media=video,encoding-name=MP2T,clock-rate=90000,payload=33" ! rtpmp2tdepay ! 'video/mpegts, systemstream=(boolean)true, packetsize=(int)188' ! tsdemux name=demux demux. ! queue ! meta/x-klv ! misbparser ! metasink async=false
Output
You should see parsed ST0601 fields including the varying Tag 9 (Platform Indicated Airspeed) and other fields (e.g., Timestamp, Version, Checksum), similar to:
key: 6 E 2B 34 2 B 1 1 E 1 3 1 1 0 0 0 Length: 21 Value: -------- -------- [0] Tag: [2](Timestamp) Length: 8 Value: 54 BE 1F 3F 8D 3E 06 00 -------- -------- [1] Tag: [7](Platform Roll Angle) Length: 2 Value: F7 6D -------- -------- [2] Tag: [65](UAS DataLink LS Version Number) Length: 1 Value: 13 -------- -------- [3] Tag: [1](Checksum) Length: 2 Value: AC 11 00000000 (0x55c4a4812510): 06 0e 2b 34 02 0b 01 01 0e 01 03 01 01 00 00 00 ..+4............ 00000010 (0x55c4a4812520): 15 02 08 54 be 1f 3f 8d 3e 06 00 07 02 f7 6d 41 ...T..?.>.....mA 00000020 (0x55c4a4812530): 01 13 01 02 ac 11