GstKinesisWebRTC: Getting Started - Python Example Application
GstKinesisWebRTC Amazon Kinesis WebRTC GStreamer Plugin |
---|
Context Overview |
GstKinesisWebrtc Description |
Evaluating GstKinesisWebRTC |
Getting the code |
Building GstKinesisWebRTC |
Getting Started |
Contact Us |
This Python example code shows how to use the kinesiswebrtcbin element in master mode. It shows how to create a pipeline to send audio and video and how dynamically connect bins to receive audio and video. The application supports multiple viewer connections and disconnection. This example is part of the plugin source code.
You can enable video receiving using the -v switch.
import argparse import gi gi.require_version("Gst", "1.0") from gi.repository import Gst, GObject TEST_SEND_PIPELINE =(f"videotestsrc is-live=True ! queue ! " f"video/x-raw,width=640,height=480,framerate=30/1 ! " f"vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=True" f" cpu-used=5 deadline=1 ! queue ! bin. " f"" f"audiotestsrc is-live=TRUE ! queue leaky=2 max-size-buffers=400 ! " f"audioconvert ! audioresample ! opusenc ! " f"audio/x-opus,rate=48000,channels=2 ! queue ! bin. " f"" f"kinesiswebrtcbin channel=test-ridgerun name=bin" ) TEST_RECEIVE_VIDEO_PIPELINE =(f"queue name=video_queue ! vp8dec ! queue ! " f"videoconvert ! ximagesink") TEST_RECEIVE_AUDIO_PIPELINE =(f"queue name=audio_queue ! opusparse ! opusdec " f"! queue ! pulsesink") class KinesisWebRTCTest: def __init__(self, enable_video: bool): self.pipeline = None self.enable_video = enable_video self.__create_pipeline() # Handler for the pad-added signal def __pad_added_handler(self, webrtc: Gst.Element, new_pad: Gst.Pad): print(f"Received new pad '{new_pad.get_name()}' from " f"'{webrtc.get_name()}'.") # Check the new pad's type if new_pad.get_name().startswith("audio"): print("It is an audio pad.") receive_bin = Gst.parse_bin_from_description( TEST_RECEIVE_AUDIO_PIPELINE, True) else: print("It is a video pad.") if self.enable_video: receive_bin = Gst.parse_bin_from_description( TEST_RECEIVE_VIDEO_PIPELINE, True) else: print("Video receiving is disabled.") return bin_name = f"bin_{new_pad.get_name()}" receive_bin.set_name(bin_name) self.pipeline.add(receive_bin) receive_bin.sync_state_with_parent() sink_pad = receive_bin.get_static_pad("sink") # Attempt the link ret = new_pad.link(sink_pad) if ret != Gst.PadLinkReturn.OK: print("Link failed.") else: print( f"Link {new_pad.get_name()} -> {webrtc.get_name()} succeeded") # Handler for the pad-removed signal */ def __pad_removed_handler(self, webrtc: Gst.Element, new_pad: Gst.Pad): print( f"Removing pad '{new_pad.get_name()}' from '{webrtc.get_name()}'") if new_pad.direction == Gst.PadDirection.SINK: return bin_name = f"bin_{new_pad.get_name()}" receive_bin = self.pipeline.get_by_name(bin_name) if receive_bin is not None: print(f"Removing null element '{receive_bin.get_name()}'.") receive_bin.set_state(Gst.State.NULL) self.pipeline.remove(receive_bin) # Handler for the peer-connected signal */ def __peer_connected_handler(self, webrtc: Gst.Element, peer_id: str): print(f"Peer {peer_id} connected") # Handler for the peer-disconnected signal */ def __peer_disconnected_handler(self, webrtc: Gst.Element, peer_id: str): print(f"Peer {peer_id} disconnected") def __create_pipeline(self): # Create pipeline self.pipeline = Gst.parse_launch(TEST_SEND_PIPELINE) webrtcbin = self.pipeline.get_by_name("bin") webrtcbin.connect("pad-added", self.__pad_added_handler) webrtcbin.connect("pad-removed", self.__pad_removed_handler) webrtcbin.connect("peer-connected", self.__peer_connected_handler) webrtcbin.connect("peer-disconnected", self.__peer_disconnected_handler) def play_pipeline(self) -> bool: ret = self.pipeline.set_state(Gst.State.PLAYING) if ret == Gst.StateChangeReturn.FAILURE: print("Unable to set the pipeline to the playing state.") return False return True def stop_pipeline(self): self.pipeline.set_state(Gst.State.NULL) def bus_handler(bus: Gst.Bus, msg: Gst.Message, loop): if msg.type == Gst.MessageType.ERROR: err, dbg = msg.parse_error() print(f"Error from {msg.src.get_name()}: {err.message} ({dbg})") loop.quit() def main(args): # Initialize GStreamer Gst.init(None) kinesiswebrtc_test = KinesisWebRTCTest(args.enable_video) # Start playing if not kinesiswebrtc_test.play_pipeline(): return -1 # Block until CTRL+C is pressed main_loop = GObject.MainLoop() bus = kinesiswebrtc_test.pipeline.get_bus() bus.add_signal_watch() bus.enable_sync_message_emission() bus.connect("message", bus_handler, main_loop) try: main_loop.run() except BaseException: main_loop.quit() print("Closing ...") kinesiswebrtc_test.stop_pipeline() print("Successfully closed\n") return 0 if __name__ == '__main__': parser = argparse.ArgumentParser( description="Test app for Kinesis WebRTC bin element" ) parser.add_argument( "-v", "--enable-video", default=False, action="store_true", help="Enable video receiving" ) args = parser.parse_args() main(args)