From RidgeRun Developer Wiki

Use Case: Video Analysis with Bounding Boxes (SEI)

In computer vision applications such as object detection and video analytics, models often output bounding boxes, class labels, and confidence scores for each frame. A widely adopted way to transport this metadata without modifying the video frames or opening a parallel data stream is to embed it in SEI (Supplemental Enhancement Information) messages inside the encoded bitstream.

Applicable RidgeRun Products

SEI

The GstSEIMetadata plugin inserts and extracts metadata in H.264/H.265 video streams, keeping additional information, such as bounding boxes, synchronized with each video frame. Its seiinject element embeds bounding boxes as metadata supplied through properties, binary data, or GstMeta structures. The seiextract element then retrieves this metadata from the bitstream and reattaches it as GstMeta, making it available to downstream pipeline elements or external applications. In addition, seimetatimestamp inserts timestamps as metadata, which helps maintain the temporal correlation between detected objects and their corresponding frames. This is especially valuable in computer vision and real-time analytics systems, where bounding boxes must stay tightly synchronized with the video even through encoding and transmission.
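When detections and frames carry their own timestamps, for example the timestamp_ms field used in the example on this page or timestamps inserted by seimetatimestamp, a consumer may still need to pair a detection with the closest frame. The sketch below shows one common approach, nearest-timestamp matching with a binary search. It is not part of GstSEIMetadata; the function name, the millisecond units, and the requirement that frame timestamps are sorted are assumptions.

```python
from bisect import bisect_left
from typing import Sequence


def nearest_frame(frame_ts_ms: Sequence[int], det_ts_ms: int) -> int:
    """Return the index of the frame whose timestamp is closest to det_ts_ms.

    frame_ts_ms must be non-empty and sorted in ascending order.
    On an exact tie the earlier frame wins.
    """
    if not frame_ts_ms:
        raise ValueError("no frames to match against")
    i = bisect_left(frame_ts_ms, det_ts_ms)  # first frame with ts >= det_ts_ms
    if i == 0:
        return 0
    if i == len(frame_ts_ms):
        return len(frame_ts_ms) - 1
    before, after = frame_ts_ms[i - 1], frame_ts_ms[i]
    return i if after - det_ts_ms < det_ts_ms - before else i - 1
```

Binary search keeps each lookup at O(log n), which matters when a detection stream is matched against a long buffer of frames.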

Code Example

This Python script creates a real-time GStreamer pipeline that generates a test video with videotestsrc, encodes it with x264enc, injects metadata with the seiinject element, and streams the result over UDP as RTP H.264. Every 500 milliseconds, it rebuilds a JSON payload that simulates two object detections (a “person” and a “car”), with each bounding box expressed both in pixel coordinates and in normalized [0..1] coordinates. The boxes shift by a few pixels on every update to simulate motion.
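The motion arithmetic in build_bboxes_payload() can be checked in isolation. The sketch below copies that arithmetic (it is not imported from the script; box_origins() is a hypothetical name) and makes two properties visible: because OFFSETS is symmetric, the mirrored index gives dy == -dx, so both boxes move diagonally; and because offset_idx wraps modulo 11, the offset jumps from +5 back to -5 instead of sweeping back. Tick 0 places the person at (75, 65) and the car at (365, 135), the same values that appear in the Output section. The clamp() calls are omitted because they never trigger for these coordinates.

```python
DELTA = 5
OFFSETS = list(range(-DELTA, DELTA + 1))  # [-5, -4, ..., 5]: 11 entries


def box_origins(offset_idx: int):
    """Top-left corners of the 'person' and 'car' boxes for one tick,
    using the same arithmetic as build_bboxes_payload() in the sender."""
    dx = OFFSETS[offset_idx]
    dy = OFFSETS[-(offset_idx + 1)]  # mirrored index: dy == -dx for this list
    person = (80 + dx, 60 + dy)
    car = (360 - dx, 140 - dy)
    return person, car


# One full cycle; the sender then wraps back to index 0
for idx in range(len(OFFSETS)):
    print(idx, box_origins(idx))
```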

The JSON is serialized to compact UTF-8 bytes, converted to a GByteArray by the RrHelper.set_gbytearray_property() helper, and assigned to the metadata-binary property of seiinject, which embeds it directly in the video stream as SEI messages.
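The serialization step can be sketched on its own with the standard json module, using the same json.dumps() options as the sender script. The helper names and the sample payload are illustrative assumptions. The compact separators drop the spaces json.dumps() adds by default, which shortens every SEI message, and ensure_ascii=False keeps non-ASCII labels as raw UTF-8 instead of \u escapes.

```python
import json


def encode_payload(payload: dict) -> bytes:
    """Compact JSON -> UTF-8 bytes, with the same options as the sender script."""
    return json.dumps(payload, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


def decode_payload(raw: bytes) -> dict:
    """Inverse of encode_payload(), as a receiver would apply it to extracted SEI bytes."""
    return json.loads(raw.decode("utf-8"))


sample = {"type": "detections", "frame": 0,
          "detections": [{"id": 1, "label": "person", "score": 0.92}]}
compact = encode_payload(sample)
default = json.dumps(sample).encode("utf-8")  # default ", " and ": " separators
print(len(compact), len(default))
```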

The example also includes a receiver pipeline that uses udpsrc to capture the RTP H.264 stream and seiextract to extract the embedded SEI metadata, which is printed through the GStreamer debug log so the transmitted detection values can be verified in real time.
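Once the JSON has been recovered from the extracted bytes, part of that verification can be automated. A minimal sketch, assuming the payload layout produced by the sender: it checks that each bbox_norm agrees with bbox divided by image_size and that each box fits inside the image. The function name and tolerance are assumptions for illustration, not part of the RidgeRun tools.

```python
import math


def check_detections(payload: dict, tol: float = 1e-9) -> list:
    """Return a list of problems found; an empty list means the payload is consistent."""
    problems = []
    width = payload["image_size"]["width"]
    height = payload["image_size"]["height"]
    for det in payload["detections"]:
        tag = f"{det['label']}#{det['id']}"
        b, n = det["bbox"], det["bbox_norm"]
        # bbox_norm should be the pixel box divided by the image size
        expected = {"x": b["x"] / width, "y": b["y"] / height,
                    "w": b["w"] / width, "h": b["h"] / height}
        for key, value in expected.items():
            if not math.isclose(n[key], value, rel_tol=0.0, abs_tol=tol):
                problems.append(f"{tag}: bbox_norm.{key}")
        # the pixel box must lie inside the image
        if b["x"] + b["w"] > width or b["y"] + b["h"] > height:
            problems.append(f"{tag}: box exceeds image")
    return problems
```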

Sender



#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# LLC. No part of this program may be photocopied, reproduced or translated
# into another programming language without prior written consent of
# RidgeRun, LLC. The user is free to modify the source code after obtaining
# a software license from RidgeRun. All source code changes must be provided
# back to RidgeRun without any encumbrance.

import sys
import signal
import json
import time
import gi

gi.require_version('RrHelper', '1.0')
gi.require_version('Gst', '1.0')
gi.require_version('GObject', '2.0')
gi.require_version('GLib', '2.0')

from gi.repository import Gst, GLib, RrHelper

# ==========================
# General configuration
# ==========================
WIDTH = 640
HEIGHT = 360
FRAMERATE = 30

# Box movement: offsets sweep from -5 to +5 pixels in X and Y, then wrap around
DELTA = 5
INTERVAL_MS = 500
OFFSETS = list(range(-DELTA, DELTA + 1))  # [-5..+5]

# ==========================
# Pipeline with seiinject
# ==========================

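PIPELINE_DESCRIPTION = (
    "videotestsrc is-live=true ! "
    # Force the resolution and frame rate that the JSON payload reports
    f"video/x-raw,width={WIDTH},height={HEIGHT},framerate={FRAMERATE}/1 ! "
    "x264enc tune=zerolatency key-int-max=30 insert-vui=true ! "
    "seiinject name=inject ! "
    "rtph264pay pt=96 config-interval=1 ! "
    "udpsink host=127.0.0.1 port=5000"
)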

def clamp(v, lo, hi):
    return max(lo, min(hi, v))

def norm_bbox(x, y, w, h, img_w, img_h):
    # Normalized [0..1]
    return {
        "x": x / img_w,
        "y": y / img_h,
        "w": w / img_w,
        "h": h / img_h,
    }

def build_bboxes_payload(frame_idx: int, offset_idx: int):
    """
    Builds a JSON payload with sample detections.
    Two boxes: 'person' and 'car', moving slightly depending on OFFSETS[offset_idx].
    """
    dx = OFFSETS[offset_idx]
    dy = OFFSETS[-(offset_idx + 1)]  # counter step so they don’t move identically

    # Box 1 (person)
    x1, y1, w1, h1 = 80 + dx, 60 + dy, 120, 200
    x1 = clamp(x1, 0, WIDTH - 1)
    y1 = clamp(y1, 0, HEIGHT - 1)
    w1 = clamp(w1, 1, WIDTH - x1)
    h1 = clamp(h1, 1, HEIGHT - y1)

    # Box 2 (car)
    x2, y2, w2, h2 = 360 - dx, 140 - dy, 180, 120
    x2 = clamp(x2, 0, WIDTH - 1)
    y2 = clamp(y2, 0, HEIGHT - 1)
    w2 = clamp(w2, 1, WIDTH - x2)
    h2 = clamp(h2, 1, HEIGHT - y2)

    ts_ms = int(time.time() * 1000)

    payload = {
        "type": "detections",
        "format": "bbox-json",
        "timestamp_ms": ts_ms,
        "frame": frame_idx,
        "image_size": {"width": WIDTH, "height": HEIGHT},
        "detections": [
            {
                "id": 1,
                "label": "person",
                "score": 0.92,
                "bbox": {"x": x1, "y": y1, "w": w1, "h": h1},               # in pixels
                "bbox_norm": norm_bbox(x1, y1, w1, h1, WIDTH, HEIGHT),       # normalized
            },
            {
                "id": 2,
                "label": "car",
                "score": 0.88,
                "bbox": {"x": x2, "y": y2, "w": w2, "h": h2},
                "bbox_norm": norm_bbox(x2, y2, w2, h2, WIDTH, HEIGHT),
            },
        ],
    }

    return payload

class GstBBoxesDemo:
    def __init__(self):
        self.pipeline = None
        self.seiinject = None
        self.filesink = None
        self.loop = None
        self.offset_idx = 0
        self.frame_idx = 0

    def create_pipeline(self):
        self.loop = GLib.MainLoop()

        try:
            self.pipeline = Gst.parse_launch(PIPELINE_DESCRIPTION)
        except GLib.Error as e:
            print(f"Unable to build pipeline: {e.message}", file=sys.stderr)
            return False

        # Get pipeline elements
        self.seiinject = self.pipeline.get_by_name("inject")
        if not self.seiinject:
            print("Could not get seiinject element", file=sys.stderr)
            return False

        # Configure the bus to handle messages (EOS, ERROR, etc.)
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_bus_message)

        # Ctrl+C
        def _sigint_handler(*_args):
            print("Interrupted, exiting…")
            self.quit()
        signal.signal(signal.SIGINT, _sigint_handler)

        # Schedule update every 500 ms
        GLib.timeout_add(INTERVAL_MS, self._tick_update_metadata)

        return True

    def _tick_update_metadata(self):
        try:
            # Build JSON with bounding boxes
            payload = build_bboxes_payload(self.frame_idx, self.offset_idx)
            pkt = json.dumps(payload, separators=(",", ":"), ensure_ascii=False).encode("utf-8")

            # Set property as GByteArray via helper
            RrHelper.set_gbytearray_property(self.seiinject, "metadata-binary", pkt)

            # Friendly log
            dets = payload["detections"]
            summary = "; ".join(
                f"{d['label']}#{d['id']} "
                f"[{d['bbox']['x']},{d['bbox']['y']},{d['bbox']['w']},{d['bbox']['h']}]"
                for d in dets
            )
            print(f"[tick] frame={self.frame_idx} detections={len(dets)} {summary}  JSON={pkt[:64]!r}... (len={len(pkt)})")

            # Advance indices
            self.offset_idx = (self.offset_idx + 1) % len(OFFSETS)
            self.frame_idx += int((INTERVAL_MS / 1000.0) * FRAMERATE)

        except Exception as e:
            print(f"Error updating metadata: {e}", file=sys.stderr)

        return True  # keep the timeout active

    def on_bus_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End of stream")
            self.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err.message}", file=sys.stderr)
            if debug:
                print(f"Debug info: {debug}", file=sys.stderr)
            self.quit()

    def start(self):
        self.pipeline.set_state(Gst.State.PLAYING)
        print("Running… (Bounding Boxes JSON via SEI, update every 500 ms)")
        self.loop.run()

    def quit(self):
        if self.loop is not None and self.loop.is_running():
            self.loop.quit()
        self.stop()
        self.release()

    def stop(self):
        if self.pipeline is not None:
            self.pipeline.set_state(Gst.State.NULL)

    def release(self):
        for attr in ("pipeline", "seiinject", "filesink"):
            obj = getattr(self, attr)
            if obj is not None:
                setattr(self, attr, None)
        self.loop = None

def main():
    Gst.init(None)

    demo = GstBBoxesDemo()
    if not demo.create_pipeline():
        sys.exit(1)

    print("Playing pipeline")
    demo.start()
    print("Returned, stopping playback")

if __name__ == "__main__":
    main()
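Because bbox_norm is resolution independent, a consumer that renders at a different size can map it back to pixels. A minimal sketch, assuming a hypothetical denorm_bbox() helper on the receiving side. Rounding the two corners, rather than scaling width and height separately, keeps the right and bottom edges consistent with the box origin.

```python
def denorm_bbox(bbox_norm: dict, out_w: int, out_h: int) -> dict:
    """Map a normalized [0..1] box to integer pixels of an out_w x out_h image."""
    x0 = round(bbox_norm["x"] * out_w)
    y0 = round(bbox_norm["y"] * out_h)
    x1 = round((bbox_norm["x"] + bbox_norm["w"]) * out_w)  # right edge
    y1 = round((bbox_norm["y"] + bbox_norm["h"]) * out_h)  # bottom edge
    return {"x": x0, "y": y0, "w": x1 - x0, "h": y1 - y0}
```

Applied to the person box from the Output section at 640x360, it recovers the original pixel box (75, 65, 120, 200); at 1280x720 it yields (150, 130, 240, 400).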

Receiver

$ GST_DEBUG=*seiextract*:MEMDUMP gst-launch-1.0 udpsrc port=5000 ! "application/x-rtp,media=video,clock-rate=90000,encoding-name=H264" ! rtph264depay ! h264parse ! seiextract ! fakesink

Output

The extracted data is:
<seiextract0> 00000000: 7b 22 74 79 70 65 22 3a 22 64 65 74 65 63 74 69  {"type":"detecti
<seiextract0> 00000010: 6f 6e 73 22 2c 22 66 6f 72 6d 61 74 22 3a 22 62  ons","format":"b
<seiextract0> 00000020: 62 6f 78 2d 6a 73 6f 6e 22 2c 22 74 69 6d 65 73  box-json","times
<seiextract0> 00000030: 74 61 6d 70 5f 6d 73 22 3a 31 37 35 37 30 32 36  tamp_ms":1757026
<seiextract0> 00000040: 34 35 35 33 35 35 2c 22 66 72 61 6d 65 22 3a 30  455355,"frame":0
<seiextract0> 00000050: 2c 22 69 6d 61 67 65 5f 73 69 7a 65 22 3a 7b 22  ,"image_size":{"
<seiextract0> 00000060: 77 69 64 74 68 22 3a 36 34 30 2c 22 68 65 69 67  width":640,"heig
<seiextract0> 00000070: 68 74 22 3a 33 36 30 7d 2c 22 64 65 74 65 63 74  ht":360},"detect
<seiextract0> 00000080: 69 6f 6e 73 22 3a 5b 7b 22 69 64 22 3a 31 2c 22  ions":[{"id":1,"
<seiextract0> 00000090: 6c 61 62 65 6c 22 3a 22 70 65 72 73 6f 6e 22 2c  label":"person",
<seiextract0> 000000a0: 22 73 63 6f 72 65 22 3a 30 2e 39 32 2c 22 62 62  "score":0.92,"bb
<seiextract0> 000000b0: 6f 78 22 3a 7b 22 78 22 3a 37 35 2c 22 79 22 3a  ox":{"x":75,"y":
<seiextract0> 000000c0: 36 35 2c 22 77 22 3a 31 32 30 2c 22 68 22 3a 32  65,"w":120,"h":2
<seiextract0> 000000d0: 30 30 7d 2c 22 62 62 6f 78 5f 6e 6f 72 6d 22 3a  00},"bbox_norm":
<seiextract0> 000000e0: 7b 22 78 22 3a 30 2e 31 31 37 31 38 37 35 2c 22  {"x":0.1171875,"
<seiextract0> 000000f0: 79 22 3a 30 2e 31 38 30 35 35 35 35 35 35 35 35  y":0.18055555555
<seiextract0> 00000100: 35 35 35 35 35 35 2c 22 77 22 3a 30 2e 31 38 37  555555,"w":0.187
<seiextract0> 00000110: 35 2c 22 68 22 3a 30 2e 35 35 35 35 35 35 35 35  5,"h":0.55555555
<seiextract0> 00000120: 35 35 35 35 35 35 35 36 7d 7d 2c 7b 22 69 64 22  55555556}},{"id"
<seiextract0> 00000130: 3a 32 2c 22 6c 61 62 65 6c 22 3a 22 63 61 72 22  :2,"label":"car"
<seiextract0> 00000140: 2c 22 73 63 6f 72 65 22 3a 30 2e 38 38 2c 22 62  ,"score":0.88,"b
<seiextract0> 00000150: 62 6f 78 22 3a 7b 22 78 22 3a 33 36 35 2c 22 79  box":{"x":365,"y
<seiextract0> 00000160: 22 3a 31 33 35 2c 22 77 22 3a 31 38 30 2c 22 68  ":135,"w":180,"h
<seiextract0> 00000170: 22 3a 31 32 30 7d 2c 22 62 62 6f 78 5f 6e 6f 72  ":120},"bbox_nor
<seiextract0> 00000180: 6d 22 3a 7b 22 78 22 3a 30 2e 35 37 30 33 31 32  m":{"x":0.570312
<seiextract0> 00000190: 35 2c 22 79 22 3a 30 2e 33 37 35 2c 22 77 22 3a  5,"y":0.375,"w":
<seiextract0> 000001a0: 30 2e 32 38 31 32 35 2c 22 68 22 3a 30 2e 33 33  0.28125,"h":0.33
<seiextract0> 000001b0: 33 33 33 33 33 33 33 33 33 33 33 33 33 33 7d 7d  33333333333333}}
<seiextract0> 000001c0: 5d 7d                                            ]}
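To turn a captured log like the one above back into JSON, the hex bytes can be reassembled. A minimal sketch, assuming each dump line looks like those shown (an 8-digit hex offset, a colon, up to 16 space-separated hex bytes, two spaces, then the ASCII column) and that every SEI message starts a new dump at offset 0. Lines that do not match, such as other log output or color codes, are skipped. The helper names are hypothetical.

```python
import json
import re

# "<offset>: <hex bytes>" as in the MEMDUMP lines above (assumed layout)
_DUMP_RE = re.compile(r"\b([0-9a-f]{8}):((?: [0-9a-f]{2}){1,16})")


def parse_memdumps(lines):
    """Split the log into separate dumps (each restarts at offset 0) and return their bytes."""
    dumps = []
    for line in lines:
        m = _DUMP_RE.search(line)
        if not m:
            continue  # not part of a hex dump
        offset = int(m.group(1), 16)
        if offset == 0:
            dumps.append(bytearray())  # a new SEI message begins
        elif not dumps or offset != len(dumps[-1]):
            raise ValueError(f"unexpected dump offset {offset:#x}")
        dumps[-1].extend(bytes.fromhex(m.group(2).strip()))
    return [bytes(d) for d in dumps]


def decode_dumps(lines):
    """Parse every reassembled dump as a UTF-8 JSON document."""
    return [json.loads(d.decode("utf-8")) for d in parse_memdumps(lines)]
```

Since the GStreamer debug log is written to stderr by default, the receiver's stderr can be redirected to a file and its lines passed to decode_dumps().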