GstInterpipe

From RidgeRun Developer Wiki

Introduction

Overview

GstInterpipe is a GStreamer plug-in that allows communication between two or more independent pipelines. The plug-in consists of two elements:

  • interpipesink
  • interpipesrc

Generally speaking, the idea is that, given a source pipeline:

v4l2src ! interpipesink name=video_src

various independent sink pipelines may listen to it:

interpipesrc listen-to=video_src ! xvimagesink
interpipesrc listen-to=video_src num-buffers=1 ! jpegenc ! filesink location=snapshot.jpeg

The state of each pipeline is independent, each one manages its own events, and sink pipelines can be attached or detached at any moment.

The concept behind the Interpipes project is to simplify the construction of GStreamer applications, which often require dynamic pipelines and the complexity that comes with them. It moves the construction process from low-level pad probe manipulation to the higher level of setting an element's property value. Application developers no longer get mired in pipelines that stall because one branch of a complex pipeline changes state.

For example, consider the following complex pipeline:

Figure 1: Complex Single Pipeline

The complex pipeline of Figure 1 can be split into smaller, independent pipelines using interpipe elements, as illustrated in Figure 2:

Figure 2: Ten independent simple pipelines using Interpipes

This way, controlling the stream flow in a complex pipeline is reduced to setting the correct listeners in the interpipe elements, which removes the need to re-configure pads or write other complex and error-prone logic.

Features and Limitations

The GstInterpipe project exposes the following characteristics:

  • Inspired by intervideosrc/intervideosink
  • Uses appsrc and appsink as a base
  • Generic data handling
  • Multiple dynamic interpipesink - interpipesrc connections
  • Replaces tee and selector elements, allowing dynamic pipeline flow control with GStreamer Daemon

The principal plug-in features include:

  • Buffer transfer
    Interpipesink automatically transfers the buffers it receives to all the interpipesrcs that are currently listening to it. There is no data copy.
  • Dynamic switching
    An interpipesrc can switch the interpipesink it is listening to at any time just by setting the listen-to property to the new value. There is no need to worry about the pipeline's state, dangerous events like EOS, pad probes, valves, selectors, pad links, etc.
  • Caps negotiation
    Even though interpipes break a big pipeline into smaller ones, the caps negotiation process takes into account all of them. This means that it is guaranteed that the negotiated caps will be supported by the source and all its listeners (or fail due to missing valid intersection).
  • Event forwarding
    Similar to buffers, events may be forwarded from the interpipesinks to the interpipesrcs, and vice versa. The project takes into account downstream and upstream events, as well as in-bounds and out-of-bounds events.
  • Timestamp synchronization
    The base times of independent pipelines will likely be different. Given that a buffer will be transferred from one pipe to another, this may represent a problem in situations where synchronization is a must. GstInterpipe takes care of this situation by compensating the buffer's timestamps according to the pipeline's base time, ensuring appropriate synchronization.
  • New node notification
    An interpipesrc may be set to listen to a nonexistent node name. In that case, the interpipesrc is registered to receive a notification when the desired interpipesink is created. At that point, the connection is made and the buffer flow starts.

The current release exposes the following limitations and known bugs:

  • Specialized clocks
    For the time being, pipelines negotiate their clock independently. If a pipeline uses a special clock (e.g. GstAudioSinkClock), the associated pipes (being independent) may negotiate different clocks, typically GstSystemClock. This could be a big problem if, for example, synchronization between streams is required. It is currently the responsibility of the application to set the special clock in all the involved pipes by calling gst_pipeline_set_clock.

Getting the Software

GstInterpipe is an open source project. RidgeRun is actively pushing the interpipes to the next GStreamer release and the link will be available soon. The project is hosted on GitHub:

https://github.com/RidgeRun/gst-interpipe

Contact support@ridgerun.com with any questions.

As with every open source project, there's an open invitation to provide feedback to the maintainers in the form of bug reports, feature requests and source code contributions. The following table summarizes the recommended methods to collaborate with RidgeRun:

Collaboration       Method
Bug report          GitHub's issue tracker
Feature request     GitHub's issue tracker
Patch submission    Pull request

Installation Guide

Dependencies

The following packages are needed in order to build and use gst-interpipe:

  • gstreamer-1.0
  • gstreamer-plugins-base-1.0
  • gstreamer-app-1.0
  • gstreamer-check-1.0

These are likely already installed in your OS distribution. In case you want to double check and install the missing packages, run the following commands according to your OS:

Debian Based

This includes Ubuntu, Kubuntu, Debian, etc.

sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev

Mac OS X

Using MacPorts:

sudo port install gstreamer1 gstreamer1-gst-plugins-base

Building the project

In order to build the project run the following commands. Note that the libdir may vary according to your system.

./autogen.sh --libdir /usr/lib/x86_64-linux-gnu/gstreamer-1.0/
make

The autogen.sh script will automatically run the configure script. In case a more complex configuration is needed, the configure step may be executed manually:

./autogen.sh --noconfigure
./configure --libdir /usr/lib/x86_64-linux-gnu/gstreamer-1.0/ <additional advanced options>
make

Finally, the status of the current version may be checked by running the unit tests:

make check

Installing the plugin

The plugin is installed to GStreamer's default plug-in location in the file system by running:

sudo make install

If you don't want to install the plugin in the default location, GStreamer can also find it if GST_PLUGIN_PATH is set to the library location. For example:

GST_PLUGIN_PATH=$HOME/gst-interpipe-1.0/gst/interpipe/.libs gst-inspect-1.0 interpipe

User Guide

The following sections describe in detail the different capabilities of the elements, configurations and examples.

Features

Buffer Forwarding

The main purpose of the GstInterpipe project is to communicate buffers from one pipeline to another. This can be done safely, without worrying about the states or events of any of the pipelines. To make an interpipesrc (listener) listen to a specific interpipesink (node), all that is needed is to set the listen-to property. A NULL or empty listen-to disconnects the listener from any node. The following tables summarise the properties involved in buffer transfer.

Property
  Element         interpipesrc
  Name            listen-to
  Description     The name of the node to listen to
  Special values  NULL: stop listening
                  (empty): stop listening

Property
  Element         interpipesink
  Name            name
  Description     The name of the node. It can only be set during construction.
  Special values  None


The following figures, and their respective gst-launch pipelines illustrate this concept:

gst-launch-1.0 \
    videotestsrc ! interpipesink name=camera \
    interpipesrc listen-to="" ! fakesink

Detached pipelines

gst-launch-1.0 \
    videotestsrc ! interpipesink name=camera \
    interpipesrc listen-to="camera" ! fakesink

Attached pipelines

Dynamic Switching

One of the greatest strengths of GstInterpipe is the ability of an interpipesrc to switch between different interpipesinks at runtime. To do so, change the listen-to property. Similarly, disconnections and reconnections can be performed by clearing and setting the property accordingly. When performing dynamic switching, caps must be taken into consideration. Specifically, a switch can be performed to a node in two scenarios:

  • No other listeners
    Caps are re-negotiated between the two pipelines, looking for the optimum caps.
  • Existing listeners
    The sink pipeline must support the caps already configured by the node and listeners, otherwise it'll fail.

The following figure illustrates this concept:

Dynamic Switch Scenarios
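
The two scenarios can be modelled with a small, self-contained C sketch. It deliberately does not use the GStreamer API: caps are reduced to a hypothetical bitmask of formats, and the names Node, node_attach and the FMT_* flags are invented for illustration. Picking the lowest common bit stands in for "the optimum caps"; the real negotiation works on GstCaps and is more involved.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical format flags standing in for real GstCaps. */
enum { FMT_I420 = 1u << 0, FMT_NV12 = 1u << 1, FMT_RGB = 1u << 2 };

/* Illustrative model of an interpipesink (node). */
typedef struct {
  uint32_t supported;   /* formats the node pipeline can produce */
  uint32_t negotiated;  /* single format currently configured */
  unsigned listeners;   /* number of attached listeners */
} Node;

/* Try to attach a listener that supports `listener_caps`.
 * Returns true on success and updates the node. */
bool node_attach(Node *node, uint32_t listener_caps)
{
  assert(node != NULL);
  if (node->listeners == 0) {
    /* No other listeners: re-negotiate between the two pipelines. */
    uint32_t common = node->supported & listener_caps;
    if (common == 0)
      return false;                             /* no valid intersection */
    node->negotiated = common & (~common + 1u); /* keep the lowest common bit */
  } else if ((node->negotiated & listener_caps) == 0) {
    /* Existing listeners: the newcomer must support what is configured. */
    return false;
  }
  node->listeners++;
  return true;
}
```

In this model, a node supporting I420 and NV12 that is first joined by a listener supporting NV12 and RGB settles on NV12. A second listener that only supports I420 and RGB is then rejected, because the configured format is no longer open to negotiation.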

The caps re-negotiation capability can be disabled by setting the allow-renegotiation property to false. The following table summarises the property involved:

Property
  Element         interpipesrc
  Name            allow-renegotiation
  Description     Allow the interpipesrc to renegotiate caps when attached to an interpipesink with different caps.
  Special values  None

The switch can be disabled entirely by setting the block-switch property to true.

Property
  Element         interpipesrc
  Name            block-switch
  Description     Prevent changing the node a listener is currently listening to.
  Special values  None

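Programmatically, the dynamic switch can be performed as follows:

/* Create pipelines (gst_init() must already have been called) */
GstElement *pipe1 = gst_parse_launch ("videotestsrc ! interpipesink name=camera1", NULL);
GstElement *pipe2 = gst_parse_launch ("videotestsrc ! interpipesink name=camera2", NULL);
GstElement *pipe3 = gst_parse_launch ("interpipesrc name=src listen-to=camera1 ! fakesink", NULL);

/* Start the pipelines so that buffers flow */
gst_element_set_state (pipe1, GST_STATE_PLAYING);
gst_element_set_state (pipe2, GST_STATE_PLAYING);
gst_element_set_state (pipe3, GST_STATE_PLAYING);

/* Grab a reference to the interpipesrc; gst_bin_get_by_name() expects a GstBin */
GstElement *src = gst_bin_get_by_name (GST_BIN (pipe3), "src");

/* Perform the switch */
g_object_set (src, "listen-to", "camera2", NULL);

/* Release the reference returned by gst_bin_get_by_name() */
gst_object_unref (src);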

Caps Negotiation

GstInterpipe takes into account the node and all its listeners during the caps negotiation process. The resulting set of supported caps is guaranteed to be the intersection of the caps of the node and of all its listeners. This only applies to the listeners that are connected during the caps negotiation process. A listener that is attached later is handled independently (see Dynamic Switching).

The following figure shows this concept.

Caps negotiation process
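
The intersection rule can be sketched in plain C. This is only a model under stated assumptions: caps are represented as a hypothetical bitmask (the CAPS_* flags and negotiate_caps are invented names), whereas the plug-in operates on real GstCaps.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical format flags standing in for real GstCaps. */
enum { CAPS_I420 = 1u << 0, CAPS_NV12 = 1u << 1, CAPS_RGB = 1u << 2 };

/* Intersect the node caps with the caps of every connected listener.
 * A result of 0 means there is no valid intersection, so negotiation fails. */
uint32_t negotiate_caps(uint32_t node_caps,
                        const uint32_t *listener_caps, size_t n_listeners)
{
  assert(listener_caps != NULL || n_listeners == 0);
  uint32_t result = node_caps;
  for (size_t i = 0; i < n_listeners; i++)
    result &= listener_caps[i];
  return result;
}
```

Only formats that the node and every listener share survive the loop. On real GstCaps the corresponding set operation is gst_caps_intersect().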

Event Forwarding

As with buffers, events can be transferred from the nodes to the listeners (and vice versa). GStreamer handles three types of events:

  • Upstream Events: events that travel contrary to the buffer flow
  • Downstream Out-Of-Bounds Events: events traveling in the same direction as the buffer flow, but sent immediately.
  • Downstream In-Bounds Events: events traveling in the same direction as the buffer flow, but sent serialised with the buffers.

GstInterpipe supports the three types of events, each one under different considerations. Additionally, the EOS event is treated with special care and is therefore controlled by independent properties. The following sections describe these scenarios.

Upstream Events

Upstream events are sent from the listeners to the node. They are only transferred if the event emitter is the only listener connected to the node. This design avoids changing the state of the node for the other listeners. The following image describes this situation:

Upstream events transfer

Downstream Out-Of-Bounds Events

Out-Of-Bounds downstream events are sent from the node to the listeners. These events are sent to all of the listeners as soon as they are received. Setting forward-events to false on the node prevents it from sending events, and setting accept-events to false on an individual listener prevents that listener from receiving them. The following tables summarise these properties:

Property
  Element         interpipesink
  Name            forward-events
  Description     Allow the interpipesink to forward any kind of events to the listeners
  Special values  None

Property
  Element         interpipesrc
  Name            accept-events
  Description     Accept events coming from the node
  Special values  None

Graphically, the following figure shows different configuration examples.

Downstream out-of-bounds event transfer

Downstream In-Bounds Events

In-Bounds downstream events travel from the node to all the listeners. The main difference from out-of-bounds events is that they travel in line with the buffer flow. Internally, both the interpipesink and the interpipesrc can queue buffers, so serialised events are queued as well and pushed at the precise moment they need to be sent. The same set of properties controls the transmission of these events.

The following image illustrates this:

Downstream in bounds event transfer

End-Of-Stream Events

EOS events are a special type of in-bounds downstream event. The same queuing principles apply to them. However, this event typically has drastic consequences for a pipeline, so it is likely that a pipeline wants to receive all event types except EOS. For this reason, the EOS transfer can be controlled by setting forward-eos to false to disable EOS transfer completely from the node, or by setting accept-eos-event to false to selectively block the EOS in individual listeners. The following tables summarise the properties.

The following image illustrates this:

Downstream in bounds event transfer

Property
  Element         interpipesink
  Name            forward-eos
  Description     Allow the interpipesink to forward EOS events to the listeners
  Special values  None

Property
  Element         interpipesrc
  Name            accept-eos-event
  Description     Accept EOS events coming from the node
  Special values  None
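
Taken together, the event rules described in the sections above reduce to a few boolean checks. The following C sketch is a model only: the struct and function names are hypothetical, and the fields simply mirror the properties documented here rather than the plug-in's internal state.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Illustrative subset of interpipesink (node) properties. */
typedef struct {
  bool forward_events;  /* forward-events */
  bool forward_eos;     /* forward-eos */
} NodeProps;

/* Illustrative subset of interpipesrc (listener) properties. */
typedef struct {
  bool accept_events;   /* accept-events */
  bool accept_eos;      /* listed as accept-eos-event by gst-inspect-1.0 */
} ListenerProps;

/* Does a downstream event sent by the node reach this listener?
 * EOS is gated by its own pair of properties; every other downstream
 * event is gated by forward-events and accept-events. */
bool listener_receives(const NodeProps *node, const ListenerProps *listener,
                       bool is_eos)
{
  assert(node != NULL && listener != NULL);
  if (is_eos)
    return node->forward_eos && listener->accept_eos;
  return node->forward_events && listener->accept_events;
}

/* Upstream events only reach the node when the emitter is its sole listener. */
bool node_receives_upstream(unsigned n_listeners)
{
  return n_listeners == 1;
}
```

Both sides have to agree before a downstream event is delivered, so disabling forwarding on the node overrides whatever the individual listeners accept.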

Timestamp Synchronization

The transferred buffers go through two (or more) different pipelines. Each pipeline has its own base time and, hence, the buffer timestamp has a different meaning in each one. This can cause problems in pipelines where synchronisation is required. To solve this, the buffer timestamp is compensated taking into account the combination of the base times. The result is an equivalent relative buffer time. This feature can be disabled if desired by setting the enable-sync property to false. The following table shows the property:

Property
  Element         interpipesrc
  Name            enable-sync
  Description     Compensate buffer timestamps in order to achieve equivalent buffer times
  Special values  None

The following image exemplifies the potential synchronisation problems.

Timestamp synchronization
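
The arithmetic behind the compensation can be sketched as follows. This is an assumption about how such a compensation can work, not a copy of the plug-in's code: it supposes that both pipelines read the same clock and that the goal is to preserve the absolute clock time of each buffer. The function name compensate_timestamp is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Translate a buffer timestamp (nanoseconds, relative to the node
 * pipeline's base time) into the listener pipeline's time line.
 * Assumes both pipelines read the same clock. */
uint64_t compensate_timestamp(uint64_t ts, uint64_t node_base_time,
                              uint64_t listener_base_time)
{
  /* Absolute clock time at which the buffer was stamped. */
  uint64_t absolute = node_base_time + ts;
  /* Precondition of this sketch: the buffer does not predate the
   * listener pipeline's base time. */
  assert(absolute >= listener_base_time);
  return absolute - listener_base_time;
}
```

For example, if the node pipeline has a base time of 10 s and the listener pipeline a base time of 12 s, a buffer stamped at 5 s in the node pipeline corresponds to 3 s in the listener pipeline.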

New Node Notification

A listener can be set to listen to a nonexistent node. This doesn't necessarily mean an error: it is very common for the node to be created later. For this reason, the interpipe project internally uses a notification mechanism to notify the listeners when a new node is connected. If a listener is currently configured to listen to the new node, the connection is established.
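
The notification idea can be illustrated with a minimal C model. It is a sketch under assumptions, not the plug-in's implementation: the Listener struct and notify_node_added are hypothetical names, and a plain array stands in for whatever registry the project uses internally.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Illustrative model of an interpipesrc (listener). */
typedef struct {
  const char *listen_to;  /* desired node name; NULL or "" means detached */
  bool connected;
} Listener;

/* Called when a node named `node_name` appears: connect every listener
 * that was waiting for that name. Returns the number of new connections. */
size_t notify_node_added(Listener *listeners, size_t n, const char *node_name)
{
  assert(node_name != NULL);
  size_t connected = 0;
  for (size_t i = 0; i < n; i++) {
    Listener *l = &listeners[i];
    if (l->connected || l->listen_to == NULL || l->listen_to[0] == '\0')
      continue;  /* already connected, or not listening to anything */
    if (strcmp(l->listen_to, node_name) == 0) {
      l->connected = true;
      connected++;
    }
  }
  return connected;
}
```

Listeners whose listen-to value is NULL or empty are skipped, matching the "stop listening" special values of the property.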

Developers Guide

Currently, the API reference of the project is hosted on the project's GitHub page. Note that this documentation is meant for developers wishing to extend GstInterpipe's current functionality.

Additional Info

GstInterPipeSink

The gst-inspect-1.0 output for the interpipesink looks like the following:

gst-inspect-1.0 interpipesink
Factory Details:
  Rank                     none (0)
  Long-name                Internal pipeline sink
  Klass                    Generic/Sink
  Description              Sink for internal pipeline buffers communication
  Author                   Michael Grüner <michael.gruner@ridgerun.com>

Plugin Details:
  Name                     interpipe
  Description              Elements to communicate buffers across pipelines
  Filename                 /opt/local/lib/gstreamer-1.0/libgstinterpipe.so
  Version                  1.0.0.1
  License                  Proprietary
  Source module            gst-interpipe
  Binary package           GstInterpipe
  Origin URL               http://www.ridgerun.com

GObject
 +----GInitiallyUnowned
       +----GstObject
             +----GstElement
                   +----GstBaseSink
                         +----GstAppSink
                               +----GstInterPipeSink

Implemented Interfaces:
  GstURIHandler
  GstInterPipeINode

Pad Templates:
  SINK template: 'sink'
    Availability: Always
    Capabilities:
      ANY


Element Flags:
  no flags set

Element Implementation:
  Has change_state() function: gst_base_sink_change_state

Element has no clocking capabilities.

URI handling capabilities:
  Element can act as sink.
  Supported URI protocols:
    appsink

Pads:
  SINK: 'sink'
    Pad Template: 'sink'

Element Properties:
  name                : The name of the object
                        flags: readable, writable
                        String. Default: "interpipesink0"
  parent              : The parent of the object
                        flags: readable, writable
                        Object of type "GstObject"
  sync                : Sync on the clock
                        flags: readable, writable
                        Boolean. Default: false
  max-lateness        : Maximum number of nanoseconds that a buffer can be late before it is dropped (-1 unlimited)
                        flags: readable, writable
                        Integer64. Range: -1 - 9223372036854775807 Default: -1
  qos                 : Generate Quality-of-Service events upstream
                        flags: readable, writable
                        Boolean. Default: false
  async               : Go asynchronously to PAUSED
                        flags: readable, writable
                        Boolean. Default: true
  ts-offset           : Timestamp offset in nanoseconds
                        flags: readable, writable
                        Integer64. Range: -9223372036854775808 - 9223372036854775807 Default: 0
  enable-last-sample  : Enable the last-sample property
                        flags: readable, writable
                        Boolean. Default: true
  last-sample         : The last sample received in the sink
                        flags: readable
                        Boxed pointer of type "GstSample"
  blocksize           : Size in bytes to pull per buffer (0 = default)
                        flags: readable, writable
                        Unsigned Integer. Range: 0 - 4294967295 Default: 4096
  render-delay        : Additional render delay of the sink in nanoseconds
                        flags: readable, writable
                        Unsigned Integer64. Range: 0 - 18446744073709551615 Default: 0
  throttle-time       : The time to keep between rendered buffers (0 = disabled)
                        flags: readable, writable
                        Unsigned Integer64. Range: 0 - 18446744073709551615 Default: 0
  max-bitrate         : The maximum bits per second to render (0 = disabled)
                        flags: readable, writable
                        Unsigned Integer64. Range: 0 - 18446744073709551615 Default: 0
  caps                : The allowed caps for the sink pad
                        flags: readable, writable
                        Caps (NULL)
  eos                 : Check if the sink is EOS or not started
                        flags: readable
                        Boolean. Default: true
  emit-signals        : Emit new-preroll and new-sample signals
                        flags: readable, writable
                        Boolean. Default: false
  max-buffers         : The maximum number of buffers to queue internally (0 = unlimited)
                        flags: readable, writable
                        Unsigned Integer. Range: 0 - 4294967295 Default: 3
  drop                : Drop old buffers when the buffer queue is filled
                        flags: readable, writable
                        Boolean. Default: true
  forward-eos         : Forward the EOS event to all the listeners
                        flags: writable
                        Boolean. Default: false Write only
  forward-events      : Forward downstream events to all the listeners (except for EOS)
                        flags: writable
                        Boolean. Default: false Write only

Element Signals:
  "eos" :  void user_function (GstElement* object,
                               gpointer user_data);
  "new-preroll" :  GstFlowReturn user_function (GstElement* object,
                                                gpointer user_data);
  "new-sample" :  GstFlowReturn user_function (GstElement* object,
                                               gpointer user_data);

Element Actions:
  "pull-preroll" :  GstSample * user_function (GstElement* object);
  "pull-sample" :  GstSample * user_function (GstElement* object);

GstInterPipeSrc

The gst-inspect-1.0 output for the interpipesrc looks like the following:

Factory Details:
  Rank                     none (0)
  Long-name                Inter pipeline source
  Klass                    Generic/Source
  Description              Source for internal pipeline buffers communication
  Author                   Michael Grüner <michael.gruner@ridgerun.com>

Plugin Details:
  Name                     interpipe
  Description              Elements to communicate buffers across pipelines
  Filename                 /opt/local/lib/gstreamer-1.0/libgstinterpipe.so
  Version                  1.0.0.1
  License                  Proprietary
  Source module            gst-interpipe
  Binary package           GstInterpipe
  Origin URL               http://www.ridgerun.com

GObject
 +----GInitiallyUnowned
       +----GstObject
             +----GstElement
                   +----GstBaseSrc
                         +----GstAppSrc
                               +----GstInterPipeSrc

Implemented Interfaces:
  GstURIHandler
  GstInterPipeIListener

Pad Templates:
  SRC template: 'src'
    Availability: Always
    Capabilities:
      ANY


Element Flags:
  no flags set

Element Implementation:
  Has change_state() function: gst_base_src_change_state

Element has no clocking capabilities.

URI handling capabilities:
  Element can act as source.
  Supported URI protocols:
    appsrc

Pads:
  SRC: 'src'
    Pad Template: 'src'

Element Properties:
  name                : The name of the object
                        flags: readable, writable
                        String. Default: "interpipesrc0"
  parent              : The parent of the object
                        flags: readable, writable
                        Object of type "GstObject"
  blocksize           : Size in bytes to read per buffer (-1 = default)
                        flags: readable, writable
                        Unsigned Integer. Range: 0 - 4294967295 Default: 4096
  num-buffers         : Number of buffers to output before sending EOS (-1 = unlimited)
                        flags: readable, writable
                        Integer. Range: -1 - 2147483647 Default: -1
  typefind            : Run typefind before negotiating
                        flags: readable, writable
                        Boolean. Default: false
  do-timestamp        : Apply current stream time to buffers
                        flags: readable, writable
                        Boolean. Default: false
  caps                : The allowed caps for the src pad
                        flags: readable, writable
                        Caps (NULL)
  size                : The size of the data stream in bytes (-1 if unknown)
                        flags: readable, writable
                        Integer64. Range: -1 - 9223372036854775807 Default: -1
  stream-type         : the type of the stream
                        flags: readable, writable
                        Enum "GstAppStreamType" Default: 0, "stream"
                           (0): stream           - GST_APP_STREAM_TYPE_STREAM
                           (1): seekable         - GST_APP_STREAM_TYPE_SEEKABLE
                           (2): random-access    - GST_APP_STREAM_TYPE_RANDOM_ACCESS
  max-bytes           : The maximum number of bytes to queue internally (0 = unlimited)
                        flags: readable, writable
                        Unsigned Integer64. Range: 0 - 18446744073709551615 Default: 200000
  format              : The format of the segment events and seek
                        flags: readable, writable
                        Enum "GstFormat" Default: 2, "bytes"
                           (0): undefined        - GST_FORMAT_UNDEFINED
                           (1): default          - GST_FORMAT_DEFAULT
                           (2): bytes            - GST_FORMAT_BYTES
                           (3): time             - GST_FORMAT_TIME
                           (4): buffers          - GST_FORMAT_BUFFERS
                           (5): percent          - GST_FORMAT_PERCENT
  block               : Block push-buffer when max-bytes are queued
                        flags: readable, writable
                        Boolean. Default: false
  is-live             : Whether to act as a live source
                        flags: readable, writable
                        Boolean. Default: false
  min-latency         : The minimum latency (-1 = default)
                        flags: readable, writable
                        Integer64. Range: -1 - 9223372036854775807 Default: -1
  max-latency         : The maximum latency (-1 = unlimited)
                        flags: readable, writable
                        Integer64. Range: -1 - 9223372036854775807 Default: -1
  emit-signals        : Emit need-data, enough-data and seek-data signals
                        flags: readable, writable
                        Boolean. Default: false
  min-percent         : Emit need-data when queued bytes drops below this percent of max-bytes
                        flags: readable, writable
                        Unsigned Integer. Range: 0 - 100 Default: 0
  current-level-bytes : The number of currently queued bytes
                        flags: readable
                        Unsigned Integer64. Range: 0 - 18446744073709551615 Default: 0
  listen-to           : The name of the node to listen to.
                        flags: readable, writable
                        String. Default: null
  block-switch        : Disable the ability to swich between nodes.
                        flags: writable
                        Boolean. Default: false Write only
  allow-renegotiation : Allow the caps renegotiation with an interpipesink with different caps only if the allow-renegotiation property is set to true
                        flags: writable
                        Boolean. Default: true Write only
  enable-sync         : Perform buffer timestamp compensation to have equivalent relative buffer times in the different pipelines
                        flags: writable
                        Boolean. Default: true Write only
  accept-events       : Accept the events received from the interpipesink
                        flags: writable
                        Boolean. Default: true Write only
  accept-eos-event    : Accept the EOS event received from the interpipesink only if it is set to true
                        flags: writable
                        Boolean. Default: true Write only

Element Signals:
  "need-data" :  void user_function (GstElement* object,
                                     guint arg0,
                                     gpointer user_data);
  "enough-data" :  void user_function (GstElement* object,
                                       gpointer user_data);
  "seek-data" :  gboolean user_function (GstElement* object,
                                         guint64 arg0,
                                         gpointer user_data);

Element Actions:
  "push-buffer" :  GstFlowReturn user_function (GstElement* object,
                                                GstBuffer* arg0);
  "push-sample" :  GstFlowReturn user_function (GstElement* object,
                                                GstSample* arg0);
  "end-of-stream" :  GstFlowReturn user_function (GstElement* object);

Examples

TODO.