I would like to create a pipeline to display (and later perform inference on) a live webcam stream from YouTube. This would be very useful for evaluating the future performance of what I’m working on. I spent three days working on the inference pipeline with YOLOv8, only to discover that the problem lies in how GStreamer handles live streaming.
I always get intermittent and choppy video, far from smooth. How should I modify the pipeline to achieve the much-desired smoothness?
One of the videos I’d like to use is this: https://www.youtube.com/watch?v=6dp-bvQ7RWo
```python
import time
from queue import Full, Queue

import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
class FPSCounter:
    """Estimate a frame rate from timestamps kept in a sliding time window.

    Each call to ``update()`` records one event at the current wall-clock
    time; ``get_fps()`` reports how many events per second occurred between
    the oldest and newest timestamps still inside the window.
    """

    def __init__(self, window_size=30):
        """Create an empty counter.

        Args:
            window_size: Length of the sliding window in seconds (it is
                compared against differences of ``time.time()`` values).
        """
        self.window_size = window_size
        self.timestamps = []
        self.last_update = time.time()

    def update(self):
        """Record one event now and drop timestamps older than the window."""
        now = time.time()
        self.timestamps.append(now)
        # Timestamps are appended in order, so stale entries are at the front.
        while self.timestamps and now - self.timestamps[0] > self.window_size:
            self.timestamps.pop(0)

    def get_fps(self):
        """Return the events-per-second rate over the retained timestamps.

        Returns:
            0 when fewer than two timestamps are available or when the
            elapsed time is not positive; otherwise the number of intervals
            divided by the elapsed time.
        """
        if len(self.timestamps) < 2:
            return 0
        elapsed = self.timestamps[-1] - self.timestamps[0]
        # time.time() is not monotonic; a clock step backwards must not
        # produce a negative or infinite rate.
        if elapsed <= 0:
            return 0
        # N timestamps delimit N - 1 frame intervals.
        return (len(self.timestamps) - 1) / elapsed
class GStreamerPipeline:
    """Manages the GStreamer pipeline for video streaming.

    The bus signal watch and the periodic FPS timer are both dispatched by a
    GLib main loop; this class does not start one, so the caller is expected
    to run it.
    """

    def __init__(self, fps_queue=None, hef_path=None):
        """Initialise GStreamer and the bookkeeping state.

        Args:
            fps_queue: Optional queue that receives FPS samples from
                ``_update_fps``; ``None`` disables publishing.
            hef_path: Accepted for interface compatibility; not used by
                this class.
        """
        Gst.init(None)
        self.pipeline = None
        self.fps_queue = fps_queue
        self.fps_counter = FPSCounter()
        self.bus = None
        self.fps_update_running = False
        # GLib source id of the FPS timer, or None when no timer is installed.
        self._fps_source_id = None

    def build_pipeline(self, video_url, scale=50, threshold=0.4):
        """Build a playbin-based pipeline for video playback with HLS support.

        Args:
            video_url: URI handed to playbin.
            scale: Accepted for interface compatibility; currently unused.
            threshold: Accepted for interface compatibility; currently unused.

        Returns:
            True when the pipeline was created, False when GStreamer
            reported a ``GLib.Error`` while creating it.
        """
        # NOTE(review): video_url must be something playbin can open
        # directly; a YouTube watch-page URL presumably has to be resolved
        # to its stream/HLS manifest URL first - confirm.
        # NOTE(review): sync=false makes the sink render buffers as soon as
        # they arrive instead of on their timestamps; with a segment-based
        # live stream this is a likely cause of bursty, choppy playback -
        # consider sync=true and confirm.
        pipeline_str = (
            f'playbin uri="{video_url}" flags=0x00000057 '
            'video-sink="queue max-size-buffers=4096 max-size-bytes=0 '
            'max-size-time=0 ! '
            'videoconvert n-threads=4 ! '
            'videoscale method=nearest-neighbour ! '
            'fpsdisplaysink video-sink=autovideosink sync=false '
            'text-overlay=true"'
        )
        try:
            print(f"Creating pipeline: {pipeline_str}")
            self.pipeline = Gst.parse_launch(pipeline_str)
            # Configures buffering and streaming.
            # NOTE(review): flags=0x57 does not include playbin's buffering
            # flag (0x100), so these two settings may have little effect -
            # confirm against the playbin documentation.
            self.pipeline.set_property('buffer-size', 10485760)  # 10 MiB
            self.pipeline.set_property('buffer-duration', 5000000000)  # 5 s in ns
            # NOTE(review): playbin documents connection-speed in kbit/s, so
            # 2000000 is about 2 Gbit/s; use 2000 if 2 Mbit/s was intended.
            self.pipeline.set_property('connection-speed', 2000000)
            self.bus = self.pipeline.get_bus()
            self.bus.add_signal_watch()
            self.bus.connect('message::error', self._on_error)
            self.bus.connect('message::warning', self._on_warning)
            self.bus.connect('message::state-changed', self._on_state_changed)
            self.bus.connect('message::buffering', self._on_buffering)
            print("Pipeline successfully created")
            return True
        except GLib.Error as error:
            print(f"Error creating the pipeline: {error}")
            return False

    def _on_buffering(self, bus, message):
        """Handles buffering events by printing the reported percentage."""
        percent = message.parse_buffering()
        print(f"Buffering… {percent}%")

    def _update_fps(self):
        """Timer callback: push the current FPS estimate to the queue.

        Returns:
            True to keep the GLib timer alive, False to let GLib remove it.
        """
        if not self.fps_update_running:
            # Returning False makes GLib drop the source, so forget its id.
            self._fps_source_id = None
            return False
        # NOTE(review): the counter is ticked by this timer, not by decoded
        # frames, so the published value reflects the timer rate rather
        # than the video frame rate.
        if self.fps_queue is not None and not self.fps_queue.full():
            self.fps_counter.update()
            fps = self.fps_counter.get_fps()
            try:
                self.fps_queue.put_nowait(fps)
            except Full:
                # Another producer filled the queue after the check above;
                # dropping one sample is acceptable.
                pass
        return True

    def _on_error(self, bus, message):
        """Handles pipeline errors by printing the error and debug info."""
        err, debug = message.parse_error()
        print(f"Pipeline error: {err.message}")
        print(f"Debug info: {debug}")

    def _on_warning(self, bus, message):
        """Handles pipeline warnings by printing the warning and debug info."""
        warn, debug = message.parse_warning()
        print(f"Pipeline warning: {warn.message}")
        print(f"Debug info: {debug}")

    def _on_state_changed(self, bus, message):
        """Prints state transitions of the top-level pipeline only."""
        # Every element posts state-changed; ignore all but the pipeline.
        if message.src != self.pipeline:
            return
        old_state, new_state, _pending = message.parse_state_changed()
        print(
            f"Pipeline state changed: {old_state.value_nick} → "
            f"{new_state.value_nick}"
        )

    def start(self):
        """Starts the pipeline and FPS monitoring.

        Returns:
            False when no pipeline has been built or the state change to
            PLAYING failed, True otherwise.
        """
        if self.pipeline is None:
            return False
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Unable to start the pipeline")
            return False
        self.fps_update_running = True
        # Install the 100 ms FPS timer only once, so repeated start() calls
        # do not stack several timers.
        if self._fps_source_id is None:
            self._fps_source_id = GLib.timeout_add(100, self._update_fps)
        return True

    def stop(self):
        """Stops the pipeline and FPS monitoring."""
        self.fps_update_running = False
        if self._fps_source_id is not None:
            GLib.source_remove(self._fps_source_id)
            self._fps_source_id = None
        if self.pipeline is not None:
            self.pipeline.set_state(Gst.State.NULL)
```