GStreamer: UDP streaming with tee – only one branch works at a time

Hi,

I’m working on a Raspberry Pi 5 project with a USB camera, using GStreamer and a Hailo-8L for AI inference. I want to generate two UDP streams simultaneously from the same camera source:

  1. One stream with AI inference and overlay (port 6976)
  2. One raw stream without inference (port 6977)

Here’s the simplified Python GStreamer pipeline I’m using:

def get_pipeline_string(self):
    # =========================
    # SOURCE (common)
    # =========================
    source_pipeline = (
        f'appsrc name=app_source is-live=true format=time do-timestamp=true max-buffers=3 leaky-type=downstream ! '
        'videoflip method=horizontal-flip ! '
        'videoconvert ! '
        f'video/x-raw,format={self.video_format},width={self.video_width},height={self.video_height} ! '
        f'{QUEUE(name="source_q", max_size_buffers=500)} ! '
        'tee name=main_tee allow-not-linked=true'
    )

    # =========================
    # INFERENCE BRANCH (6976)
    # =========================
    detection_pipeline = INFERENCE_PIPELINE(
        hef_path=self.hef_path,
        post_process_so=self.post_process_so,
        post_function_name=self.post_function_name,
        batch_size=self.batch_size,
        config_json=self.labels_json,
        additional_params=self.thresholds_str
    )
    detection_pipeline_wrapper = INFERENCE_PIPELINE_WRAPPER(detection_pipeline, name='inference_wrapper')

    ia_branch = (
        'main_tee. ! '
        f'{QUEUE(name="ia_q", max_size_buffers=500)} ! '
        f'{detection_pipeline_wrapper} ! '
        f'{QUEUE(name="identity_q", max_size_buffers=500)} ! '
        'identity name=identity_callback ! '
        f'{QUEUE(name="hailopython_q", max_size_buffers=500)} ! '
        'hailopython module=<path_to_module> function=run qos=false ! '
        f'{QUEUE(name="overlay_q", max_size_buffers=500)} ! '
        'hailooverlay font-thickness=2 line-thickness=4 show-confidence=true ! '
        'videoconvert ! '
        'openh264enc ! h264parse ! mpegtsmux ! rtpmp2tpay ! '
        'udpsink host=127.0.0.1 port=6976 sync=false async=false'
    )

    # =========================
    # RAW BRANCH (6977)
    # =========================
    raw_branch = (
        'main_tee. ! '
        f'{QUEUE(name="raw_q", max_size_buffers=500)} ! '
        'videoconvert ! '
        'video/x-raw,format=I420 ! '
        'rtpvrawpay ! '
        'udpsink host=127.0.0.1 port=6977 sync=false async=false'
    )

    # =========================
    # FULL PIPELINE
    # =========================
    pipeline_string = f'{source_pipeline} {ia_branch} {raw_branch}'

    print("=" * 60)
    print("Active UDP streams:")
    print("  • 6976 → Inference + overlay")
    print("  • 6977 → Raw video")
    print("=" * 60)

    return pipeline_string

Problem:

  • Only one UDP stream works at a time.
  • If I disable the AI branch, the raw branch works.
  • If I disable the raw branch, the AI branch works.
  • CPU usage is fine.
  • I tried a multiqueue for the two branches and increased max-size-buffers, but the result is the same (roughly what I tried is sketched right after this list).
  • No errors are printed.
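
For reference, the multiqueue variant looked roughly like this (simplified and reconstructed from memory, so element names and sizes are approximate; the branch contents were unchanged, only the queuing between the tee and the branches was different):

source_pipeline = (
    'appsrc name=app_source is-live=true format=time do-timestamp=true ! '
    'videoflip method=horizontal-flip ! videoconvert ! '
    'tee name=main_tee '
    'multiqueue name=mq max-size-buffers=500 '
)
ia_branch = 'main_tee. ! mq.sink_0  mq.src_0 ! <inference branch as above> ! udpsink host=127.0.0.1 port=6976 sync=false async=false'
raw_branch = 'main_tee. ! mq.sink_1  mq.src_1 ! <raw branch as above> ! udpsink host=127.0.0.1 port=6977 sync=false async=false'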

I suspect the issue may still be related to the camera itself. I already know the camera can’t be opened twice at the same time, but my understanding is that a tee should let both branches share a single capture (see the minimal sketch below).
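
To make sure I’m not misunderstanding tee itself, here is a stripped-down sketch of the same topology without the camera and without any Hailo element (videotestsrc as the source; the ports and the tail of each branch match my real pipeline). I would expect something like this to produce both streams at once:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

# videotestsrc -> tee -> (H.264 / MPEG-TS / RTP on 6976) + (raw RTP on 6977)
pipeline = Gst.parse_launch(
    'videotestsrc is-live=true ! '
    'video/x-raw,format=I420,width=640,height=480,framerate=30/1 ! '
    'tee name=t '
    't. ! queue leaky=downstream max-size-buffers=5 ! videoconvert ! '
    'openh264enc ! h264parse ! mpegtsmux ! rtpmp2tpay ! '
    'udpsink host=127.0.0.1 port=6976 sync=false async=false '
    't. ! queue leaky=downstream max-size-buffers=5 ! videoconvert ! '
    'rtpvrawpay ! udpsink host=127.0.0.1 port=6977 sync=false async=false'
)

pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    pass
finally:
    pipeline.set_state(Gst.State.NULL)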

Questions:

  1. Has anyone successfully streamed two UDP outputs from the same live camera with one branch doing AI inference?
  2. Is there a better way to share the camera feed so that both branches work simultaneously?
  3. Any recommended settings for queue, multiqueue, or udpsink to make this stable?

Thanks in advance for any advice!

Just to add some clarifications to my previous post:

VideoCamera class used to consume UDP streams

import cv2
import threading

class VideoCamera:
    def __init__(self, source):
        self.video = cv2.VideoCapture(source, cv2.CAP_FFMPEG)
        self.lock = threading.Lock()
        self.latest_frame = None  # Always keeps the last frame
        self.running = True

        # Start capture in a separate thread
        self.thread = threading.Thread(target=self.update_frame, daemon=True)
        self.thread.start()

    def update_frame(self):
        """Continuously capture the latest available frame."""
        while self.running:
            success, frame = self.video.read()
            if success:
                with self.lock:
                    self.latest_frame = frame  # Overwrite previous frame

    def get_frame(self):
        """Returns the latest frame encoded as JPEG"""
        with self.lock:
            if self.latest_frame is None:
                return None
            ret, buffer = cv2.imencode('.jpg', self.latest_frame)
            if not ret:
                return None
            return buffer.tobytes()

    def flush_buffer(self):
        """Flushes old frames before streaming"""
        for _ in range(10):
            self.video.read()

    def release(self):
        self.running = False
        self.thread.join()
        self.video.release()

Instances declared in the Flask API

from flask import Flask, Response

app = Flask(__name__, static_url_path='/static', static_folder='static')

video_camera_without_bbox = VideoCamera(
    source="udp://127.0.0.1:6977?fflags=nobuffer&fifo_size=0"
)

video_camera = VideoCamera(
    source="udp://127.0.0.1:6976?fflags=nobuffer&fifo_size=0"
)

Flask routes to serve the streams

@app.route('/video_feed_raw')
def video_feed_raw():
    def generator():
        while True:
            frame = video_camera_without_bbox.get_frame()
            if frame is None:
                continue
            yield (
                b"--frame\r\n"
                b"Content-Type: image/jpeg\r\n\r\n" +
                frame +
                b"\r\n"
            )
    return Response(generator(), mimetype="multipart/x-mixed-replace; boundary=frame")


@app.route('/video_feed')
def video_feed():
    def generator():
        while True:
            frame = video_camera.get_frame()
            if frame is None:
                continue
            yield (
                b"--frame\r\n"
                b"Content-Type: image/jpeg\r\n\r\n" +
                frame +
                b"\r\n"
            )
    return Response(generator(), mimetype="multipart/x-mixed-replace; boundary=frame")
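
For completeness, the app is launched with the standard Flask development server, roughly like this (host and port are just what I use locally; threaded=True so both MJPEG routes can be served at the same time):

if __name__ == '__main__':
    # Dev server only; threaded=True lets /video_feed and /video_feed_raw
    # stream concurrently to the browser.
    app.run(host='0.0.0.0', port=5000, threaded=True)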