GstKinesisWebRTC: Getting Started - Python Example Application

This Python example shows how to use the kinesiswebrtcbin element in master mode. It demonstrates how to create a pipeline that sends audio and video, and how to dynamically connect bins to receive audio and video. The application supports multiple viewers connecting and disconnecting. This example is part of the plugin source code.

You can enable video receiving using the -v switch.
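For example, assuming the script is saved as kinesis_webrtc_test.py (the file name is arbitrary), you can run it with video receiving enabled like this:

python3 kinesis_webrtc_test.py -v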

import argparse

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

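# Send pipeline: a live VP8 video test stream and an Opus audio test stream
# are fed into the kinesiswebrtcbin element (named "bin").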
TEST_SEND_PIPELINE = ("videotestsrc is-live=true ! queue ! "
    "video/x-raw,width=640,height=480,framerate=30/1 ! "
    "vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true"
    " cpu-used=5 deadline=1 ! queue ! bin. "
    ""
    "audiotestsrc is-live=true ! queue leaky=2 max-size-buffers=400 ! "
    "audioconvert ! audioresample ! opusenc ! "
    "audio/x-opus,rate=48000,channels=2 ! queue ! bin. "
    ""
    "kinesiswebrtcbin channel=test-ridgerun name=bin")

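# Receive branch used for incoming video pads: VP8 decoding and display.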
TEST_RECEIVE_VIDEO_PIPELINE = ("queue name=video_queue ! vp8dec ! queue ! "
    "videoconvert ! ximagesink")

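# Receive branch used for incoming audio pads: Opus decoding and playback.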
TEST_RECEIVE_AUDIO_PIPELINE = ("queue name=audio_queue ! opusparse ! opusdec "
    "! queue ! pulsesink")


class KinesisWebRTCTest:


    def __init__(self,
                 enable_video: bool):

        self.pipeline = None
        self.enable_video = enable_video

        self.__create_pipeline()

    # Handler for the pad-added signal
    def __pad_added_handler(self,
                            webrtc: Gst.Element,
                            new_pad: Gst.Pad):

        print(f"Received new pad '{new_pad.get_name()}' from "
              f"'{webrtc.get_name()}'.")

        # Check the new pad's type
        if new_pad.get_name().startswith("audio"):
            print("It is an audio pad.")
            receive_bin = Gst.parse_bin_from_description(
                TEST_RECEIVE_AUDIO_PIPELINE, True)
        else:
            print("It is a video pad.")
            if self.enable_video:
                receive_bin = Gst.parse_bin_from_description(
                    TEST_RECEIVE_VIDEO_PIPELINE, True)
            else:
                print("Video receiving is disabled.")
                return

        bin_name = f"bin_{new_pad.get_name()}"
        receive_bin.set_name(bin_name)

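        # Add the receive bin to the running pipeline and match its state
        # before linking it to the new pad.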
        self.pipeline.add(receive_bin)
        receive_bin.sync_state_with_parent()
        sink_pad = receive_bin.get_static_pad("sink")

        # Attempt the link
        ret = new_pad.link(sink_pad)
        if ret != Gst.PadLinkReturn.OK:
            print("Link failed.")
        else:
            print(
                f"Link {new_pad.get_name()} -> {bin_name} succeeded")

    # Handler for the pad-removed signal
    def __pad_removed_handler(self,
                              webrtc: Gst.Element,
                              new_pad: Gst.Pad):

        print(
            f"Removing pad '{new_pad.get_name()}' from '{webrtc.get_name()}'")

        if new_pad.direction == Gst.PadDirection.SINK:
            return
        bin_name = f"bin_{new_pad.get_name()}"
        receive_bin = self.pipeline.get_by_name(bin_name)

        if receive_bin is not None:
            print(f"Removing null element '{receive_bin.get_name()}'.")

            receive_bin.set_state(Gst.State.NULL)

            self.pipeline.remove(receive_bin)

    # Handler for the peer-connected signal
    def __peer_connected_handler(self,
                                 webrtc: Gst.Element,
                                 peer_id: str):

        print(f"Peer {peer_id} connected")

    # Handler for the peer-disconnected signal
    def __peer_disconnected_handler(self,
                                    webrtc: Gst.Element,
                                    peer_id: str):

        print(f"Peer {peer_id} disconnected")

    def __create_pipeline(self):
        # Create pipeline
        self.pipeline = Gst.parse_launch(TEST_SEND_PIPELINE)
        webrtcbin = self.pipeline.get_by_name("bin")

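        # Connect to the kinesiswebrtcbin signals that notify about
        # dynamically added/removed pads and peer (dis)connections.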
        webrtcbin.connect("pad-added", self.__pad_added_handler)
        webrtcbin.connect("pad-removed", self.__pad_removed_handler)
        webrtcbin.connect("peer-connected", self.__peer_connected_handler)
        webrtcbin.connect("peer-disconnected",
                          self.__peer_disconnected_handler)

    def play_pipeline(self) -> bool:
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Unable to set the pipeline to the playing state.")
            return False
        return True

    def stop_pipeline(self):
        self.pipeline.set_state(Gst.State.NULL)


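# Handler for GStreamer bus messages: stop the main loop on errors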
def bus_handler(bus: Gst.Bus,
                msg: Gst.Message,
                loop):
    if msg.type == Gst.MessageType.ERROR:
        err, dbg = msg.parse_error()
        print(f"Error from {msg.src.get_name()}: {err.message} ({dbg})")
        loop.quit()


def main(args):
    # Initialize GStreamer
    Gst.init(None)

    kinesiswebrtc_test = KinesisWebRTCTest(args.enable_video)

    # Start playing
    if not kinesiswebrtc_test.play_pipeline():
        return -1

    # Block until CTRL+C is pressed
    main_loop = GLib.MainLoop()

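    # Watch the pipeline bus so that errors stop the main loop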
    bus = kinesiswebrtc_test.pipeline.get_bus()
    bus.add_signal_watch()
    bus.enable_sync_message_emission()
    bus.connect("message", bus_handler, main_loop)

    try:
        main_loop.run()
    except BaseException:
        main_loop.quit()

    print("Closing ...")
    kinesiswebrtc_test.stop_pipeline()
    print("Successfully closed\n")

    return 0


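# Entry point: parse the command-line options and run the example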
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Test app for Kinesis WebRTC bin element"
    )
    parser.add_argument(
        "-v", "--enable-video",
        default=False,
        action="store_true",
        help="Enable video receiving"
    )

    args = parser.parse_args()
    main(args)


