# Template:NVIDIA Advanced GStreamer Pipelines with Gstd-Template1
#
# From RidgeRun Developer Wiki
import sys

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GObject
import numpy


def message_handler(bus, msg, loop):
    """Stop the main loop when the pipeline reports EOS or an error.

    Intended as a handler for the bus "message" signal. Messages of any
    other type are ignored.

    Args:
        bus: Bus that delivered the message (unused).
        msg: Message posted on the bus.
        loop: Main loop to quit so the application can shut down.
    """
    if msg.type == Gst.MessageType.EOS:
        loop.quit()
    elif msg.type == Gst.MessageType.ERROR:
        # Report what failed before shutting down; quitting silently leaves
        # the user with no hint about why the application stopped.
        err, debug = msg.parse_error()
        print("Error:", err.message, file=sys.stderr)
        if debug:
            print("Debug info:", debug, file=sys.stderr)
        loop.quit()


def on_new_sample(sink, data):
    """Print the KLV payload of every buffer the appsink receives.

    Intended as a handler for the appsink "new-sample" signal.

    Args:
        sink: The appsink that emitted the signal.
        data: User data supplied when the handler was connected (unused).

    Returns:
        Gst.FlowReturn.OK once the sample has been handled (or skipped
        because it carried no buffer), Gst.FlowReturn.EOS when no sample
        could be pulled from the sink.
    """
    sample = sink.emit("pull-sample")
    if sample is None:
        # pull-sample hands back nothing when the sink is stopped or has
        # reached EOS; without this guard the next line raises
        # AttributeError inside the streaming thread.
        return Gst.FlowReturn.EOS

    buffer = sample.get_buffer()
    if buffer is None:
        # Nothing to read from this sample; keep the stream flowing.
        return Gst.FlowReturn.OK

    size = buffer.get_size()

    # Extract the KLV data into an array and do your processing
    klv_array = numpy.ndarray(
        size, buffer=buffer.extract_dup(0, size), dtype=numpy.uint8)

    # Emit the whole payload with a single write instead of one print call
    # per byte; the resulting output is identical.
    print("\nMeta: " + "".join(chr(byte) for byte in klv_array), end="")

    return Gst.FlowReturn.OK


def main(args):
    """Play an MPEG-TS stream received over UDP and print its KLV metadata.

    Builds the pipeline, connects on_new_sample to the appsink that receives
    the KLV stream, connects message_handler to the bus, and runs a main
    loop until message_handler quits it or the user presses Ctrl+C.

    Args:
        args: Command-line arguments, handed to Gst.init.

    Returns:
        0 after a normal shutdown, 1 if the pipeline could not be started.
    """
    # Function-scope import: the file-level import only provides Gst and
    # GObject, and GObject.MainLoop is a deprecated alias of GLib.MainLoop.
    from gi.repository import GLib

    Gst.init(args)

    timeout_seconds = 3

    pipeline = Gst.parse_launch(
        "udpsrc address=224.1.1.1 port=12345 ! tsdemux name=demux ! queue ! h265parse ! avdec_h265 "
        "! queue ! videoconvert ! autovideosink sync=false demux.private_0_0036 "
        "! queue ! meta/x-klv ! appsink name=sink emit-signals=true")

    sink = pipeline.get_by_name("sink")
    sink.connect("new-sample", on_new_sample, sink)

    # Main loop that dispatches the bus signal watch installed below
    loop = GLib.MainLoop()

    # Listen to bus messages to handle errors and EOS
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    # NOTE(review): nothing here connects to "sync-message", so this call
    # looks unnecessary - confirm before removing.
    bus.enable_sync_message_emission()
    bus.connect("message", message_handler, loop)

    print("Playing...\nPress Ctrl+C to exit\n")

    try:
        state_change = pipeline.set_state(Gst.State.PLAYING)
        if state_change == Gst.StateChangeReturn.FAILURE:
            # Without this check a pipeline that cannot start would leave
            # the loop waiting with nothing running.
            print("Unable to set the pipeline to the playing state",
                  file=sys.stderr)
            return 1
        pipeline.get_state(timeout_seconds * Gst.SECOND)

        loop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to leave; any other exception is a
        # real failure and is allowed to propagate after cleanup.
        loop.quit()
    finally:
        # Always release the pipeline, whichever way the block was left.
        print("\nClosing app...")
        pipeline.set_state(Gst.State.NULL)
        pipeline.get_state(timeout_seconds * Gst.SECOND)
        bus.remove_signal_watch()

    return 0


if __name__ == "__main__":
    # Run the application and hand main()'s result to the interpreter as
    # the process exit status.
    exit_status = main(sys.argv)
    sys.exit(exit_status)