From RidgeRun Developer Wiki
import sys
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GObject
import numpy
def message_handler(bus, msg, loop):
"""Handle gracefully the EOS and errors"""
if msg.type in [Gst.MessageType.EOS, Gst.MessageType.ERROR]:
loop.quit()
def on_new_sample(sink, data):
"""Get the KLV data on every buffer the appsink receives"""
sample = sink.emit("pull-sample")
buffer = sample.get_buffer()
size = buffer.get_size()
# Extract the KLV data into an array and do your processing
klv_array = numpy.ndarray(
size, buffer=buffer.extract_dup(0, size), dtype=numpy.uint8)
print("\nMeta: ", end="")
for byte in klv_array:
print(chr(byte), end="")
return Gst.FlowReturn.OK
def main(args):
Gst.init(args)
timeout_seconds = 3
pipeline = Gst.parse_launch(
"udpsrc address=224.1.1.1 port=12345 ! tsdemux name=demux ! queue ! h265parse ! avdec_h265 "
"! queue ! videoconvert ! autovideosink sync=false demux.private_0_0036 "
"! queue ! meta/x-klv ! appsink name=sink emit-signals=true")
sink = pipeline.get_by_name("sink")
sink.connect("new-sample", on_new_sample, sink)
# Init GObject loop to handle Gstreamer Bus Events
loop = GObject.MainLoop()
# Listen to bus messages to handle errors and EOS
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.enable_sync_message_emission()
bus.connect("message", message_handler, loop)
print("Playing...\nPress Ctrl+C to exit\n")
pipeline.set_state(Gst.State.PLAYING)
pipeline.get_state(timeout_seconds * Gst.SECOND)
try:
loop.run()
except BaseException:
loop.quit()
print("\nClosing app...")
pipeline.set_state(Gst.State.NULL)
pipeline.get_state(timeout_seconds * Gst.SECOND)
if __name__ == "__main__":
sys.exit(main(sys.argv))