gst_parse_error: could not link videoconvert0 to hailonet0, hailonet0 can't handle caps

Hi,

I managed to came to here, but I don’t why this error causes.
It’s for editing customized window. Recently, I came to know that this is working.
But, there are still continuous error in python script.

gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg,width=320,height=240,framerate=15/1 ! jpegdec ! autovideosink

And , belows are customized python code.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import hailo
# import supervision as sv
import sys

# Try to import hailo python module
try:
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data).copy()

def handle_nv12(map_info, width, height):
    y_plane_size = width * height
    uv_plane_size = width * height // 2
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=map_info.data[:y_plane_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
                          buffer=map_info.data[y_plane_size:]).copy()
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    return np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data).copy()

FORMAT_HANDLERS = {
    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,
}

def get_numpy_from_buffer(buffer, format, width, height):
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")

    try:
        # format='YUYV'
        handler = FORMAT_HANDLERS.get(format)
        if handler is None:
            raise ValueError(f"Unsupported format: {format}")
        return handler(map_info, width, height)
    finally:
        buffer.unmap(map_info)

# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:
            break

        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    def __init__(self):
        self.frame_count = 0
        self.use_frame = False
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True

    def increment(self):
        self.frame_count += 1

    def get_count(self):
        return self.frame_count

    def set_frame(self, frame):
        if not self.frame_queue.full():
            self.frame_queue.put(frame)

    def get_frame(self):
        if not self.frame_queue.empty():
            return self.frame_queue.get()
        else:
            return None

# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    caps = pad.get_current_caps()
    if caps:
        structure = caps.get_structure(0)
        if structure:
            format = structure.get_value('format')
            width = structure.get_value('width')
            height = structure.get_value('height')
            return format, width, height
    return None, None, None

def display_user_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)
        cv2.waitKey(1)
    cv2.destroyAllWindows()

def display_lss_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Add custom overlays
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)
            cv2.waitKey(1)
    cv2.destroyAllWindows()

def get_default_parser():
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument("--input", "-i", type=str, default="/dev/video0", help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
                        For RPi camera use '-i rpi' (Still in Beta). \
                        Defaults to /dev/video0")
    parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
    parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
    parser.add_argument("--disable-sync", action="store_true", help="Disables display sink sync, will run as fast possible. Relevant when using file source.")
    parser.add_argument("--dump-dot", action="store_true", help="Dump the pipeline graph to a dot file pipeline.dot")
    return parser

def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    return f"queue name={name} max-size-buffers={max_size_buffers} max-size-bytes={max_size_bytes} max-size-time={max_size_time} ! "

def get_source_type(input_source):
    if input_source.startswith("/dev/video"):
        return 'usb'
    else:
        if input_source.startswith("rpi"):
            return 'rpi'
        else:
            return 'file'

def USER_CALLBACK_PIPELINE(name='identity_callback'):
    return f'{QUEUE(name=f"{name}_q")} ! identity name={name}'

# -----------------------------------------------------------------------------------------------
# GStreamerApp class
# -----------------------------------------------------------------------------------------------
class GStreamerApp:
    def __init__(self, args, lss_data: lss_callback_class):
        # def __init__(self, args, process_frame_callback: lss_callback_class):
        # Set the process title
        setproctitle.setproctitle("Hailo Python App")

        # Create an empty options menu
        self.options_menu = args

        # Initialize variables
        tappas_postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if tappas_postprocess_dir == '':
            print("TAPPAS_POST_PROC_DIR environment variable is not set. Please set it to by sourcing setup_env.sh")
            exit(1)
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = tappas_postprocess_dir
        self.video_source = self.options_menu.input
        self.source_type = get_source_type(self.video_source)
        # self.user_data = user_data
        self.lss_data = lss_data
        self.video_sink = "xvimagesink"
        # self.video_sink = "autovideosink"
        # self.video_sink = "fakesink"

        # Set Hailo parameters these parameters shuold be set based on the model used
        self.batch_size = 1

        self.network_width = 1280
        self.network_height = 720

        self.network_format = "RGB"
        # self.network_format = "YUYV"

        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        self.lss_callback = lss_callback_class

        # Set user data parameters
        lss_data.use_frame = self.options_menu.use_frame

        if (self.options_menu.disable_sync or self.source_type != "file"):
            self.sync = "false"
        else:
            self.sync = "true"

        if (self.options_menu.dump_dot):
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def on_fps_measurement(self, sink, fps, droprate, avgfps):
        print(f"FPS: {fps:.2f}, Droprate: {droprate:.2f}, Avg FPS: {avgfps:.2f}")
        return True

    def create_pipeline(self):
        # Initialize GStreamer
        Gst.init(None)

        pipeline_string = self.get_pipeline_string()
        try:
            self.pipeline = Gst.parse_launch(pipeline_string)

            identity = self.pipeline.get_by_name("identity_callback")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(Gst.PadProbeType.BUFFER, self.lss_callback, self.lss_data)

        except Exception as e:
            print(e)
            print(pipeline_string)
            exit(1)

        # connect to hailo_display fps-measurements
        if (self.options_menu.show_fps):
            print("Showing FPS")
            self.pipeline.get_by_name("hailo_display").connect("fps-measurements", self.on_fps_measurement)

        # Create a GLib Main Loop
        self.loop = GLib.MainLoop()

    # internal builted and let them know when there is a event.
    def bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err}, {debug}")
            loop.quit()
        # QOS
        elif t == Gst.MessageType.QOS:
            # Handle QoS message here
            qos_element = message.src.get_name()
            print(f"QoS message received from {qos_element}")
        return True

    def get_pipeline_string(self):

        return ""

    def dump_dot_file(self):
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):

        # Add a watch for messages on the pipeline's bus
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)

        # get xvimagesink element and disable qos
        # xvimagesink is instantiated by fpsdisplaysink
        hailo_display = self.pipeline.get_by_name("hailo_display")
        if hailo_display is None:
            print(
                "Warning: hailo_display element not found, add <fpsdisplaysink name=hailo_display> to your pipeline to support fps display.")
        else:
            xvimagesink = hailo_display.get_by_name("xvimagesink0")
            if xvimagesink is not None:
                xvimagesink.set_property("qos", False)

        # Disable QoS to prevent frame drops
        disable_qos(self.pipeline)

        # start a sub process to run the display_user_data_frame function
        if (self.options_menu.use_frame):
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.user_data,))
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.lss_data,))
            display_process = multiprocessing.Process(target=display_lss_data_frame, args=(self.lss_data,))
            display_process.start()

        # Set pipeline to PLAYING state
        self.pipeline.set_state(Gst.State.PLAYING)

        # dump dot file
        if (self.options_menu.dump_dot):
            GLib.timeout_add_seconds(3, self.dump_dot_file)

        # Run the GLib event loop and wait event in the loop.
        try:
            self.loop.run()
        except:
            pass

        # Clean up
        self.lss_data.running = False
        self.pipeline.set_state(Gst.State.NULL)
        if (self.options_menu.use_frame):
            display_process.terminate()
            display_process.join()

# -----------------------------------------------------------------------------------------------
# User defined callback function
# -----------------------------------------------------------------------------------------------
def lss_callback(pad, info, lss_data):
    print(f"lss_data id: {id(lss_data)}")
    print(f"lss_data.use_frame: {lss_data.use_frame}")
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    print(f"width: {width}")
    print(f"height: {height}")

    # # from omria in hailo moderator
    # if not format or not width or not height:
    #     print("Error: Format, width, or height not retrieved from pad caps.")

    if not format or not width or not height:
        print("Error: Format, width, or height not retrieved from pad caps.", file=sys.stderr)  # Print to stderr
        return Gst.PadProbeReturn.DROP  # Drop the buffer


    string_to_print = f"Frame count: {lss_data.get_count()}\n"

    frame = None  # Initialize frame to None



    if lss_data.use_frame and format and width and height:
        frame = get_numpy_from_buffer(buffer, format, width, height)

        # from omria in hailo moderator
        if frame is not None:
            print(f"Frame shape: {frame.shape}")
        else:
            print("Failed to convert buffer to NumPy array.")
    else:
        print("Frame not retrieved due to missing format or dimensions.")
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    detection_count = 0

    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        if frame is not None:
            cv2.rectangle(
                frame,
                (int(bbox.xmin()), int(bbox.ymin())),
                (int(bbox.xmax()), int(bbox.ymax())),
                (0, 255, 0),
                2
            )
            cv2.putText(
                frame,
                f"{label}: {confidence:.2f}",
                (int(bbox.xmin()), int(bbox.ymin()) - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2
            )

    if detection_count > 0:
        string_to_print += f"Total detections: {detection_count}\n"
    if lss_data.use_frame and frame is not None:
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        lss_data.set_frame(frame)
    cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
    cv2.circle(frame, (width // 2, height // 2), 30, (0, 255, 255), 3)
    cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    print(string_to_print)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, lss_data):
        super().__init__(args, lss_data)
        # self.batch_size = 2
        self.batch_size = 1

        self.network_width = 1280
        self.network_height = 720

        self.network_format = "RGB"
        # self.network_format = "YUYV"
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
        # self.thresholds_str = ""  # Initialize thresholds_str as empty string

        if args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov8s_h8l.hef')
        elif args.network == "yolov8n":
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1

            # nms_score_threshold = 0.3
            # nms_iou_threshold = 0.45

            self.thresholds_str = f"nms-score-threshold={nms_score_threshold} nms-iou-threshold={nms_iou_threshold} output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
            self.hef_path = os.path.join(self.current_path, './hailomodel/eeg_employee_2.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolox_s_leaky_h8l_mz.hef')
        else:
            assert False, "Invalid network type"
        self.lss_callback = lss_callback
        setproctitle.setproctitle("Hailo Detection App")
        self.create_pipeline()

    def get_pipeline_string(self):
        if (self.source_type == "rpi"):
            source_element = f"libcamerasrc name=src_0 auto-focus-mode=2 ! "
            source_element += f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
            source_element += QUEUE("queue_src_scale")
            source_element += f"videoscale ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

        elif (self.source_type == "usb"):
            source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            source_element += f"image/jpeg, width={self.network_width}, height={self.network_height}, framerate=100/1 ! "
            source_element += f"jpegdec ! "  # Decode JPEG back to raw video
            source_element += f"video/x-raw, format={self.network_format} ! "  # Ensure correct format after decoding

        else:
            source_element = f"filesrc location={self.video_source} name=src_0 ! "
            source_element += QUEUE("queue_dec264")
            source_element += f" qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
            source_element += f" video/x-raw,format=I420 ! "
        source_element += QUEUE("queue_scale")
        source_element += f" videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += f" videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "

        pipeline_string = "hailomuxer name=hmux "
        pipeline_string += source_element
        pipeline_string += "tee name=t ! "
        pipeline_string += QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 "
        pipeline_string += "t. ! " + QUEUE("queue_hailonet")
        # pipeline_string += "videoconvert n-threads=3 ! "

        # pipeline_string += f"videoconvert n-threads=3 ! video/x-raw, format=RGB ! " # Convert to RGB *before* hailonet

        pipeline_string += f"videoconvert n-threads=3 ! video/x-raw, format=RGB, width={self.network_width}, height={self.network_height} ! "

        pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "

        pipeline_string += QUEUE("queue_hailofilter")
        self.json_config_path = "./resource/eeg_employee_2.json"
        pipeline_string += f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} config-path={self.json_config_path} qos=false ! "

        pipeline_string += QUEUE("queue_hmuc") + " hmux.sink_1 "
        pipeline_string += "hmux. ! " + QUEUE("queue_hailo_python")
        pipeline_string += QUEUE("queue_user_callback")
        pipeline_string += f"identity name=identity_callback ! "
        pipeline_string += QUEUE("queue_hailooverlay")
        pipeline_string += f"hailooverlay ! "
        pipeline_string += QUEUE("queue_videoconvert")
        pipeline_string += f"videoconvert n-threads=3 qos=false ! "

        pipeline_string += QUEUE("queue_hailo_display")
        pipeline_string += f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        print(pipeline_string)
        return pipeline_string

if __name__ == "__main__":
    parser = get_default_parser()
    # Add additional arguments here
    parser.add_argument("--network", default="yolov8n", choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
                        help="Which Network to use, defult is yolov6n")
    args = parser.parse_args()
    lss_data = lss_callback_class()

    # # app = GStreamerDetectionApp(args, user_data)
    # app = GStreamerDetectionApp(args, lss_data)
    # print("Starting GStreamerDetectionApp")
    # begin = time.time()
    # app.run()
    # print("Application run completed")
    # end = time.time()
    # print("Total time: ", 733 / (end - begin))

    try:
        app = GStreamerDetectionApp(args, lss_data)  # Initialize within try block
        print("Starting GStreamerDetectionApp")
        begin = time.time()
        app.run()
        print("Application run completed")
        end = time.time()
        print("Total time: ", 733 / (end - begin))
    except Exception as e:  # Catch exceptions during initialization or running
        print(f"Error: {e}", file=sys.stderr)  # Print error to stderr
        sys.exit(1)  # Exit with error code

The result is like these:

root@iot:/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main# python hailo_app_combined_9.py --input /dev/video0 -f
hailomuxer name=hmux v4l2src device=/dev/video0 name=src_0 ! image/jpeg, width=1280, height=720, framerate=100/1 ! jpegdec ! video/x-raw, format=RGB ! queue name=queue_scale max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoscale n-threads=2 ! queue name=queue_src_convert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 name=src_convert qos=false ! video/x-raw, format=RGB, width=1280, height=720, pixel-aspect-ratio=1/1 ! tee name=t ! queue name=bypass_queue max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! hmux.sink_0 t. ! queue name=queue_hailonet max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 ! video/x-raw, format=RGB, width=1280, height=720 ! hailonet hef-path=/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main/./hailomodel/eeg_employee_2.hef batch-size=1 nms-score-threshold=0.1 nms-iou-threshold=0.1 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! queue name=queue_hailofilter max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailofilter function-name=“filter” so-path=/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so config-path=./resource/eeg_employee_2.json qos=false ! queue name=queue_hmuc max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hmux.sink_1 hmux. ! queue name=queue_hailo_python max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! queue name=queue_user_callback max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! identity name=identity_callback ! queue name=queue_hailooverlay max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailooverlay ! queue name=queue_videoconvert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 qos=false ! queue name=queue_hailo_display max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! fpsdisplaysink video-sink=xvimagesink name=hailo_display sync=false text-overlay=True signal-fps-measurements=true
gst_parse_error: could not link videoconvert0 to hailonet0, hailonet0 can’t handle caps video/x-raw, format=(string)RGB, width=(int)1280, height=(int)720 (3)
hailomuxer name=hmux v4l2src device=/dev/video0 name=src_0 ! image/jpeg, width=1280, height=720, framerate=100/1 ! jpegdec ! video/x-raw, format=RGB ! queue name=queue_scale max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoscale n-threads=2 ! queue name=queue_src_convert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 name=src_convert qos=false ! video/x-raw, format=RGB, width=1280, height=720, pixel-aspect-ratio=1/1 ! tee name=t ! queue name=bypass_queue max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! hmux.sink_0 t. ! queue name=queue_hailonet max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 ! video/x-raw, format=RGB, width=1280, height=720 ! hailonet hef-path=/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main/./hailomodel/eeg_employee_2.hef batch-size=1 nms-score-threshold=0.1 nms-iou-threshold=0.1 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! queue name=queue_hailofilter max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailofilter function-name=“filter” so-path=/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so config-path=./resource/eeg_employee_2.json qos=false ! queue name=queue_hmuc max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hmux.sink_1 hmux. ! queue name=queue_hailo_python max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! queue name=queue_user_callback max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! identity name=identity_callback ! queue name=queue_hailooverlay max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailooverlay ! queue name=queue_videoconvert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 qos=false ! queue name=queue_hailo_display max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! fpsdisplaysink video-sink=xvimagesink name=hailo_display sync=false text-overlay=True signal-fps-measurements=true

Hey @ssms1225 ,

Let me help you resolve the error could not link videoconvert0 to hailonet0 where HailoNet can’t handle your video format.

This issue happens when there’s a mismatch between your video format and what HailoNet expects. Here’s how to fix it:

  1. First, check what format your HEF file expects:
hailortcli hef-info -f /path/to/your.hef

This will show you the required dimensions, format, and data type.

  1. Then, modify your pipeline to match these requirements. For example, if HailoNet needs BGR format at 416x416:
source_element = f"v4l2src device={self.video_source} name=src_0 ! "
source_element += f"image/jpeg, width=1280, height=720, framerate=30/1 ! jpegdec ! "
source_element += f"video/x-raw, format=BGR ! videoscale ! "
source_element += f"video/x-raw, width=416, height=416 ! videoconvert ! "
  1. Add specific caps for HailoNet:
pipeline_string += f"videoconvert ! video/x-raw, format=BGR, width=416, height=416, framerate=30/1 ! "
pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} force-writable=true ! "

Let me know if you need help adjusting these settings for your specific use case!