RidgeRun Metadata/Metadata Integration with Messaging Systems/Kafka

Illustration of a drone sending SEI metadata to a Kafka broker to be consumed by Kafka clients

Introduction

Apache Kafka is a distributed event streaming platform used to build real-time data pipelines and streaming applications. It is designed to handle high-throughput, low-latency, and fault-tolerant communication between systems.

Kafka works as a publish/subscribe messaging system, where producers send messages to topics, and consumers subscribe to those topics to receive data in real time. Unlike traditional messaging queues, Kafka is persistent and distributed, making it suitable for building reliable and scalable data infrastructure.
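
As a minimal sketch of this publish/subscribe model, the snippet below sends one message to a topic and reads it back using the confluent-kafka Python client (the same client used in the demo later on this page). The broker address, topic name, and payload are placeholders:

from confluent_kafka import Producer, Consumer

# Producer side: publish one message to a topic (placeholder broker and topic).
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("demo.topic", key=b"sensor-1", value=b'{"temperature": 21.5}')
producer.flush()  # block until delivery is confirmed

# Consumer side: subscribe to the same topic and poll for messages.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "demo-group",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["demo.topic"])
msg = consumer.poll(10.0)  # wait up to 10 s for a message
if msg is not None and not msg.error():
    print(msg.key(), msg.value())
consumer.close()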

Key Features

  • High throughput and low latency: Designed to handle millions of messages per second with minimal delay, even under heavy loads.
  • Durability and persistence: Messages are written to disk and retained for configurable durations, enabling replay, recovery, and auditability.
  • Scalability: Supports horizontal scaling by partitioning data across multiple brokers and consumer groups.
  • Fault tolerance: Ensures reliability through data replication across brokers, allowing the system to remain operational during node or hardware failures.
  • Stream processing integration: Built-in APIs like Kafka Streams and ksqlDB enable real-time processing, filtering, and transformation of streaming data.


Apache Kafka is particularly useful in systems that demand reliable, scalable, and high-performance data communication. It is ideal for environments with continuous data generation and the need to process, analyze, or store that data in real time. Kafka supports decoupled architectures, allowing producers and consumers to operate independently while sharing a common data stream.

Its durability and ability to replay historical data also make it a powerful tool for auditing, debugging, and machine learning pipelines. Kafka is not only used for logging and telemetry, but also as a central hub for streaming architectures across cloud-native, big data, and enterprise systems.

Common Use Cases

  • Log and telemetry aggregation from multiple services or devices.
  • Streaming analytics pipelines that perform filtering, enrichment, or aggregation of real-time data.
  • Integration of microservices through event-driven communication.
  • Machine learning pipelines, where feature extraction or inference relies on continuous data streams.

Metadata and Kafka

Kafka enables scalable and reliable distribution of metadata across distributed systems. It is particularly well-suited for high-throughput pipelines where structured data—such as telemetry, sensor streams, video annotations, or AI inference results—must be processed, stored, or forwarded in real time.

Use of Metadata in Kafka

  • Asynchronous decoupling: Producers and consumers operate independently, enabling flexible and fault-tolerant communication between systems.
  • Event sourcing and auditing: Kafka persists metadata events (e.g., {"object": "car", "confidence": 0.92}) for later replay, audit, or reprocessing.
  • Real-time analytics: Metadata such as bounding boxes, geolocation, or telemetry can be processed on the fly using Kafka Streams or ksqlDB.
  • Multi-service distribution: Kafka supports fan-out to multiple consumers, allowing metadata to feed different subsystems (dashboards, databases, AI pipelines, etc.); see the sketch after this list.
  • Integration with storage and batch systems: Metadata streams can be archived to HDFS, S3, or databases using Kafka Connect for long-term storage or offline processing.
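
To make the multi-service fan-out concrete, here is a small sketch under the same assumptions as the demo further down (a broker on localhost:9092 and the rr.metadata.sei topic): two independent consumer groups each receive the full metadata stream.

import json
from confluent_kafka import Consumer

def metadata_reader(group_id, bootstrap="localhost:9092", topic="rr.metadata.sei"):
    # Each distinct group.id receives its own complete copy of the stream, so a
    # dashboard, a database writer, and an AI pipeline can consume the same
    # metadata independently and at their own pace.
    c = Consumer({
        "bootstrap.servers": bootstrap,
        "group.id": group_id,
        "auto.offset.reset": "earliest",
    })
    c.subscribe([topic])
    return c

# Both consumer groups see every event published to the topic.
for name in ("dashboard", "archiver"):
    consumer = metadata_reader(name)
    msg = consumer.poll(10.0)
    if msg is not None and not msg.error():
        print(f"[{name}]", json.loads(msg.value()))
    consumer.close()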

Metadata can be extracted from media streams via protocols such as RTMP, SRT, MPEG-TS, or RTP, and published to Kafka topics for real-time analytics or machine learning pipelines.

RidgeRun supports this workflow with GStreamer elements like seiextract (for SEI metadata) and metasink (for MPEG-TS). This allows edge devices such as drones, smart cameras, or vehicles to capture metadata in real time and push it into scalable Kafka infrastructure.

Examples:

🔹 SEI Metadata: Extracted from H.264/H.265 using seiextract, parsed as JSON, and published to a Kafka topic for downstream analytics.

🔹 KLV Metadata: Encoded in MPEG-TS using MISB standards, extracted via metasink, parsed, and published as structured messages in Kafka.

🔹 FLV (RTMP): Metadata injected via flvmux as AMF data can be parsed and pushed to Kafka for real-time dashboards or storage.

Example

RidgeRun products in this demo
This demo uses RidgeRun GstSEI Metadata. For more information, visit the product page.


Info

This example makes use of GStreamer in Python, so make sure you have the following dependency installed:

pip install pygobject
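
The pipelines below also rely on the seiinject and seiextract elements from the RidgeRun GstSEI Metadata product mentioned above. Assuming the plugin is installed in a path scanned by GStreamer, you can confirm the elements are visible before running the scripts:

gst-inspect-1.0 seiinject
gst-inspect-1.0 seiextract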


This example demonstrates the use of metadata over Kafka, simulating a real-world scenario: a drone generates metadata remotely, which is transmitted via UDP and then published to a Kafka broker. Any subscribed client can subsequently consume this metadata.

The full scenario is illustrated, including:

  • The GStreamer pipeline that generates and sends the metadata.
  • The bridge that receives the metadata over UDP and publishes it to the Kafka broker.
  • The client application that subscribes to the broker and consumes the metadata.

Run example

Step 1 – Start the Kafka Broker (Using Docker)

In the first terminal, start a Kafka-compatible broker using Docker. This demo uses Redpanda, which exposes the Kafka API on port 9092.

docker run --name redpanda --rm \
  -p 9092:9092 -p 9644:9644 \
  redpandadata/redpanda:latest \
  redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M \
  --node-id 0 --check=false \
  --kafka-addr PLAINTEXT://0.0.0.0:9092 \
  --advertise-kafka-addr PLAINTEXT://localhost:9092
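
Depending on the broker configuration, the topic may be auto-created the first time a producer writes to it. To create it explicitly (and later inspect its contents), you can optionally use the rpk CLI bundled in the Redpanda container; the topic name matches the one used in the following steps:

docker exec -it redpanda rpk topic create rr.metadata.sei
docker exec -it redpanda rpk topic consume rr.metadata.sei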

Step 2 – Inject SEI Metadata and Send via UDP

In a second terminal, run the Python script that injects binary SEI metadata into the video stream using GStreamer and sends the stream over UDP.

Use the following command:

python3 sender_inject_udp.py --host 127.0.0.1 --port 5000 --stream-id cam01

Python script:

#!/usr/bin/env python3
import argparse
import json
import gi
import signal

gi.require_version('Gst','1.0')
gi.require_version('GLib','2.0')

from gi.repository import Gst, GLib

Gst.init(None)

def main():
    ap = argparse.ArgumentParser(description="Send H264 over RTP/UDP with per-buffer SEI JSON")
    ap.add_argument('--host', default='127.0.0.1', help='Receiver host')
    ap.add_argument('--port', type=int, default=5000, help='Receiver UDP port')
    ap.add_argument('--stream-id', default='cam01')
    ap.add_argument('--fps', default='30/1', help='e.g. 30/1')
    # Use -1 to mean infinite.
    ap.add_argument('--num-buffers', type=int, default=-1, help='-1 = infinite (omit property); >=0 sets num-buffers')
    args = ap.parse_args()

    nbuf = "" if args.num_buffers < 0 else f" num-buffers={args.num_buffers}"

    pipeline = Gst.parse_launch(
        f"videotestsrc is-live=true do-timestamp=true {nbuf} ! "
        f"video/x-raw,framerate={args.fps} ! "
        f"x264enc tune=zerolatency key-int-max=30 ! "
        f"h264parse ! video/x-h264,stream-format=byte-stream,alignment=au ! "
        f"seiinject name=inj ! "
        f"rtph264pay config-interval=1 ! "
        f"udpsink host={args.host} port={args.port} sync=false async=false"
    )

    inj = pipeline.get_by_name('inj')
    seq = 0

    def on_inject(_pad, _info):
        nonlocal seq
        buf = _info.get_buffer()
        if buf and buf.pts != Gst.CLOCK_TIME_NONE:
            ts_ns = int(buf.pts)
        else:
            clk = inj.get_clock()
            ts_ns = int(clk.get_time()) if clk else int(GLib.get_monotonic_time() * 1000)

        payload = {
            "stream_id": args.stream_id,
            "seq": seq,
            "ts_ns": ts_ns,
            "objects": [{
                "id": f"obj-{(seq % 9999) + 1}",
                "label": ["person","car","dog","bicycle"][seq % 4],
                "confidence": round(0.5 + (seq % 50)/100.0, 2),
                "bbox": {"x":0.1+(seq%5)*0.1, "y":0.1, "w":0.3, "h":0.3}
            }],
            "source": "seiinject"
        }
        inj.set_property("metadata", json.dumps(payload))
        seq += 1
        return Gst.PadProbeReturn.OK

    inj.get_static_pad("sink").add_probe(Gst.PadProbeType.BUFFER, on_inject)

    # Run
    pipeline.set_state(Gst.State.PLAYING)
    bus = pipeline.get_bus(); bus.add_signal_watch()
    loop = GLib.MainLoop()

    def stop(*_):
        print("[Sender] Shutting down...")
        pipeline.send_event(Gst.Event.new_eos())

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    def on_msg(_bus, msg):
        if msg.type == Gst.MessageType.ERROR:
            err, dbg = msg.parse_error()
            print("[Sender][ERROR]", err, "\n  debug:", (dbg or "").strip())
            pipeline.set_state(Gst.State.NULL); loop.quit()
        elif msg.type == Gst.MessageType.EOS:
            print("[Sender] EOS")
            pipeline.set_state(Gst.State.NULL); loop.quit()

    bus.connect("message", on_msg)
    print(f"[Sender] RTP/H264 → udp://{args.host}:{args.port} , stream_id={args.stream_id}, num-buffers={'∞' if args.num_buffers<0 else args.num_buffers}")
    loop.run()

if __name__ == '__main__':
    main()
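
Optionally, before starting the bridge in Step 3, you can verify that the RTP stream is arriving by decoding it locally. Run this instead of the bridge, since only one process can listen on UDP port 5000 at a time; it assumes the gst-libav H.264 decoder is installed:

gst-launch-1.0 udpsrc port=5000 caps="application/x-rtp,media=video,encoding-name=H264,clock-rate=90000,payload=96" ! rtpjitterbuffer ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! autovideosink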

Step 3 – Receive the Stream, Extract Metadata, and Publish to Kafka

In a third terminal, run the script that listens for the incoming video stream, extracts SEI metadata using seiextract, and publishes the metadata to a Kafka topic.


Info

Install the confluent-kafka Python client before running this script:

pip install --only-binary=:all: "confluent-kafka>=2.4,<3"


Use the following command:

python3 bridge_extract_to_bus_kafka.py  --port 5000 --kafka-bootstrap localhost:9092 --kafka-topic rr.metadata.sei --verbose

Python script:

#!/usr/bin/env python3
import argparse
import json
import gi
import signal

gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
gi.require_version('GstSei', '1.0')

from gi.repository import Gst, GLib, GstSei
from confluent_kafka import Producer as CKProducer

Gst.init(None)


def bytes_from_gbytes(gb):
    # Convert a GLib.Bytes object to Python bytes; the fallback covers
    # PyGObject versions where bytes(GLib.Bytes) is not supported directly.
    try:
        return bytes(gb)
    except Exception:
        data = gb.get_data()
        mv = data[0] if isinstance(data, tuple) else data
        return bytes(mv)


def main():
    ap = argparse.ArgumentParser(description="Receive RTP/H264, extract SEI JSON, publish to Kafka")
    ap.add_argument('--port', type=int, default=5000, help='UDP listen port')

    # Kafka
    ap.add_argument('--kafka-bootstrap', default='localhost:9092', help='Kafka bootstrap servers')
    ap.add_argument('--kafka-topic', default='rr.metadata.sei', help='Kafka topic to publish to')

    # Optional: more logs from this script
    ap.add_argument('--verbose', action='store_true')
    args = ap.parse_args()

    # Kafka publisher setup
    producer = CKProducer({
        "bootstrap.servers": args.kafka_bootstrap,
        "linger.ms": 5,
        "acks": "all",
    })

    def publish_kafka(d: dict):
        payload = json.dumps(d, separators=(',', ':')).encode()
        producer.produce(
            topic=args.kafka_topic,
            key=str(d.get("seq", 0)).encode(),
            value=payload,
        )
        producer.poll(0)  # serve delivery callbacks

    def close_kafka():
        producer.flush()

    print(f"[Bridge] Kafka -> {args.kafka_bootstrap} topic={args.kafka_topic}")

    # Pipeline: udpsrc → jitterbuffer → depay → parse → caps(AU) → seiextract
    caps = "application/x-rtp,media=video,encoding-name=H264,clock-rate=90000,payload=96"
    pipe = Gst.parse_launch(
        f"udpsrc port={args.port} caps=\"{caps}\" ! "
        f"rtpjitterbuffer ! rtph264depay ! "
        f"h264parse ! video/x-h264,stream-format=byte-stream,alignment=au ! "
        f"seiextract name=ext ! fakesink"
    )
    ext = pipe.get_by_name('ext')

    # Publish helper
    def publish(d: dict):
        publish_kafka(d)

    # Extract probe
    def on_extract(_pad, info, _ud):
        buf = info.get_buffer()
        if not buf:
            return Gst.PadProbeReturn.OK

        gbytes = GstSei.buffer_get_meta_bytes(buf)
        if not gbytes:
            return Gst.PadProbeReturn.OK

        raw = bytes_from_gbytes(gbytes)
        s = raw.decode('utf-8', errors='ignore').rstrip('\x00').strip()

        # Expect JSON from sender; fallback to pass-through if not JSON
        try:
            obj = json.loads(s)
        except Exception:
            obj = {"raw": s}

        clk = ext.get_clock()
        now_ns = int(clk.get_time()) if clk else 0
        ts_ns = int(obj.get("ts_ns", 0))
        obj["bridge_ts_ns"] = now_ns
        obj["sender_ts_ns"] = ts_ns

        if args.verbose:
            print("[Bridge][EXTRACT]", s[:120] + ("..." if len(s) > 120 else ""))
        publish(obj)
        if args.verbose:
            print(f"[Bridge][KAFKA] sent seq={obj.get('seq')} sender_ts_ns={ts_ns}")
        return Gst.PadProbeReturn.OK

    probe_id = ext.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, on_extract, None)

    # Run
    pipe.set_state(Gst.State.PLAYING)
    bus = pipe.get_bus()
    bus.add_signal_watch()
    loop = GLib.MainLoop()

    def on_msg(_bus, msg):
        if msg.type in (Gst.MessageType.ERROR, Gst.MessageType.EOS):
            pipe.set_state(Gst.State.NULL)
            close_kafka()
            loop.quit()

    bus.connect("message", on_msg)

    def stop(*_):
        try: 
            print("[Bridge] Shutting down...")
            ext.get_static_pad("src").remove_probe(probe_id)
            pipe.set_state(Gst.State.NULL)
            loop.quit()
        except Exception:
            pass

    signal.signal(signal.SIGINT, stop)
    signal.signal(signal.SIGTERM, stop)

    print(f"[Bridge] listening udp://0.0.0.0:{args.port}")
    loop.run()


if __name__ == '__main__':
    main()

Expected output:

[Bridge] listening udp://0.0.0.0:5000
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 960, "ts_ns": 3600032000000000, "objects": [{"id": "obj-961", "label": "person", "confiden...
[Bridge][KAFKA] sent seq=960 sender_ts_ns=3600032000000000
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 961, "ts_ns": 3600032033333333, "objects": [{"id": "obj-962", "label": "car", "confidence"...
[Bridge][KAFKA] sent seq=961 sender_ts_ns=3600032033333333
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 962, "ts_ns": 3600032066666666, "objects": [{"id": "obj-963", "label": "dog", "confidence"...
[Bridge][KAFKA] sent seq=962 sender_ts_ns=3600032066666666
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 963, "ts_ns": 3600032100000000, "objects": [{"id": "obj-964", "label": "bicycle", "confide...
[Bridge][KAFKA] sent seq=963 sender_ts_ns=3600032100000000
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 964, "ts_ns": 3600032133333333, "objects": [{"id": "obj-965", "label": "person", "confiden...
[Bridge][KAFKA] sent seq=964 sender_ts_ns=3600032133333333
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 965, "ts_ns": 3600032166666666, "objects": [{"id": "obj-966", "label": "car", "confidence"...
[Bridge][KAFKA] sent seq=965 sender_ts_ns=3600032166666666
[Bridge][EXTRACT] {"stream_id": "cam01", "seq": 966, "ts_ns": 3600032200000000, "objects": [{"id": "obj-967", "label": "dog", "confidence"...

Step 4 – Subscribe to the Metadata Topic

In a fourth terminal, run a simple Kafka client script that subscribes to the metadata topic and prints the received SEI metadata to the console.

Use the following command:

python3 kafka_print.py --bootstrap localhost:9092 --topic rr.metadata.sei

Python script:

#!/usr/bin/env python3
import argparse
import json
from confluent_kafka import Consumer, KafkaException

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument('--bootstrap', default='localhost:9092')
    ap.add_argument('--topic', default='rr.metadata.sei')
    ap.add_argument('--group', default='rr.sei.demo')
    args = ap.parse_args()

    conf = {
        "bootstrap.servers": args.bootstrap,
        "group.id": args.group,
        "auto.offset.reset": "earliest",
        "enable.auto.commit": True,
    }
    c = Consumer(conf)
    c.subscribe([args.topic])
    print(f"[Kafka] (confluent) subscribed to {args.topic} @ {args.bootstrap}")
    try:
        while True:
            msg = c.poll(1.0)
            if msg is None: continue
            if msg.error(): raise KafkaException(msg.error())
            try:
                data = json.loads(msg.value().decode('utf-8'))
            except Exception:
                data = {"raw": msg.value().decode('utf-8', errors='ignore')}
            print("[Kafka]", data)
    except KeyboardInterrupt:
        print("\n[Kafka] interrupted, closing…")
    finally:
        c.close()

if __name__ == '__main__':
    main()

Expected output:

[Kafka] (confluent) subscribed to rr.metadata.sei @ localhost:9092
[Kafka] {'stream_id': 'cam01', 'seq': 960, 'ts_ns': 3600032000000000, 'objects': [{'id': 'obj-961', 'label': 'person', 'confidence': 0.6, 'bbox': {'x': 0.1, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 24057313044833, 'sender_ts_ns': 3600032000000000}
[Kafka] {'stream_id': 'cam01', 'seq': 961, 'ts_ns': 3600032033333333, 'objects': [{'id': 'obj-962', 'label': 'car', 'confidence': 0.61, 'bbox': {'x': 0.2, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 24057346084433, 'sender_ts_ns': 3600032033333333}
[Kafka] {'stream_id': 'cam01', 'seq': 962, 'ts_ns': 3600032066666666, 'objects': [{'id': 'obj-963', 'label': 'dog', 'confidence': 0.62, 'bbox': {'x': 0.30000000000000004, 'y': 0.1, 'w': 0.3, 'h': 0.3}}], 'source': 'seiinject', 'bridge_ts_ns': 24057379642545, 'sender_ts_ns': 3600032066666666}
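
As a possible extension of kafka_print.py, the consumer can filter detections by confidence before forwarding them to a dashboard or database. The field names below follow the payload produced by sender_inject_udp.py; the 0.8 threshold is an arbitrary example:

CONF_THRESHOLD = 0.8  # arbitrary example threshold

def filter_detections(event, threshold=CONF_THRESHOLD):
    # Keep only the objects whose confidence meets the threshold; drop the
    # event entirely if nothing qualifies.
    kept = [o for o in event.get("objects", []) if o.get("confidence", 0.0) >= threshold]
    return {**event, "objects": kept} if kept else None

# Inside the consumer loop of kafka_print.py, after json.loads(...):
#     filtered = filter_detections(data)
#     if filtered:
#         print("[Kafka][HIGH-CONF]", filtered)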

References

https://kafka.apache.org/

https://kafka.apache.org/documentation/

https://www.confluent.io/learn/kafka/

https://ksqldb.io/