[2nd]Error during editing main window : Failed to map buffer for writing

Hi, omria

I already did that but the result is the same. Below is a combined code for hailo_common_funcs.py, hailo_rpi_common.py and object_detection_hailo.py. Currently, I can see only for my customized inference. I really want to edit main window. It’s not easy for me to do that. These are some prints of my customized lss_callback function:
lss_data id: 140731942412560
lss_data.use_frame: False
Frame not retrieved due to missing format or dimensions.
label: JJH, bbox: <hailo.HailoBBox(140731448667904)>, confidence: 0.15631043910980225
Frame count: 0

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import hailo
import supervision as sv

# Try to import hailo python module
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data).copy()

def handle_nv12(map_info, width, height):
    y_plane_size = width * height
    uv_plane_size = width * height // 2
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=map_info.data[:y_plane_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    return np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data).copy()

    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,

def get_numpy_from_buffer(buffer, format, width, height):
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")

        handler = FORMAT_HANDLERS.get(format)
        if handler is None:
            raise ValueError(f"Unsupported format: {format}")
        return handler(map_info, width, height)

# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:

        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    def __init__(self):
        self.frame_count = 0
        self.use_frame = False
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True

    def increment(self):
        self.frame_count += 1

    def get_count(self):
        return self.frame_count

    def set_frame(self, frame):
        if not self.frame_queue.full():

    def get_frame(self):
        if not self.frame_queue.empty():
            return self.frame_queue.get()
            return None

# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    caps = pad.get_current_caps()
    if caps:
        structure = caps.get_structure(0)
        if structure:
            format = structure.get_value('format')
            width = structure.get_value('width')
            height = structure.get_value('height')
            return format, width, height
    return None, None, None

def display_user_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)

def display_lss_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)

def get_default_parser():
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument("--input", "-i", type=str, default="/dev/video0", help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
                        For RPi camera use '-i rpi' (Still in Beta). \
                        Defaults to /dev/video0")
    parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
    parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
    parser.add_argument("--disable-sync", action="store_true", help="Disables display sink sync, will run as fast possible. Relevant when using file source.")
    parser.add_argument("--dump-dot", action="store_true", help="Dump the pipeline graph to a dot file pipeline.dot")
    return parser

def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    return f"queue name={name} max-size-buffers={max_size_buffers} max-size-bytes={max_size_bytes} max-size-time={max_size_time} ! "

def get_source_type(input_source):
    if input_source.startswith("/dev/video"):
        return 'usb'
        if input_source.startswith("rpi"):
            return 'rpi'
            return 'file'

def USER_CALLBACK_PIPELINE(name='identity_callback'):
    return f'{QUEUE(name=f"{name}_q")} ! identity name={name}'

# -----------------------------------------------------------------------------------------------
# GStreamerApp class
# -----------------------------------------------------------------------------------------------
class GStreamerApp:
    def __init__(self, args, lss_data: lss_callback_class):
        # def __init__(self, args, process_frame_callback: lss_callback_class):
        # Set the process title
        setproctitle.setproctitle("Hailo Python App")

        # Create an empty options menu
        self.options_menu = args

        # Initialize variables
        tappas_postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if tappas_postprocess_dir == '':
            print("TAPPAS_POST_PROC_DIR environment variable is not set. Please set it to by sourcing setup_env.sh")
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = tappas_postprocess_dir
        self.video_source = self.options_menu.input
        self.source_type = get_source_type(self.video_source)
        # self.user_data = user_data
        self.lss_data = lss_data
        # self.video_sink = "xvimagesink"
        self.video_sink = "autovideosink"
        # self.video_sink = "fakesink"

        # Set Hailo parameters these parameters shuold be set based on the model used
        self.batch_size = 1
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        self.lss_callback = lss_callback_class

        # Set user data parameters
        lss_data.use_frame = self.options_menu.use_frame

        if (self.options_menu.disable_sync or self.source_type != "file"):
            self.sync = "false"
            self.sync = "true"

        if (self.options_menu.dump_dot):
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def on_fps_measurement(self, sink, fps, droprate, avgfps):
        print(f"FPS: {fps:.2f}, Droprate: {droprate:.2f}, Avg FPS: {avgfps:.2f}")
        return True

    def create_pipeline(self):
        # Initialize GStreamer

        pipeline_string = self.get_pipeline_string()
            self.pipeline = Gst.parse_launch(pipeline_string)

            identity = self.pipeline.get_by_name("identity_callback")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(Gst.PadProbeType.BUFFER, self.lss_callback, self.lss_data)

        except Exception as e:

        # connect to hailo_display fps-measurements
        if (self.options_menu.show_fps):
            print("Showing FPS")
            self.pipeline.get_by_name("hailo_display").connect("fps-measurements", self.on_fps_measurement)

        # Create a GLib Main Loop
        self.loop = GLib.MainLoop()

    def bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err}, {debug}")
        # QOS
        elif t == Gst.MessageType.QOS:
            # Handle QoS message here
            qos_element = message.src.get_name()
            print(f"QoS message received from {qos_element}")
        return True

    def get_pipeline_string(self):

        return ""

    def dump_dot_file(self):
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):

        # Add a watch for messages on the pipeline's bus
        bus = self.pipeline.get_bus()
        bus.connect("message", self.bus_call, self.loop)

        # get xvimagesink element and disable qos
        # xvimagesink is instantiated by fpsdisplaysink
        hailo_display = self.pipeline.get_by_name("hailo_display")
        if hailo_display is None:
                "Warning: hailo_display element not found, add <fpsdisplaysink name=hailo_display> to your pipeline to support fps display.")
            xvimagesink = hailo_display.get_by_name("xvimagesink0")
            if xvimagesink is not None:
                xvimagesink.set_property("qos", False)

        # Disable QoS to prevent frame drops

        # start a sub process to run the display_user_data_frame function
        if (self.options_menu.use_frame):
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.user_data,))
            display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.lss_data,))

        # Set pipeline to PLAYING state

        # dump dot file
        if (self.options_menu.dump_dot):
            GLib.timeout_add_seconds(3, self.dump_dot_file)

        # Run the GLib event loop

        # Clean up
        self.lss_data.running = False
        if (self.options_menu.use_frame):

# -----------------------------------------------------------------------------------------------
# User defined callback function
# -----------------------------------------------------------------------------------------------
def lss_callback(pad, info, lss_data):
    print(f"lss_data id: {id(lss_data)}")
    print(f"lss_data.use_frame: {lss_data.use_frame}")
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    string_to_print = f"Frame count: {lss_data.get_count()}\n"

    frame = None  # Initialize frame to None

    if lss_data.use_frame and format and width and height:
        frame = get_numpy_from_buffer(buffer, format, width, height)
        if frame is None:
            print("Failed to get frame from buffer.")
            print(f"Frame shape: {frame.shape}")
        print("Frame not retrieved due to missing format or dimensions.")
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    detection_count = 0
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        print(f"label: {label}, bbox: {bbox}, confidence: {confidence}")
        if label != "JJH" and lss_data.use_frame:
            frame = lss_data.get_frame()
            if frame is not None:
                cv2.rectangle(frame, (int(bbox.xmin()), int(bbox.ymin())), (int(bbox.xmax()), int(bbox.ymax())), (0, 255, 0), 2)
                x1_norm = bbox.xmin()
                y1_norm = bbox.ymin()
                x2_norm = bbox.xmax()
                y2_norm = bbox.ymax()
                x1 = int(x1_norm * width)
                y1 = int(y1_norm * height)
                x2 = int(x2_norm * width)
                y2 = int(y2_norm * height)
                okay_bbox = (x1, y1, x2 - x1, y2 - y1)
                string_to_print += f"Detection {detection_count}: Label = {label}, BBox = {okay_bbox}, Confidence = {confidence:.2f}\n"
                detection_count += 1
                output_dir = '/opt/iot/results'
                os.makedirs(output_dir, exist_ok=True)
                output_path = os.path.join(output_dir, f"frame_{lss_data.get_count()}.jpg")
                cv2.imwrite(output_path, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
                print(f"Saved frame to {output_path}")
    if detection_count > 0:
        string_to_print += f"Total detections: {detection_count}\n"
    if lss_data.use_frame and frame is not None:
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
    cv2.circle(frame, (width // 2, height // 2), 30, (0, 255, 255), 3)
    cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, lss_data):
        super().__init__(args, lss_data)
        self.batch_size = 2
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
        # self.thresholds_str = ""  # Initialize thresholds_str as empty string

        if args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov8s_h8l.hef')
        elif args.network == "yolov8n":
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1
            self.thresholds_str = f"nms-score-threshold={nms_score_threshold} nms-iou-threshold={nms_iou_threshold} output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
            self.hef_path = os.path.join(self.current_path, './hailomodel/eeg_employee_2.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolox_s_leaky_h8l_mz.hef')
            assert False, "Invalid network type"
        self.lss_callback = lss_callback
        setproctitle.setproctitle("Hailo Detection App")

    def get_pipeline_string(self):
        if (self.source_type == "rpi"):
            source_element = f"libcamerasrc name=src_0 auto-focus-mode=2 ! "
            source_element += f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
            source_element += QUEUE("queue_src_scale")
            source_element += f"videoscale ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

        elif (self.source_type == "usb"):
            source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            source_element += f"video/x-raw, width=640, height=480, framerate=30/1 ! "
            source_element = f"filesrc location={self.video_source} name=src_0 ! "
            source_element += QUEUE("queue_dec264")
            source_element += f" qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
            source_element += f" video/x-raw,format=I420 ! "
        source_element += QUEUE("queue_scale")
        source_element += f" videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += f" videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "

        pipeline_string = "hailomuxer name=hmux "
        pipeline_string += source_element
        pipeline_string += "tee name=t ! "
        pipeline_string += QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 "
        pipeline_string += "t. ! " + QUEUE("queue_hailonet")
        pipeline_string += "videoconvert n-threads=3 ! "
        pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
        pipeline_string += QUEUE("queue_hailofilter")

        self.json_config_path = "./resource/eeg_employee_2.json"
        pipeline_string += f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} config-path={self.json_config_path} qos=false ! "

        pipeline_string += QUEUE("queue_hmuc") + " hmux.sink_1 "
        pipeline_string += "hmux. ! " + QUEUE("queue_hailo_python")
        pipeline_string += QUEUE("queue_user_callback")
        pipeline_string += f"identity name=identity_callback ! "
        pipeline_string += QUEUE("queue_hailooverlay")
        pipeline_string += f"hailooverlay ! "
        pipeline_string += QUEUE("queue_videoconvert")
        pipeline_string += f"videoconvert n-threads=3 qos=false ! "
        pipeline_string += QUEUE("queue_hailo_display")
        pipeline_string += f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        return pipeline_string

if __name__ == "__main__":
    parser = get_default_parser()
    # Add additional arguments here
    parser.add_argument("--network", default="yolov8n", choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
                        help="Which Network to use, defult is yolov6n")
    args = parser.parse_args()
    lss_data = lss_callback_class()
    # app = GStreamerDetectionApp(args, user_data)
    app = GStreamerDetectionApp(args, lss_data)
    print("Starting GStreamerDetectionApp")
    begin = time.time()
    print("Application run completed")
    end = time.time()
    print("Total time: ", 733 / (end - begin))

Hey @ssms1225

You are experiencing issues while attempting to edit the main window to display custom inference results. The specific error message you encountered is:

Frame not retrieved due to missing format or dimensions.

1. Fix Frame Retrieval Issue

Ensure that the frame format and dimensions are correctly retrieved from the GStreamer pad using the get_caps_from_pad() function:

format, width, height = get_caps_from_pad(pad)
if not format or not width or not height:
    print("Error: Format, width, or height not retrieved from pad caps.")

Verify that the pipeline source is configured to output frames in the expected format:

source_element = f"v4l2src device={self.video_source} name=src_0 ! "
source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

Test the logic for converting the buffer to a NumPy array:

frame = get_numpy_from_buffer(buffer, format, width, height)
if frame is not None:
    print(f"Frame shape: {frame.shape}")
    print("Failed to convert buffer to NumPy array.")

2. Customize the Display Window

Modify the display_lss_data_frame function to add overlays, such as bounding boxes, labels, or other custom graphics:

def display_lss_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Add custom overlays
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)

3. Enhance lss_callback for More Informative Outputs

Process detection results dynamically and visualize them on the frames:

for detection in detections:
    label = detection.get_label()
    bbox = detection.get_bbox()
    confidence = detection.get_confidence()

    if frame is not None:
            (int(bbox.xmin()), int(bbox.ymin())),
            (int(bbox.xmax()), int(bbox.ymax())),
            (0, 255, 0),
            f"{label}: {confidence:.2f}",
            (int(bbox.xmin()), int(bbox.ymin()) - 10),
            (0, 255, 0),

Hi, omria
I tried like you said, but there was an error like these:
(venv_hailo_rpi5_examples) root@iot:/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main# python hailo_app_combined_5.py --input /dev/video0 -f

hailomuxer name=hmux v4l2src device=/dev/video0 name=src_0 ! video/x-raw, format=RGB, width=640, height=640, framerate=30/1 ! queue name=queue_scale max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoscale n-threads=2 ! queue name=queue_src_convert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 name=src_convert qos=false ! video/x-raw, format=RGB, width=640, height=640, pixel-aspect-ratio=1/1 ! tee name=t ! queue name=bypass_queue max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! hmux.sink_0 t. ! queue name=queue_hailonet max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 ! hailonet hef-path=/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main/./hailomodel/eeg_employee_2.hef batch-size=2 nms-score-threshold=0.1 nms-iou-threshold=0.1 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! queue name=queue_hailofilter max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailofilter function-name=“filter” so-path=/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so config-path=./resource/eeg_employee_2.json qos=false ! queue name=queue_hmuc max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hmux.sink_1 hmux. ! queue name=queue_hailo_python max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! queue name=queue_user_callback max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! identity name=identity_callback ! queue name=queue_hailooverlay max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailooverlay ! queue name=queue_videoconvert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 qos=false ! queue name=queue_hailo_display max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! fpsdisplaysink video-sink=autovideosink name=hailo_display sync=false text-overlay=True signal-fps-measurements=true
Showing FPS
Starting GStreamerDetectionApp
Error: gst-stream-error-quark: Internal data stream error. (1), …/libs/gst/base/gstbasesrc.c(3132): gst_base_src_loop (): /GstPipeline:pipeline0/GstV4l2Src:src_0:
streaming stopped, reason not-negotiated (-4)
Application run completed
Total time: 265.24232344324963

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import hailo
import supervision as sv

# Try to import hailo python module
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data).copy()

def handle_nv12(map_info, width, height):
    y_plane_size = width * height
    uv_plane_size = width * height // 2
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=map_info.data[:y_plane_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    return np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data).copy()

    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,

def get_numpy_from_buffer(buffer, format, width, height):
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")

        handler = FORMAT_HANDLERS.get(format)
        if handler is None:
            raise ValueError(f"Unsupported format: {format}")
        return handler(map_info, width, height)

# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:

        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    def __init__(self):
        self.frame_count = 0
        self.use_frame = False
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True

    def increment(self):
        self.frame_count += 1

    def get_count(self):
        return self.frame_count

    def set_frame(self, frame):
        if not self.frame_queue.full():

    def get_frame(self):
        if not self.frame_queue.empty():
            return self.frame_queue.get()
            return None

# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    caps = pad.get_current_caps()
    if caps:
        structure = caps.get_structure(0)
        if structure:
            format = structure.get_value('format')
            width = structure.get_value('width')
            height = structure.get_value('height')
            return format, width, height
    return None, None, None

def display_user_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)

def display_lss_data_frame(lss_data: lss_callback_class):
    # while lss_data.running:
    #     frame = lss_data.get_frame()
    #     if frame is not None:
    #         cv2.imshow("User Frame", frame)
    #     cv2.waitKey(1)
    # cv2.destroyAllWindows()

    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Add custom overlays
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)

def get_default_parser():
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument("--input", "-i", type=str, default="/dev/video0", help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
                        For RPi camera use '-i rpi' (Still in Beta). \
                        Defaults to /dev/video0")
    parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
    parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
    parser.add_argument("--disable-sync", action="store_true", help="Disables display sink sync, will run as fast possible. Relevant when using file source.")
    parser.add_argument("--dump-dot", action="store_true", help="Dump the pipeline graph to a dot file pipeline.dot")
    return parser

def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    return f"queue name={name} max-size-buffers={max_size_buffers} max-size-bytes={max_size_bytes} max-size-time={max_size_time} ! "

Hi, Omria
Recently, I came to know that this command is working and I can see live streaming from it.

gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg,width=320,height=240,framerate=15/1 ! jpegdec ! autovideosink

Could you help me correct my combined script to edit the main window if possible?

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import hailo
import supervision as sv

# Try to import hailo python module
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data).copy()

def handle_nv12(map_info, width, height):
    y_plane_size = width * height
    uv_plane_size = width * height // 2
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=map_info.data[:y_plane_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    return np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data).copy()

    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,

def get_numpy_from_buffer(buffer, format, width, height):
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")

        # format='YUYV'
        handler = FORMAT_HANDLERS.get(format)
        if handler is None:
            raise ValueError(f"Unsupported format: {format}")
        return handler(map_info, width, height)

# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:

        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    def __init__(self):
        self.frame_count = 0
        self.use_frame = False
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True

    def increment(self):
        self.frame_count += 1

    def get_count(self):
        return self.frame_count

    def set_frame(self, frame):
        if not self.frame_queue.full():

    def get_frame(self):
        if not self.frame_queue.empty():
            return self.frame_queue.get()
            return None

# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    caps = pad.get_current_caps()
    if caps:
        structure = caps.get_structure(0)
        if structure:
            format = structure.get_value('format')
            width = structure.get_value('width')
            height = structure.get_value('height')
            return format, width, height
    return None, None, None

def display_user_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)

def display_lss_data_frame(lss_data: lss_callback_class):
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Add custom overlays
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)

def get_default_parser():
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument("--input", "-i", type=str, default="/dev/video0", help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
                        For RPi camera use '-i rpi' (Still in Beta). \
                        Defaults to /dev/video0")
    parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
    parser.add_argument("--show-fps", "-f", action="store_true", help="Print FPS on sink")
    parser.add_argument("--disable-sync", action="store_true", help="Disables display sink sync, will run as fast possible. Relevant when using file source.")
    parser.add_argument("--dump-dot", action="store_true", help="Dump the pipeline graph to a dot file pipeline.dot")
    return parser

def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    return f"queue name={name} max-size-buffers={max_size_buffers} max-size-bytes={max_size_bytes} max-size-time={max_size_time} ! "

def get_source_type(input_source):
    if input_source.startswith("/dev/video"):
        return 'usb'
        if input_source.startswith("rpi"):
            return 'rpi'
            return 'file'

def USER_CALLBACK_PIPELINE(name='identity_callback'):
    return f'{QUEUE(name=f"{name}_q")} ! identity name={name}'

# -----------------------------------------------------------------------------------------------
# GStreamerApp class
# -----------------------------------------------------------------------------------------------
class GStreamerApp:
    def __init__(self, args, lss_data: lss_callback_class):
        # def __init__(self, args, process_frame_callback: lss_callback_class):
        # Set the process title
        setproctitle.setproctitle("Hailo Python App")

        # Create an empty options menu
        self.options_menu = args

        # Initialize variables
        tappas_postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if tappas_postprocess_dir == '':
            print("TAPPAS_POST_PROC_DIR environment variable is not set. Please set it to by sourcing setup_env.sh")
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = tappas_postprocess_dir
        self.video_source = self.options_menu.input
        self.source_type = get_source_type(self.video_source)
        # self.user_data = user_data
        self.lss_data = lss_data
        # self.video_sink = "xvimagesink"
        self.video_sink = "autovideosink"
        # self.video_sink = "fakesink"

        # Set Hailo parameters these parameters shuold be set based on the model used
        self.batch_size = 1

        # self.network_width = 640
        # self.network_height = 640

        # self.network_width = 1080
        # self.network_height = 720

        # self.network_width = 640
        # self.network_height = 480

        self.network_width = 320
        self.network_height = 240

        self.network_format = "RGB"
        # self.network_format = "YUYV"

        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        self.lss_callback = lss_callback_class

        # Set user data parameters
        lss_data.use_frame = self.options_menu.use_frame

        if (self.options_menu.disable_sync or self.source_type != "file"):
            self.sync = "false"
            self.sync = "true"

        if (self.options_menu.dump_dot):
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def on_fps_measurement(self, sink, fps, droprate, avgfps):
        print(f"FPS: {fps:.2f}, Droprate: {droprate:.2f}, Avg FPS: {avgfps:.2f}")
        return True

    def create_pipeline(self):
        # Initialize GStreamer

        pipeline_string = self.get_pipeline_string()
            self.pipeline = Gst.parse_launch(pipeline_string)

            identity = self.pipeline.get_by_name("identity_callback")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(Gst.PadProbeType.BUFFER, self.lss_callback, self.lss_data)

        except Exception as e:

        # connect to hailo_display fps-measurements
        if (self.options_menu.show_fps):
            print("Showing FPS")
            self.pipeline.get_by_name("hailo_display").connect("fps-measurements", self.on_fps_measurement)

        # Create a GLib Main Loop
        self.loop = GLib.MainLoop()

    def bus_call(self, bus, message, loop):
        t = message.type
        if t == Gst.MessageType.EOS:
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err}, {debug}")
        # QOS
        elif t == Gst.MessageType.QOS:
            # Handle QoS message here
            qos_element = message.src.get_name()
            print(f"QoS message received from {qos_element}")
        return True

    def get_pipeline_string(self):

        return ""

    def dump_dot_file(self):
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):

        # Add a watch for messages on the pipeline's bus
        bus = self.pipeline.get_bus()
        bus.connect("message", self.bus_call, self.loop)

        # get xvimagesink element and disable qos
        # xvimagesink is instantiated by fpsdisplaysink
        hailo_display = self.pipeline.get_by_name("hailo_display")
        if hailo_display is None:
                "Warning: hailo_display element not found, add <fpsdisplaysink name=hailo_display> to your pipeline to support fps display.")
            xvimagesink = hailo_display.get_by_name("xvimagesink0")
            if xvimagesink is not None:
                xvimagesink.set_property("qos", False)

        # Disable QoS to prevent frame drops

        # start a sub process to run the display_user_data_frame function
        if (self.options_menu.use_frame):
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.user_data,))
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.lss_data,))
            display_process = multiprocessing.Process(target=display_lss_data_frame, args=(self.lss_data,))

        # Set pipeline to PLAYING state

        # dump dot file
        if (self.options_menu.dump_dot):
            GLib.timeout_add_seconds(3, self.dump_dot_file)

        # Run the GLib event loop

        # Clean up
        self.lss_data.running = False
        if (self.options_menu.use_frame):

# -----------------------------------------------------------------------------------------------
# User defined callback function
# -----------------------------------------------------------------------------------------------
def lss_callback(pad, info, lss_data):
    print(f"lss_data id: {id(lss_data)}")
    print(f"lss_data.use_frame: {lss_data.use_frame}")
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    print(f"width: {width}")
    print(f"height: {height}")

    # from omria in hailo moderator
    if not format or not width or not height:
        print("Error: Format, width, or height not retrieved from pad caps.")

    string_to_print = f"Frame count: {lss_data.get_count()}\n"

    frame = None  # Initialize frame to None

    if lss_data.use_frame and format and width and height:
        frame = get_numpy_from_buffer(buffer, format, width, height)

        # from omria in hailo moderator
        if frame is not None:
            print(f"Frame shape: {frame.shape}")
            print("Failed to convert buffer to NumPy array.")

        # if frame is None:
        #     print("Failed to get frame from buffer.")
        #     # print(f"Frame shape: {frame.shape}")
        # else:
        #     print(f"Frame shape: {frame.shape}")
        print("Frame not retrieved due to missing format or dimensions.")
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    detection_count = 0

    # for detection in detections:
    #     label = detection.get_label()
    #     bbox = detection.get_bbox()
    #     confidence = detection.get_confidence()
    #     print(f"label: {label}, bbox: {bbox}, confidence: {confidence}")
    #     if label != "JJH" and lss_data.use_frame:
    #         frame = lss_data.get_frame()
    #         if frame is not None:
    #             cv2.rectangle(frame, (int(bbox.xmin()), int(bbox.ymin())), (int(bbox.xmax()), int(bbox.ymax())), (0, 255, 0), 2)
    #             x1_norm = bbox.xmin()
    #             y1_norm = bbox.ymin()
    #             x2_norm = bbox.xmax()
    #             y2_norm = bbox.ymax()
    #             x1 = int(x1_norm * width)
    #             y1 = int(y1_norm * height)
    #             x2 = int(x2_norm * width)
    #             y2 = int(y2_norm * height)
    #             okay_bbox = (x1, y1, x2 - x1, y2 - y1)
    #             string_to_print += f"Detection {detection_count}: Label = {label}, BBox = {okay_bbox}, Confidence = {confidence:.2f}\n"
    #             detection_count += 1
    #             output_dir = '/opt/iot/results'
    #             os.makedirs(output_dir, exist_ok=True)
    #             output_path = os.path.join(output_dir, f"frame_{lss_data.get_count()}.jpg")
    #             cv2.imwrite(output_path, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    #             print(f"Saved frame to {output_path}")

    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        if frame is not None:
                (int(bbox.xmin()), int(bbox.ymin())),
                (int(bbox.xmax()), int(bbox.ymax())),
                (0, 255, 0),
                f"{label}: {confidence:.2f}",
                (int(bbox.xmin()), int(bbox.ymin()) - 10),
                (0, 255, 0),

    if detection_count > 0:
        string_to_print += f"Total detections: {detection_count}\n"
    if lss_data.use_frame and frame is not None:
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
    cv2.circle(frame, (width // 2, height // 2), 30, (0, 255, 255), 3)
    cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, lss_data):
        super().__init__(args, lss_data)
        self.batch_size = 2

        # self.network_width = 640
        # self.network_height = 640

        # self.network_width = 1080
        # self.network_height = 720

        # self.network_width = 640
        # self.network_height = 480

        self.network_width = 320
        self.network_height = 240

        self.network_format = "RGB"
        # self.network_format = "YUYV"
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
        # self.thresholds_str = ""  # Initialize thresholds_str as empty string

        if args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov8s_h8l.hef')
        elif args.network == "yolov8n":
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1

            # nms_score_threshold = 0.3
            # nms_iou_threshold = 0.45

            self.thresholds_str = f"nms-score-threshold={nms_score_threshold} nms-iou-threshold={nms_iou_threshold} output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
            self.hef_path = os.path.join(self.current_path, './hailomodel/eeg_employee_2.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolox_s_leaky_h8l_mz.hef')
            assert False, "Invalid network type"
        self.lss_callback = lss_callback
        setproctitle.setproctitle("Hailo Detection App")

    def get_pipeline_string(self):
        if (self.source_type == "rpi"):
            source_element = f"libcamerasrc name=src_0 auto-focus-mode=2 ! "
            source_element += f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
            source_element += QUEUE("queue_src_scale")
            source_element += f"videoscale ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

        elif (self.source_type == "usb"):
            # source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            # source_element += f"video/x-raw, width=640, height=480, framerate=30/1 ! "

            # # from omria in hailo moderator
            # source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            # source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
            # # source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=5/1 ! "
            # # source_element += f"video/x-raw, format={'YUYV'}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

            source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            # Use JPEG encoding and specify resolution and framerate
            source_element += f"image/jpeg, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
            source_element += f"jpegdec ! "  # Decode JPEG back to raw video
            source_element += f"video/x-raw, format={self.network_format} ! "  # Ensure correct format after decoding

            source_element = f"filesrc location={self.video_source} name=src_0 ! "
            source_element += QUEUE("queue_dec264")
            source_element += f" qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
            source_element += f" video/x-raw,format=I420 ! "
        source_element += QUEUE("queue_scale")
        source_element += f" videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += f" videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "

        pipeline_string = "hailomuxer name=hmux "
        pipeline_string += source_element
        pipeline_string += "tee name=t ! "
        pipeline_string += QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 "
        pipeline_string += "t. ! " + QUEUE("queue_hailonet")
        pipeline_string += "videoconvert n-threads=3 ! "

        pipeline_string += f"capsfilter caps=\"video/x-raw,format=RGB,width={self.network_width},height={self.network_height}\" ! "
        pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
        pipeline_string += QUEUE("queue_hailofilter")

        self.json_config_path = "./resource/eeg_employee_2.json"
        pipeline_string += f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} config-path={self.json_config_path} qos=false ! "

        pipeline_string += QUEUE("queue_hmuc") + " hmux.sink_1 "
        pipeline_string += "hmux. ! " + QUEUE("queue_hailo_python")
        pipeline_string += QUEUE("queue_user_callback")
        pipeline_string += f"identity name=identity_callback ! "
        pipeline_string += QUEUE("queue_hailooverlay")
        pipeline_string += f"hailooverlay ! "
        pipeline_string += QUEUE("queue_videoconvert")
        pipeline_string += f"videoconvert n-threads=3 qos=false ! "
        pipeline_string += QUEUE("queue_hailo_display")
        pipeline_string += f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        return pipeline_string

if __name__ == "__main__":
    parser = get_default_parser()
    # Add additional arguments here
    parser.add_argument("--network", default="yolov8n", choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
                        help="Which Network to use, defult is yolov6n")
    args = parser.parse_args()
    lss_data = lss_callback_class()
    # app = GStreamerDetectionApp(args, user_data)
    app = GStreamerDetectionApp(args, lss_data)
    print("Starting GStreamerDetectionApp")
    begin = time.time()
    print("Application run completed")
    end = time.time()
    print("Total time: ", 733 / (end - begin))