Quick tip - GStreamer Queues

Queues are very important in GStreamer and are used very often.
Queues are used to separate the pipeline into different threads.
See the GStreamer documentation on when-would-you-want-to-force-a-thread for more details.
In order to keep latency down, you want to limit the queue sizes.
In most use cases you should use the shortest queue possible.
The logic behind it is this: assume there is some limiting bottleneck in the pipeline. All queues up to that bottleneck will be full, so a new frame entering the pipeline has to “stand in line” behind them. Let’s take for example a pipeline which is limited to 30 fps (not by the source, but by a post-processing stage, for example). Adding 3 queues of 30 entries each will force a latency of 3 seconds just for waiting in queues!
Don’t use 1-entry queues though… you need at least a double buffer to allow both sides of the queue to work in parallel, each side on a separate buffer.
A queue size of 3 is a good starting point.
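
To make the numbers concrete, here is a small sketch (plain Python, just reusing the 30 fps and queue sizes from the example above) of the worst-case latency added by full queues:

# Worst-case latency added by queues that are full up to the bottleneck:
# every buffered frame has to wait its turn at the 30 fps bottleneck.
fps = 30          # throughput of the bottleneck element
num_queues = 3    # number of queues before the bottleneck

for max_size_buffers in (30, 3):
    buffered_frames = num_queues * max_size_buffers
    latency_s = buffered_frames / fps
    print(f"max-size-buffers={max_size_buffers}: up to {latency_s:.1f}s of queue latency")

# Prints:
# max-size-buffers=30: up to 3.0s of queue latency
# max-size-buffers=3: up to 0.3s of queue latency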

Queue configurations are abundant and repetitive (more about queues).
Using a macro or a function can make your code much more readable and configurable.

C++ example (simple):

const std::string QUEUE = "queue leaky=no max-size-bytes=0 max-size-time=0 ";

Usage example:

// Create the pipeline string
pipeline_string += "hailoroundrobin name=roundroubin ! ";
pipeline_string += QUEUE + " name=roundrobin_q max-size-buffers=3 ! ";
pipeline_string += "identity name=roundrobin_probe sync=true ! ";
pipeline_string += QUEUE + " name=buf_q max-size-buffers=30 ! ";
pipeline_string += "videoconvert name=preproc_convert qos=false ! ";
pipeline_string += QUEUE + " name=convert_q max-size-buffers=3 ! ";
pipeline_string += "identity name=fps_probe_inference sync=true ! ";
pipeline_string += "videoscale ! ";

Python example (more complex):

def QUEUE(name=None, buffer_size=3, name_suffix=""):
    q_str = f'queue leaky=no max-size-buffers={buffer_size} max-size-bytes=0 max-size-time=0 silent=true '
    if name is not None:
        q_str += f'name={name}{name_suffix} '
    return q_str

Usage example:

CLIP_CROPPER_PIPELINE = f'hailocropper so-path={DEFAULT_CROP_SO} function-name={crop_function_name} \
    use-letterbox=true internal-offset=true name=cropper \
    hailoaggregator name=agg \
    cropper. ! {QUEUE(buffer_size=20, name="clip_bypass_q")} ! agg.sink_0 \
    cropper. ! {CLIP_PIPELINE} ! agg.sink_1 \
    agg. ! {QUEUE()} '
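
For completeness, here is a rough sketch of how a pipeline string built with the QUEUE() helper above can actually be launched from Python. The GStreamer binding calls (Gst.init, Gst.parse_launch, set_state) are standard; the videotestsrc/fakesink elements are just placeholders, not part of the original example:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# Toy pipeline reusing the QUEUE() helper defined above.
pipeline_str = (
    f'videotestsrc is-live=true ! '
    f'{QUEUE(name="src_q")} ! '
    f'videoconvert qos=false ! '
    f'{QUEUE(name="convert_q")} ! '
    f'fakesink sync=true'
)

pipeline = Gst.parse_launch(pipeline_str)
pipeline.set_state(Gst.State.PLAYING)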

If you found this (or other posts) useful, please hit the :heart: button to help promote it to more people. Have insights, corrections, or questions? Share your thoughts in the comments!


Thank you for the above tip.
I’ll just add that when a queue is full it will block the pipeline.
A way to avoid this, possible but not recommended in general, is to use the queue’s leaky parameter.

The leaky property allows you to leak (drop) new or old buffers so that the queue does not block when it is full.
The possible ‘leaky’ values are:
no – Not Leaky
upstream – Leaky on upstream (new buffers will be dropped)
downstream – Leaky on downstream (old buffers will be dropped)

As a general statement, leaky queues should be used only in the source part of the pipeline (if you have a special case that requires dropping frames elsewhere, make sure you know what you are doing).
From a performance point of view you don’t want to drop frames that you are already “invested” in. In most cases, once you have started working on a frame it is better to finish its processing than to throw it away and start from scratch.
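
If you do need a leaky queue in the source part of the pipeline, one option is to extend the QUEUE() helper from the tip above with an extra parameter. This is only a sketch on top of that helper, not something from the original post:

def QUEUE(name=None, buffer_size=3, name_suffix="", leaky='no'):
    # leaky can be 'no', 'upstream' (drop new buffers) or 'downstream' (drop old buffers)
    q_str = f'queue leaky={leaky} max-size-buffers={buffer_size} max-size-bytes=0 max-size-time=0 silent=true '
    if name is not None:
        q_str += f'name={name}{name_suffix} '
    return q_str

# Example: only right after the live source, drop old frames instead of blocking it
source_queue = QUEUE(name="src_q", buffer_size=3, leaky='downstream')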

IMPORTANT:
There are cases where dropping frames will cause errors and unpredictable behavior.
As a rule of thumb, anywhere your pipeline is split and then merged again should not drop frames. Dropping there may cause the merged frames to be misaligned, for example metadata of one frame being attached to the video of a different frame.
In specific cases like cropper-aggregator or tiling, dropping frames may cause fatal errors because the plugins expect a specific number of frames to be sent on each branch; messing around with that will cause issues.