gst_parse_error: could not link queue_hailonet to videoconvert1 (3)

Hello,

I think it's been getting more difficult since I started trying to resolve this error.

  1. This is my custom code. It's almost the same as the example code, but I connected my custom HEF. With it, I can see inference through live streaming from a USB camera.
    There is no problem.

  2. I want to edit its main window. Below is its code.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
# import supervision as sv
import sys

# Try to import hailo python module
try:
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
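# Note: each handler copies the pixel data out of the mapped buffer, because the
# mapped memory becomes invalid once buffer.unmap() runs in
# get_numpy_from_buffer() below; that is why every handler calls .copy().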
def handle_rgb(map_info, width, height):
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data).copy()

def handle_nv12(map_info, width, height):
    y_plane_size = width * height
    uv_plane_size = width * height // 2
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=map_info.data[:y_plane_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
                          buffer=map_info.data[y_plane_size:]).copy()
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    return np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data).copy()

FORMAT_HANDLERS = {
    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,
}

def get_numpy_from_buffer(buffer, format, width, height):
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")

    try:
        # format='YUYV'
        handler = FORMAT_HANDLERS.get(format)
        if handler is None:
            raise ValueError(f"Unsupported format: {format}")
        return handler(map_info, width, height)
    finally:
        buffer.unmap(map_info)

# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:
            break

        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    def __init__(self):
        self.frame_count = 0
        self.use_frame = False
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True

    def increment(self):
        self.frame_count += 1

    def get_count(self):
        return self.frame_count

    def set_frame(self, frame):
        if not self.frame_queue.full():
            self.frame_queue.put(frame)

    def get_frame(self):
        if not self.frame_queue.empty():
            return self.frame_queue.get()
        else:
            return None

# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    caps = pad.get_current_caps()
    if caps:
        structure = caps.get_structure(0)
        if structure:
            format = structure.get_value('format')
            width = structure.get_value('width')
            height = structure.get_value('height')
            return format, width, height
    return None, None, None

def display_user_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)
        cv2.waitKey(1)
    cv2.destroyAllWindows()

def display_lss_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Add custom overlays
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)
            cv2.waitKey(1)
    cv2.destroyAllWindows()

def get_default_parser():
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument("--input", "-i", type=str, default="/dev/video0", help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
                        For RPi camera use '-i rpi' (Still in Beta). \
                        Defaults to /dev/video0")
    parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
    parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
    parser.add_argument("--disable-sync", action="store_true", help="Disables display sink sync, will run as fast possible. Relevant when using file source.")
    parser.add_argument("--dump-dot", action="store_true", help="Dump the pipeline graph to a dot file pipeline.dot")
    return parser

# Helper that builds a queue element description. Note the trailing "! " in the
# returned string: callers concatenate the result directly with the next element.
def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    return f"queue name={name} max-size-buffers={max_size_buffers} max-size-bytes={max_size_bytes} max-size-time={max_size_time} ! "

def get_source_type(input_source):
    if input_source.startswith("/dev/video"):
        return 'usb'
    else:
        if input_source.startswith("rpi"):
            return 'rpi'
        else:
            return 'file'

def USER_CALLBACK_PIPELINE(name='identity_callback'):
    # QUEUE() already appends "! ", so no extra separator is needed here
    return f'{QUEUE(name=f"{name}_q")}identity name={name}'

# -----------------------------------------------------------------------------------------------
# GStreamerApp class
# -----------------------------------------------------------------------------------------------
class GStreamerApp:
    def __init__(self, args, lss_data: lss_callback_class):
        # def __init__(self, args, process_frame_callback: lss_callback_class):
        # Set the process title
        setproctitle.setproctitle("Hailo Python App")

        # Create an empty options menu
        self.options_menu = args

        # Initialize variables
        tappas_postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if tappas_postprocess_dir == '':
            print("TAPPAS_POST_PROC_DIR environment variable is not set. Please set it to by sourcing setup_env.sh")
            exit(1)
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = tappas_postprocess_dir
        self.video_source = self.options_menu.input
        self.source_type = get_source_type(self.video_source)
        # self.user_data = user_data
        self.lss_data = lss_data
        self.video_sink = "xvimagesink"
        # self.video_sink = "autovideosink"
        # self.video_sink = "fakesink"

        # Set Hailo parameters; these should be set based on the model used
        self.batch_size = 1

        # self.network_width = 1280
        # self.network_height = 720

        self.network_width = 640
        self.network_height = 320

        self.network_format = "RGB"
        # self.network_format = "YUYV"

        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        self.lss_callback = None  # set by the subclass before create_pipeline() runs

        # Set user data parameters
        lss_data.use_frame = self.options_menu.use_frame

        if (self.options_menu.disable_sync or self.source_type != "file"):
            self.sync = "false"
        else:
            self.sync = "true"

        if (self.options_menu.dump_dot):
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def on_fps_measurement(self, sink, fps, droprate, avgfps):
        print(f"FPS: {fps:.2f}, Droprate: {droprate:.2f}, Avg FPS: {avgfps:.2f}")
        return True

    def create_pipeline(self):
        # Initialize GStreamer
        Gst.init(None)

        pipeline_string = self.get_pipeline_string()
        try:
            self.pipeline = Gst.parse_launch(pipeline_string)

            identity = self.pipeline.get_by_name("identity_callback")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(Gst.PadProbeType.BUFFER, self.lss_callback, self.lss_data)

        except Exception as e:
            print(e)
            print(pipeline_string)
            exit(1)

        # connect to hailo_display fps-measurements
        if (self.options_menu.show_fps):
            print("Showing FPS")
            self.pipeline.get_by_name("hailo_display").connect("fps-measurements", self.on_fps_measurement)

        # Create a GLib Main Loop
        self.loop = GLib.MainLoop()

    # Bus callback: GStreamer invokes this when there is an event (message) on the bus.
    def bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err}, {debug}")
            loop.quit()
        # QOS
        elif t == Gst.MessageType.QOS:
            # Handle QoS message here
            qos_element = message.src.get_name()
            print(f"QoS message received from {qos_element}")
        return True

    def get_pipeline_string(self):
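        # Placeholder; subclasses override this to supply the actual pipeline description.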

        return ""

    def dump_dot_file(self):
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):

        # Add a watch for messages on the pipeline's bus
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)

        # get xvimagesink element and disable qos
        # xvimagesink is instantiated by fpsdisplaysink
        hailo_display = self.pipeline.get_by_name("hailo_display")
        if hailo_display is None:
            print(
                "Warning: hailo_display element not found, add <fpsdisplaysink name=hailo_display> to your pipeline to support fps display.")
        else:
            xvimagesink = hailo_display.get_by_name("xvimagesink0")
            if xvimagesink is not None:
                xvimagesink.set_property("qos", False)

        # Disable QoS to prevent frame drops
        disable_qos(self.pipeline)

        # start a sub process to run the display_user_data_frame function
        if (self.options_menu.use_frame):
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.user_data,))
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.lss_data,))
            display_process = multiprocessing.Process(target=display_lss_data_frame, args=(self.lss_data,))
            display_process.start()

        # Set pipeline to PLAYING state
        self.pipeline.set_state(Gst.State.PLAYING)

        # dump dot file
        if (self.options_menu.dump_dot):
            GLib.timeout_add_seconds(3, self.dump_dot_file)

        # Run the GLib event loop and wait for events.
        try:
            self.loop.run()
        except:
            pass

        # Clean up
        self.lss_data.running = False
        self.pipeline.set_state(Gst.State.NULL)
        if (self.options_menu.use_frame):
            display_process.terminate()
            display_process.join()

# -----------------------------------------------------------------------------------------------
# User defined callback function
# -----------------------------------------------------------------------------------------------
def lss_callback(pad, info, lss_data):
    print(f"lss_data id: {id(lss_data)}")
    print(f"lss_data.use_frame: {lss_data.use_frame}")
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    print(f"width: {width}")
    print(f"height: {height}")

    # # from omria in hailo moderator
    # if not format or not width or not height:
    #     print("Error: Format, width, or height not retrieved from pad caps.")

    if not format or not width or not height:
        print("Error: Format, width, or height not retrieved from pad caps.", file=sys.stderr)  # Print to stderr
        return Gst.PadProbeReturn.DROP  # Drop the buffer


    string_to_print = f"Frame count: {lss_data.get_count()}\n"

    frame = None  # Initialize frame to None



    if lss_data.use_frame and format and width and height:
        frame = get_numpy_from_buffer(buffer, format, width, height)

        # from omria in hailo moderator
        if frame is not None:
            print(f"Frame shape: {frame.shape}")
        else:
            print("Failed to convert buffer to NumPy array.")
    else:
        print("Frame not retrieved due to missing format or dimensions.")
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    detection_count = 0

    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        detection_count += 1

        if frame is not None:
            # Hailo bounding boxes are normalized, so scale them to pixel coordinates
            x1, y1 = int(bbox.xmin() * width), int(bbox.ymin() * height)
            x2, y2 = int(bbox.xmax() * width), int(bbox.ymax() * height)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(
                frame,
                f"{label}: {confidence:.2f}",
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2
            )

    if detection_count > 0:
        string_to_print += f"Total detections: {detection_count}\n"
    if lss_data.use_frame and frame is not None:
        # Draw overlays only when a frame was actually retrieved
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.circle(frame, (width // 2, height // 2), 30, (0, 255, 255), 3)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        lss_data.set_frame(frame)
    print(string_to_print)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, lss_data):
        super().__init__(args, lss_data)

        # Set batch size (default to 1 for single-frame processing)
        self.batch_size = 1

        # Set network dimensions (default to 1280x720 for typical use cases)
        # self.network_width = 1280
        # self.network_height = 720

        self.network_width = 640
        self.network_height = 360

        # Set network format (default to RGB for Hailo compatibility)
        self.network_format = "RGB"
        # self.network_format = "YUYV"  # Uncomment if YUYV is required

        # Set default post-processing shared object path
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')

        # Initialize thresholds string (used for NMS thresholds)
        self.thresholds_str = ""  # Initialize thresholds_str as empty string

        # Set HEF path based on the network type
        if args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov8s_h8l.hef')
        elif args.network == "yolov8n":
            # Set NMS thresholds for YOLOv8n
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1

            # Alternative thresholds (commented out)
            # nms_score_threshold = 0.3
            # nms_iou_threshold = 0.45

            # Construct thresholds string for YOLOv8n
            self.thresholds_str = f"nms-score-threshold={nms_score_threshold} nms-iou-threshold={nms_iou_threshold} output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
            self.hef_path = os.path.join(self.current_path, './hailomodel/eeg_employee_2.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolox_s_leaky_h8l_mz.hef')
        else:
            raise ValueError(f"Invalid network type: {args.network}")

        # Set LSS callback
        self.lss_callback = lss_callback

        # Set process title for easier identification in system monitoring
        setproctitle.setproctitle("Hailo Detection App")

        # Create the pipeline
        self.create_pipeline()

    def get_pipeline_string(self):
        # Define source element based on the source type
        if self.source_type == "rpi":
            # Use libcamerasrc for Raspberry Pi camera
            source_element = f"libcamerasrc name=src_0 auto-focus-mode=2 ! "
            source_element += f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
            source_element += QUEUE("queue_src_scale")
            source_element += f"videoscale ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

        elif self.source_type == "usb":
            # Use v4l2src for USB camera with the specified pipeline structure
            source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            source_element += f"capsfilter caps=\"video/x-raw,format=YUY2\" ! "
            source_element += QUEUE("queue_src_convert")
            source_element += f"videoconvert ! "
            source_element += f"capsfilter caps=\"video/x-raw,format=I420\" ! "
            source_element += f"jpegenc ! "

            # Add multifilesink for saving frames (optional, comment out if not needed)
            # source_element += f"multifilesink location=\"frame_%05d.jpg\""

        else:
            # Use filesrc for video file input
            source_element = f"filesrc location={self.video_source} name=src_0 ! "
            source_element += QUEUE("queue_dec264")
            source_element += f" qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
            source_element += f" video/x-raw,format=I420 ! "

        # Common scaling and conversion steps for all source types (except USB)
        if self.source_type != "usb":
            source_element += QUEUE("queue_scale")
            source_element += f" videoscale n-threads=2 ! "
            source_element += QUEUE("queue_src_convert")
            source_element += f" videoconvert n-threads=3 name=src_convert qos=false ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "

        # Construct the full pipeline string
        pipeline_string = "hailomuxer name=hmux "
        pipeline_string += source_element
        pipeline_string += "tee name=t ! "
        pipeline_string += QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 "
        pipeline_string += "t. ! " + QUEUE("queue_hailonet")

        # Convert to RGB format before hailonet (required for Hailo processing)
        pipeline_string += f"videoconvert n-threads=3 ! video/x-raw, format=RGB, width={self.network_width}, height={self.network_height} ! "

        # Add hailonet element with specified HEF path and batch size
        pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "

        # Add post-processing with hailofilter
        pipeline_string += QUEUE("queue_hailofilter")
        self.json_config_path = "./resource/eeg_employee_2.json"
        pipeline_string += f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} config-path={self.json_config_path} qos=false ! "

        # Merge processed and bypass streams
        pipeline_string += QUEUE("queue_hmuc") + " hmux.sink_1 "
        pipeline_string += "hmux. ! " + QUEUE("queue_hailo_python")

        # Add user callback (identity element for debugging or custom processing)
        pipeline_string += QUEUE("queue_user_callback")
        pipeline_string += f"identity name=identity_callback ! "

        # Add hailooverlay for drawing inference results
        pipeline_string += QUEUE("queue_hailooverlay")
        pipeline_string += f"hailooverlay ! "

        # Convert video format for display
        pipeline_string += QUEUE("queue_videoconvert")
        pipeline_string += f"videoconvert n-threads=3 qos=false ! "

        # Add display sink with FPS overlay
        pipeline_string += QUEUE("queue_hailo_display")
        pipeline_string += f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "

        # Print the pipeline string for debugging
        print(pipeline_string)
        return pipeline_string

if __name__ == "__main__":
    parser = get_default_parser()
    # Add additional arguments here
    parser.add_argument("--network", default="yolov8n", choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
                        help="Which Network to use, defult is yolov6n")
    args = parser.parse_args()
    lss_data = lss_callback_class()

    # # app = GStreamerDetectionApp(args, user_data)
    # app = GStreamerDetectionApp(args, lss_data)
    # print("Starting GStreamerDetectionApp")
    # begin = time.time()
    # app.run()
    # print("Application run completed")
    # end = time.time()
    # print("Total time: ", 733 / (end - begin))

    try:
        app = GStreamerDetectionApp(args, lss_data)  # Initialize within try block
        print("Starting GStreamerDetectionApp")
        begin = time.time()
        app.run()
        print("Application run completed")
        end = time.time()
        print("Total time: ", 733 / (end - begin))
    except Exception as e:  # Catch exceptions during initialization or running
        print(f"Error: {e}", file=sys.stderr)  # Print error to stderr
        sys.exit(1)  # Exit with error code

I just guessed that the error was related to the configuration settings and tried lots of things, but all of them failed. Please help me.

Hey @ssms1225,

We understand that users are experiencing difficulties modifying the main window. We’re working on updating our examples to make UI modifications more straightforward. In the meantime, we recommend either using a separate window or implementing the user callback functionality.

To better assist you with the "could not link queue_hailonet to videoconvert1" error, could you please provide more details about your specific case? This error typically occurs when two adjacent pipeline elements cannot negotiate a common capability (caps).
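As a general first step (a standard GStreamer debugging technique, not specific to your script), raising the GStreamer log level usually shows exactly which caps the two elements failed to agree on. Here is a minimal, self-contained sketch that provokes a parse-time link error on purpose; the test pipeline is only for illustration:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)
# Roughly equivalent to running with GST_DEBUG=3 in the environment
Gst.debug_set_default_threshold(Gst.DebugLevel.FIXME)

try:
    # Deliberately impossible link: videotestsrc only produces video/x-raw,
    # so forcing image/jpeg caps triggers the same "could not link" parse error
    Gst.parse_launch("videotestsrc ! image/jpeg ! videoconvert ! fakesink")
except GLib.Error as e:
    print(f"Parse failed: {e.message}")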

Here’s a solution you can try:

The issue stems from mismatched capabilities between queue_hailonet and videoconvert1. To resolve this, we need to explicitly define the video format using a capsfilter. Here’s the modified pipeline configuration:

def get_pipeline_string(self):
    pipeline_string = (
        f"v4l2src device={self.video_source} ! "
        f"video/x-raw, format=RGB, width=640, height=480, framerate=30/1 ! "
        f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
        f"capsfilter caps=\"video/x-raw, format=RGB, width={self.network_width}, height={self.network_height}\" ! "
        f"videoconvert n-threads=3 qos=false ! "
        f"hailooverlay ! videoconvert ! fpsdisplaysink video-sink=xvimagesink sync=false text-overlay=true"
    )
    print(f"Pipeline String: {pipeline_string}")
    return pipeline_string

The key change is the addition of the capsfilter element, which ensures proper format specification between the pipeline components.
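As a side note, an explicit capsfilter element and a bare caps string written between two "!" separators are interchangeable in launch syntax, so the following two fragments describe the same pipeline:

... ! capsfilter caps="video/x-raw, format=RGB" ! videoconvert ! ...
... ! video/x-raw, format=RGB ! videoconvert ! ...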

Please let me know whether this fixes your issue; I'm hoping to hear from you soon.

Best Regards,
Omria

Hi, Omria

I have tried lots of things to reach my goal. My final goal is multi-camera streaming. Currently, I think it's getting better little by little, but there are still some issues.

This is the current issue:

(venv_hailo_rpi5_examples) root@iot:/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main# python hailo_app_combined_24_user_frame_multi_3.py --inputs /dev/video0 /dev/video2 -f --network yolov8n --disable-sync
Starting GStreamerDetectionApp
v4l2src device=/dev/video0 ! video/x-raw, width=640, height=360, framerate=10/1 ! queue name=queue_scale_0 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! videoscale ! video/x-raw, width=640, height=360 ! queue name=queue_convert_0 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! videoconvert ! video/x-raw, format=RGB ! tee name=t_0 ! queue name=queue_display_0 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! identity name=identity_callback_0 ! videoconvert ! fakesink sync=false name=hailo_display_0 t_0. ! queue name=queue_hailo_0 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! videoscale ! video/x-raw, width=640, height=640 ! hailonet hef-path=/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main/./hailomodel/eeg_employee_2.hef batch-size=1 nms-score-threshold=0.1 nms-iou-threshold=0.1 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! queue name=queue_filter_0 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! hailofilter function-name="filter" so-path=/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so config-path=./resource/eeg_employee_2.json qos=false ! queue name=queue_overlay_0 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! hailooverlay ! videoconvert ! fakesink sync=false v4l2src device=/dev/video2 ! video/x-raw, width=640, height=360, framerate=10/1 ! queue name=queue_scale_1 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! videoscale ! video/x-raw, width=640, height=360 ! queue name=queue_convert_1 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! videoconvert ! video/x-raw, format=RGB ! tee name=t_1 ! queue name=queue_display_1 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! identity name=identity_callback_1 ! videoconvert ! fakesink sync=false name=hailo_display_1 t_1. ! queue name=queue_hailo_1 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! videoscale ! video/x-raw, width=640, height=640 ! hailonet hef-path=/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main/./hailomodel/eeg_employee_2.hef batch-size=1 nms-score-threshold=0.1 nms-iou-threshold=0.1 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! queue name=queue_filter_1 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! hailofilter function-name="filter" so-path=/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so config-path=./resource/eeg_employee_2.json qos=false ! queue name=queue_overlay_1 max-size-buffers=5 max-size-bytes=0 max-size-time=0 ! hailooverlay ! videoconvert ! fakesink sync=false
[HailoRT] [error] CHECK failed - Failed to create vdevice. there are not enough free devices. requested: 1, found: 0
[HailoRT] [error] CHECK_SUCCESS failed with status=HAILO_OUT_OF_PHYSICAL_DEVICES(74)
[HailoRT] [error] CHECK_SUCCESS failed with status=HAILO_OUT_OF_PHYSICAL_DEVICES(74)
[HailoRT] [error] CHECK_SUCCESS failed with status=HAILO_OUT_OF_PHYSICAL_DEVICES(74)
[HailoRT] [error] CHECK_SUCCESS failed with status=HAILO_OUT_OF_PHYSICAL_DEVICES(74)
CHECK_EXPECTED failed with status=74
FPS measurement enabled
Started display process for Camera 0
Started display process for Camera 1
Segmentation fault

Also, this is my code: hailo_app_combined_24_user_frame_multi_3.py

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import supervision as sv
import queue
from typing import List, Optional
from multiprocessing import Process

# Try to import hailo python module
try:
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

def handle_rgb(map_info, width, height):
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data).copy()

def handle_nv12(map_info, width, height):
    y_plane_size = width * height
    uv_plane_size = width * height // 2
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=map_info.data[:y_plane_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
                          buffer=map_info.data[y_plane_size:]).copy()
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    return np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data).copy()

FORMAT_HANDLERS = {
    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,
}

def get_numpy_from_buffer(buffer, format, width, height):
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")

    try:
        handler = FORMAT_HANDLERS.get(format)
        if handler is None:
            raise ValueError(f"Unsupported format: {format}")
        return handler(map_info, width, height)
    finally:
        buffer.unmap(map_info)

def disable_qos(pipeline):
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:
            break

        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# class LSStreamCallback:
class lss_callback_class:
    """Class to handle camera stream callbacks and frame processing"""
    DEFAULT_QUEUE_SIZE = 3
    DEFAULT_MAX_HISTORY = 100
    DEFAULT_FRAME_TIMEOUT = 0.01

    def __init__(self, camera_id: int):
        """Initialize the callback handler for a specific camera"""
        self.camera_id = camera_id
        self._initialize_counters()
        self._initialize_queues()
        self._initialize_state()

    def _initialize_counters(self):
        """Initialize frame and detection counters"""
        self.frame_count = 0
        self.last_frame_time = time.time()
        self.fps = 0
        self.detection_history = []
        self.max_history = self.DEFAULT_MAX_HISTORY

    def _initialize_queues(self):
        """Initialize frame queue"""
        self.frame_queue = multiprocessing.Queue(maxsize=self.DEFAULT_QUEUE_SIZE)

    def _initialize_state(self):
        """Initialize state flags"""
        self.running = True
        self.use_frame = True
        self._stop_event = multiprocessing.Event()

    def increment(self):
        self.frame_count += 1
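        # Instantaneous FPS from the gap since the previous frame; this is a
        # noisy estimate, so a moving average would give a steadier reading.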
        current_time = time.time()
        time_diff = current_time - self.last_frame_time
        if time_diff > 0:
            self.fps = 1.0 / time_diff
        self.last_frame_time = current_time

    def set_frame(self, frame):
        if self._stop_event.is_set():
            return

        if frame is None:
            print(f"Warning (Camera {self.camera_id}): Attempting to set None frame")
            return

        try:
            print(f"Camera {self.camera_id}: Got frame of shape {frame.shape}")  # Debug print
            if not self.frame_queue.full():
                self.frame_queue.put(frame, block=False)
            else:
                try:
                    self.frame_queue.get_nowait()  # Remove oldest frame
                    self.frame_queue.put(frame, block=False)
                    print(f"Camera {self.camera_id}: Replaced old frame")
                except queue.Empty:
                    print(f"Warning (Camera {self.camera_id}): Queue unexpectedly empty")
        except queue.Full:
            print(f"Warning (Camera {self.camera_id}): Frame queue is full, dropping frame")
        except Exception as e:
            print(f"Error (Camera {self.camera_id}) setting frame: {e}")

    def get_frame(self):
        if self._stop_event.is_set():
            return None

        try:
            frame = self.frame_queue.get(timeout=self.DEFAULT_FRAME_TIMEOUT)
            print(f"Camera {self.camera_id}: Retrieved frame")  # Debug print
            return frame
        except queue.Empty:
            return None
        except Exception as e:
            print(f"Error (Camera {self.camera_id}) getting frame: {e}")
            return None

    def add_detection_count(self, count: int):
        self.detection_history.append(count)
        if len(self.detection_history) > self.max_history:
            self.detection_history.pop(0)

    def get_average_detections(self) -> float:
        return sum(self.detection_history) / len(self.detection_history) if self.detection_history else 0

    def clear_queue(self):
        while not self.frame_queue.empty():
            try:
                self.frame_queue.get_nowait()
            except queue.Empty:
                break

    def stop(self):
        self.running = False
        self._stop_event.set()
        self.clear_queue()

    def __del__(self):
        try:
            self.stop()
        except Exception as e:
            print(f"Error (Camera {self.camera_id}) in cleanup: {e}")

    def get_count(self):
        return self.frame_count

    def get_fps(self):
        return self.fps

def display_user_data_frame(lss_data: lss_callback_class):
    os.environ['QT_QPA_PLATFORM'] = 'xcb'  # Use XCB instead of Wayland
    os.environ['DISPLAY'] = ':0'

    window_name = f"Camera {lss_data.camera_id}"
    window_width = 640
    window_height = 640

    try:
        # Initialize window with fixed size
        cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(window_name, window_width, window_height)

        if lss_data.camera_id == 0:
            cv2.moveWindow(window_name, 0, 0)
        else:
            cv2.moveWindow(window_name, window_width, 0)

        print(f"Starting display for Camera {lss_data.camera_id}")

        while lss_data.running:
            frame = lss_data.get_frame()
            if frame is not None:
                try:
                    frame = cv2.resize(frame, (window_width, window_height))

                    debug_info = [
                        f"Camera {lss_data.camera_id}",
                        f"FPS: {lss_data.get_fps():.1f}",
                        f"Frame: {lss_data.get_count()}"
                    ]

                    y_offset = 30
                    for text in debug_info:
                        cv2.putText(frame, text, (10, y_offset),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7,
                                    (0, 255, 0), 2)
                        y_offset += 25

                    cv2.imshow(window_name, frame)

                except cv2.error as e:
                    print(f"OpenCV error for Camera {lss_data.camera_id}: {e}")
                    continue

            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                print(f"Quit signal received for Camera {lss_data.camera_id}")
                lss_data.stop()
                break
            elif key == ord('f'):
                current_state = cv2.getWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN)
                new_state = cv2.WINDOW_NORMAL if current_state == cv2.WINDOW_FULLSCREEN else cv2.WINDOW_FULLSCREEN
                cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, new_state)

    except Exception as e:
        print(f"Display error for Camera {lss_data.camera_id}: {e}")
    finally:
        print(f"Closing display for Camera {lss_data.camera_id}")
        cv2.destroyWindow(window_name)

def toggle_fullscreen(window_name: str):
    """Toggle fullscreen mode for a window"""
    current_state = cv2.getWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN)
    new_state = (cv2.WINDOW_NORMAL if current_state == cv2.WINDOW_FULLSCREEN
                 else cv2.WINDOW_FULLSCREEN)
    cv2.setWindowProperty(window_name, cv2.WND_PROP_FULLSCREEN, new_state)


def get_caps_from_pad(pad: Gst.Pad) -> tuple:
    """Get format, width, and height from GStreamer pad"""
    caps = pad.get_current_caps()
    if caps and (structure := caps.get_structure(0)):
        return (structure.get_value('format'),
                structure.get_value('width'),
                structure.get_value('height'))
    return None, None, None

def get_default_parser() -> argparse.ArgumentParser:
    """Create and configure argument parser"""
    parser = argparse.ArgumentParser(description="Hailo Multi-Camera App Help")
    parser.add_argument("--inputs", "-i", nargs='+', type=str,
                        default=['/dev/video0'],
                        help="List of input sources.")
    parser.add_argument("--show-fps", "-f", action="store_true",
                        help="Print FPS on sink")
    parser.add_argument("--disable-sync", action="store_true",
                        help="Disables display sink sync.")
    parser.add_argument("--dump-dot", action="store_true",
                        help="Dump the pipeline graph to a dot file.")
    return parser

def QUEUE(name, max_size_buffers=5, max_size_bytes=0, max_size_time=0):
    return f"queue name={name} max-size-buffers={max_size_buffers} max-size-bytes={max_size_bytes} max-size-time={max_size_time} ! "

def get_source_type(input_source: str) -> str:
    """Determine the type of input source"""
    if input_source.startswith("/dev/video"):
        return 'usb'
    return 'rpi' if input_source.startswith("rpi") else 'file'

def USER_CALLBACK_PIPELINE(name: str = 'identity_callback') -> str:
    """Create a user callback pipeline string"""
    # QUEUE() already appends "! ", so no extra separator is needed here
    return f'{QUEUE(name=f"{name}_q")}identity name={name}'

class GStreamerApp:
    DEFAULT_NETWORK_WIDTH = 640
    DEFAULT_NETWORK_HEIGHT = 640
    DEFAULT_NETWORK_FORMAT = "RGB"

    def __init__(self, args, lss_data_list):
        setproctitle.setproctitle("Hailo Multi-Camera App")
        self.options_menu = args
        self._initialize_environment()
        self._initialize_parameters(lss_data_list)
        self._initialize_pipeline_components()

    def _initialize_environment(self):
        """Initialize environment-related parameters"""
        self.postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if not self.postprocess_dir:
            raise EnvironmentError("TAPPAS_POST_PROC_DIR environment variable is not set.")

        self.current_path = os.path.dirname(os.path.abspath(__file__))
        if self.options_menu.dump_dot:
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def _initialize_parameters(self, lss_data_list):
        """Initialize main application parameters"""
        self.video_sources = self.options_menu.inputs
        self.source_types = [get_source_type(source) for source in self.video_sources]
        self.lss_data_list = lss_data_list
        self.num_cameras = len(self.video_sources)

        self.batch_size = 1
        self.network_width = self.DEFAULT_NETWORK_WIDTH
        self.network_height = self.DEFAULT_NETWORK_HEIGHT
        self.network_format = self.DEFAULT_NETWORK_FORMAT

        self.sync = "false" if self.options_menu.disable_sync else "true"

    def _initialize_pipeline_components(self):
        """Initialize GStreamer pipeline components"""
        self.video_sink = "fakesink"
        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        self.lss_callbacks = [lss_callback] * self.num_cameras
        self.pipeline = None
        self.loop = None

    def quit(self, display_processes: Optional[List[multiprocessing.Process]] = None):
        """Gracefully exit the application."""
        print("Cleaning up resources...")

        if display_processes:
            for process in display_processes:
                if process.is_alive():
                    print(f"Terminating process for Camera {process.pid}...")
                    process.terminate()
                    process.join(timeout=2.0)
                    if process.is_alive():
                        print(f"Forcefully killing process {process.pid}")
                        process.kill()

        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
            print("Pipeline stopped")

        if self.loop and self.loop.is_running():
            self.loop.quit()

        print("Cleanup complete.")

    def create_pipeline(self):
        """Create and configure the GStreamer pipeline"""
        Gst.init(None)
        self.loop = GLib.MainLoop()

        pipeline_string = self.get_pipeline_string()

        try:
            self.pipeline = Gst.parse_launch(pipeline_string)

            self._setup_callbacks()
            self._setup_fps_measurement()
            self._setup_bus()

            return True

        except Exception as e:
            print(f"Pipeline creation error: {e}")
            print(f"Pipeline string: {pipeline_string}")
            self.quit()
            return False

    def get_pipeline_string(self):
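        # Placeholder; subclasses override this to supply the actual pipeline description.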

        return ""

    def _setup_callbacks(self):
        for i in range(self.num_cameras):
            identity = self.pipeline.get_by_name(f"identity_callback_{i}")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(
                    Gst.PadProbeType.BUFFER,
                    self.lss_callbacks[i],
                    self.lss_data_list[i]
                )
            else:
                print(f"Warning: Could not find identity_callback_{i} element")

    def fps_probe_callback(self, pad, info, user_data_list):
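        # Note: this probe attaches to a single sink pad but updates counters for
        # every camera in user_data_list on each buffer, so the per-camera FPS
        # values printed here are approximations.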
        try:
            current_time = time.time()
            for user_data in user_data_list:
                if not hasattr(user_data, 'last_fps_time'):
                    user_data.last_fps_time = current_time
                    user_data.fps_frame_count = 0

                user_data.fps_frame_count += 1
                time_diff = current_time - user_data.last_fps_time

                if time_diff >= 1.0:
                    user_data.current_fps = user_data.fps_frame_count / time_diff
                    if self.options_menu.show_fps:
                        print(f"Camera {user_data.camera_id} FPS: {user_data.current_fps:.2f}")
                    user_data.fps_frame_count = 0
                    user_data.last_fps_time = current_time

        except Exception as e:
            print(f"Error in FPS measurement: {e}")

        return Gst.PadProbeReturn.OK

    def _setup_fps_measurement(self):
        """Set up FPS measurement if enabled"""
        if not self.options_menu.show_fps:
            return

        print("FPS measurement enabled")
        for data in self.lss_data_list:
            if not hasattr(data, 'last_fps_time'):
                data.last_fps_time = time.time()
                data.fps_frame_count = 0
                data.current_fps = 0

        # Note: the multi-camera pipeline names its display sinks hailo_display_0,
        # hailo_display_1, ..., so this single-name lookup only finds a sink in
        # single-camera mode.
        fakesink = self.pipeline.get_by_name("hailo_display")
        if fakesink:
            sink_pad = fakesink.get_static_pad("sink")
            if sink_pad:
                sink_pad.add_probe(
                    Gst.PadProbeType.BUFFER,
                    self.fps_probe_callback,
                    self.lss_data_list
                )

    def bus_call(self, bus, message, loop):
        """Enhanced bus call handler"""
        try:
            t = message.type
            if t == Gst.MessageType.EOS:
                print("End-of-stream")
                loop.quit()
            elif t == Gst.MessageType.ERROR:
                err, debug = message.parse_error()
                print(f"Error: {err}")
                print(f"Debug info: {debug}")
                loop.quit()
            elif t == Gst.MessageType.WARNING:
                err, debug = message.parse_warning()
                print(f"Warning: {err}")
                print(f"Debug info: {debug}")
            elif t == Gst.MessageType.STATE_CHANGED:
                if message.src == self.pipeline:
                    old_state, new_state, pending_state = message.parse_state_changed()
                    print(f"Pipeline state changed from {old_state.value_nick} to {new_state.value_nick}")
        except Exception as e:
            print(f"Error in bus_call: {e}")
        return True

    def _setup_bus(self):
        """Set up the GStreamer bus"""
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)

    def dump_dot_file(self):
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):
        """Main application run method"""
        display_processes = None
        try:
            if not self.create_pipeline():
                return

            display_processes = self._start_display_processes()
            if not display_processes:
                return

            if not self._start_pipeline():
                return

            if self.options_menu.dump_dot:
                GLib.timeout_add_seconds(3, self.dump_dot_file)

            print("Starting main loop")
            self.loop.run()

        except KeyboardInterrupt:
            print("\nPipeline interrupted by user. Exiting...")
        except Exception as e:
            print(f"Unexpected error during pipeline execution: {e}")
        finally:
            self.quit(display_processes)

    def _start_display_processes(self):
        """Start display processes for each camera"""
        display_processes = []
        for lss_data in self.lss_data_list:
            try:
                process = multiprocessing.Process(
                    target=display_user_data_frame,
                    args=(lss_data,)
                )
                display_processes.append(process)
                process.start()
                print(f"Started display process for Camera {lss_data.camera_id}")
            except Exception as e:
                print(f"Error starting display process for Camera {lss_data.camera_id}: {e}")
                self.quit(display_processes)
                return None
        return display_processes

    def _start_pipeline(self):
        """Start the GStreamer pipeline"""
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Failed to set pipeline to PLAYING state.")
            return False
        print(f"Pipeline state change result: {ret.value_name}")
        return True

def lss_callback(pad, info, lss_data):
    if not lss_data.running:
        return Gst.PadProbeReturn.DROP

    try:
        buffer = info.get_buffer()
        if buffer is None:
            print(f"Error (Camera {lss_data.camera_id}): No buffer received")
            return Gst.PadProbeReturn.OK

        format, width, height = get_caps_from_pad(pad)
        if None in (format, width, height):
            print(f"Error (Camera {lss_data.camera_id}): Missing format information")
            return Gst.PadProbeReturn.OK

        frame = get_numpy_from_buffer(buffer, format, width, height)
        if frame is None:
            print(f"Error (Camera {lss_data.camera_id}): Could not get frame from buffer")
            return Gst.PadProbeReturn.OK

        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        roi = hailo.get_roi_from_buffer(buffer)
        detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
        detection_count = 0

        display_frame = frame.copy()

        for detection in detections:
            label = detection.get_label()
            bbox = detection.get_bbox()
            confidence = detection.get_confidence()

            x1 = max(0, int(bbox.xmin() * width))
            y1 = max(0, int(bbox.ymin() * height))
            x2 = min(width, int(bbox.xmax() * width))
            y2 = min(height, int(bbox.ymax() * height))

            cv2.rectangle(display_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            label_text = f"{label} {confidence:.2f}"

            (text_width, text_height), _ = cv2.getTextSize(
                label_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
            cv2.rectangle(display_frame,
                         (x1, y1 - text_height - 10),
                         (x1 + text_width, y1),
                         (0, 255, 0),
                         -1)

            cv2.putText(display_frame, label_text,
                       (x1, y1 - 5),
                       cv2.FONT_HERSHEY_SIMPLEX,
                       0.5,
                       (0, 0, 0),
                       2)

            detection_count += 1

        lss_data.add_detection_count(detection_count)

        info_text = [
            f"Camera {lss_data.camera_id}",
            f"FPS: {lss_data.get_fps():.1f}",
            f"Detections: {detection_count}",
            f"Avg Detections: {lss_data.get_average_detections():.1f}"
        ]
        y_offset = 30
        for text in info_text:
            cv2.putText(display_frame, text, (10, y_offset),
                       cv2.FONT_HERSHEY_SIMPLEX,
                       0.7,
                       (0, 255, 0),
                       2)
            y_offset += 25

        lss_data.set_frame(display_frame)
        lss_data.increment()

        return Gst.PadProbeReturn.OK

    except Exception as e:
        print(f"Error in callback (Camera {lss_data.camera_id}): {e}")
        return Gst.PadProbeReturn.OK

class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, lss_data_list):
        super().__init__(args, lss_data_list)
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')

        self.batch_size = self.num_cameras if self.num_cameras > 1 else 1
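        # Note: the pipeline builders below hardcode batch-size=1 on each hailonet,
        # so this value is not actually applied to the generated pipelines.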
        self.json_config_path = "./resource/eeg_employee_2.json"

        if args.network == "yolov8n":
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1
            self.thresholds_str = (f"nms-score-threshold={nms_score_threshold} "
                                   f"nms-iou-threshold={nms_iou_threshold} "
                                   f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32")
            self.hef_path = os.path.join(self.current_path, './hailomodel/eeg_employee_2.hef')
        else:
            # Only yolov8n is wired up in this multi-camera version.
            raise ValueError(f"Unsupported network type for this app: {args.network}")

        setproctitle.setproctitle("Hailo Detection App")

    def get_pipeline_string(self):
        if self.num_cameras == 1:
            return self._get_single_camera_pipeline()
        else:
            return self._get_multi_camera_pipeline()

    def _get_single_camera_pipeline(self):
        """Original single camera pipeline implementation"""
        source = self.video_sources[0]
        pipeline_string = (
            f"v4l2src device={source} ! "
            f"video/x-raw, width=640, height=360, framerate=10/1 ! "
            f"{QUEUE('queue_scale')} "
            f"videoscale n-threads=2 ! "
            f"{QUEUE('queue_convert')} "
            f"videoconvert n-threads=3 qos=false ! "
            f"video/x-raw, format={self.network_format}, "
            f"width={self.network_width}, height={self.network_height}, "
            f"pixel-aspect-ratio=1/1 ! "
            f"tee name=t ! "
            f"{QUEUE('queue_hailo')} "
            f"hailonet hef-path={self.hef_path} batch-size=1 {self.thresholds_str} "
            f"force-writable=true ! "
            f"{QUEUE('queue_filter')} "
            f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} "
            f"config-path={self.json_config_path} qos=false ! "
            f"{QUEUE('queue_overlay')} "
            f"identity name=identity_callback_0 ! "
            f"hailooverlay ! "
            f"{QUEUE('queue_display')} "
            f"videoconvert n-threads=3 qos=false ! "
            f"fakesink sync={self.sync} name=hailo_display"
        )
        print(pipeline_string)
        return pipeline_string

    def _get_multi_camera_pipeline(self):
        """Generate pipeline string for multiple cameras"""
        pipeline_parts = []

        for i, source in enumerate(self.video_sources):
            source_pipeline = (
                f"v4l2src device={source} ! "
                f"video/x-raw, width=640, height=360, framerate=10/1 ! "
                f"{QUEUE(f'queue_scale_{i}')} videoscale ! "
                f"video/x-raw, width=640, height=480 ! "  # Consistent output size
                # f"video/x-raw, width=640, height=360 ! "  # Consistent output size
                f"{QUEUE(f'queue_convert_{i}')} videoconvert ! "
                f"video/x-raw, format={self.network_format} ! "
                f"tee name=t_{i} ! "  # First tee branch for display
                f"{QUEUE(f'queue_display_{i}')} "
                f"identity name=identity_callback_{i} ! "
                f"videoconvert ! "
                f"fakesink sync={self.sync} name=hailo_display_{i} "
                f"t_{i}. ! "  # Second tee branch for Hailo processing
                f"{QUEUE(f'queue_hailo_{i}')} "
                f"videoscale ! "
                f"video/x-raw, width={self.network_width}, height={self.network_height} ! "
                f"hailonet hef-path={self.hef_path} batch-size=1 {self.thresholds_str} "
                f"force-writable=true ! "
                f"{QUEUE(f'queue_filter_{i}')} "
                f"hailofilter function-name=\"filter\" "
                f"so-path={self.default_postprocess_so} "
                f"config-path={self.json_config_path} qos=false ! "
                f"{QUEUE(f'queue_overlay_{i}')} "
                f"hailooverlay ! "
                f"videoconvert ! "
                f"fakesink sync={self.sync} "  # Second sink for processed frames
            )
            pipeline_parts.append(source_pipeline)
        pipeline_string = " ".join(pipeline_parts)
        print(pipeline_string)
        return pipeline_string

if __name__ == "__main__":
    parser = get_default_parser()
    parser.add_argument("--network",
                        default="yolov8n",
                        choices=['yolov6n', 'yolov8s', 'yolox_s_leaky', 'yolov8n'],
                        help="Which Network to use, default is yolov8n")
    args = parser.parse_args()

    try:
        lss_data_list = [lss_callback_class(camera_id=i)
                         for i in range(len(args.inputs))]

        app = GStreamerDetectionApp(args, lss_data_list)
        print("Starting GStreamerDetectionApp")
        begin = time.time()
        app.run()
        print("Application run completed")
        end = time.time()
        print("Total time: ", 733 / (end - begin))

    except KeyboardInterrupt:
        print("\nApplication interrupted by user")
    except Exception as e:
        print(f"Error running application: {e}")
        import traceback

        traceback.print_exc()
    finally:
        print("Application terminated")

I tried streaming /dev/video0 and /dev/video2. There is no problem when I run them one at a time. Omria, please give me some guidance. Thanks in advance.