[6th] Error while editing the main window: Failed to map buffer for writing

Hi, Omria
I recently found that the following command works, and I can see the live stream with it:

gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg,width=320,height=240,framerate=15/1 ! jpegdec ! autovideosink

Could you help me modify my combined script (below) so that I can draw on the frames shown in the main window?

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import hailo
import supervision as sv

# Try to import hailo python module
# NOTE(review): `hailo` is already imported unconditionally in the import block
# above, so a missing module raises ImportError there before this guard runs.
try:
    import hailo
except ImportError:
    # Exit with a hint about the virtual environment instead of a raw traceback.
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")

# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    """Copy a packed three-bytes-per-pixel frame out of a mapped buffer.

    Args:
        map_info: Mapping object whose ``data`` attribute exposes the raw bytes.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        A new ``(height, width, 3)`` ``uint8`` array that owns its memory.
    """
    frame_shape = (height, width, 3)
    # Wrap the mapped bytes without copying, then detach from them with copy().
    view = np.ndarray(frame_shape, np.uint8, map_info.data)
    return view.copy()

def handle_nv12(map_info, width, height):
    """Split a mapped NV12 buffer into its luma and interleaved chroma planes.

    The first ``width * height`` bytes are read as the Y plane; the bytes that
    follow are read as ``(height // 2, width // 2, 2)`` chroma samples.

    Args:
        map_info: Mapping object whose ``data`` attribute exposes the raw bytes.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        Tuple ``(y_plane, uv_plane)`` of independent ``uint8`` arrays with
        shapes ``(height, width)`` and ``(height // 2, width // 2, 2)``.

    Raises:
        TypeError: If the mapped data is too small for the requested planes
            (raised by ``np.ndarray``).
    """
    data = map_info.data
    y_plane_size = width * height
    # Address both planes inside the same mapping via `offset` instead of
    # slicing, which avoids creating intermediate copies of the raw bytes.
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=data).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
                          buffer=data, offset=y_plane_size).copy()
    return y_plane, uv_plane

def handle_yuyv(map_info, width, height):
    """Copy a packed two-bytes-per-pixel frame out of a mapped buffer.

    Args:
        map_info: Mapping object whose ``data`` attribute exposes the raw bytes.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        A new ``(height, width, 2)`` ``uint8`` array that owns its memory.
    """
    frame_shape = (height, width, 2)
    # Wrap the mapped bytes without copying, then detach from them with copy().
    view = np.ndarray(frame_shape, np.uint8, map_info.data)
    return view.copy()

# Dispatch table: format name -> function that turns a mapped buffer into numpy array(s).
FORMAT_HANDLERS = dict(
    RGB=handle_rgb,
    NV12=handle_nv12,
    YUYV=handle_yuyv,
)

def get_numpy_from_buffer(buffer, format, width, height):
    """Map a GStreamer buffer for reading and convert it to numpy array(s).

    Args:
        buffer: Buffer object providing ``map`` / ``unmap``.
        format: Format name used to look up a converter in ``FORMAT_HANDLERS``.
        width: Frame width in pixels.
        height: Frame height in pixels.

    Returns:
        Whatever the selected handler returns (the handlers copy the data, so
        the result stays valid after the buffer is unmapped).

    Raises:
        ValueError: If the buffer cannot be mapped or the format has no handler.
    """
    mapped_ok, mapping = buffer.map(Gst.MapFlags.READ)
    if not mapped_ok:
        raise ValueError("Buffer mapping failed")

    try:
        convert = FORMAT_HANDLERS.get(format)
        if convert is not None:
            return convert(mapping, width, height)
        raise ValueError(f"Unsupported format: {format}")
    finally:
        # Always release the mapping, whether conversion succeeded or raised.
        buffer.unmap(mapping)

# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    """Set ``qos=False`` on every element of ``pipeline`` that has a ``qos`` property.

    Args:
        pipeline: The ``Gst.Pipeline`` whose elements should be updated. Any
            other object is rejected with a printed message and no changes.
    """
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return

    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:
            break

        # list_properties() yields GParamSpec objects, not strings, so the
        # membership test must compare against each spec's name.
        has_qos = any(spec.name == 'qos' for spec in GObject.list_properties(element))
        if has_qos:
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")

# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    """State handed to the pad-probe callback and to the display process.

    Holds a frame counter, the ``use_frame`` and ``running`` flags, and a
    bounded multiprocessing queue (at most 3 entries) used to pass frames on.
    """

    def __init__(self):
        self.running = True
        self.use_frame = False
        self.frame_count = 0
        self.frame_queue = multiprocessing.Queue(maxsize=3)

    def increment(self):
        """Advance the frame counter by one."""
        self.frame_count = self.frame_count + 1

    def get_count(self):
        """Return the current value of the frame counter."""
        return self.frame_count

    def set_frame(self, frame):
        """Enqueue ``frame``; the frame is dropped when the queue reports full."""
        if self.frame_queue.full():
            return
        self.frame_queue.put(frame)

    def get_frame(self):
        """Return the next queued frame, or ``None`` when the queue reports empty."""
        if self.frame_queue.empty():
            return None
        return self.frame_queue.get()

# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    """Read format, width and height from the pad's current caps.

    Returns:
        Tuple ``(format, width, height)`` taken from the first caps structure,
        or ``(None, None, None)`` when the pad has no caps or no structure.
    """
    missing = (None, None, None)

    caps = pad.get_current_caps()
    if not caps:
        return missing

    structure = caps.get_structure(0)
    if not structure:
        return missing

    return tuple(structure.get_value(field) for field in ('format', 'width', 'height'))

def display_user_data_frame(lss_data: lss_callback_class):
    """Show queued frames in the "User Frame" window while ``lss_data.running`` is true.

    ``cv2.waitKey(1)`` is called on every iteration, whether or not a frame
    was available; all OpenCV windows are destroyed once the loop ends.
    """
    while lss_data.running:
        latest = lss_data.get_frame()
        have_frame = latest is not None
        if have_frame:
            cv2.imshow("User Frame", latest)
        cv2.waitKey(1)
    cv2.destroyAllWindows()

def display_lss_data_frame(lss_data: lss_callback_class):
    """Show queued frames with custom overlays while ``lss_data.running`` is true.

    Each frame taken from ``lss_data`` gets a text label and a centered circle
    drawn on it before being shown in the "User Frame" window. All OpenCV
    windows are destroyed once the loop ends.

    Args:
        lss_data: Shared state object providing ``running`` and ``get_frame()``.
    """
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Add custom overlays
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)
        # Call waitKey on every iteration (as display_user_data_frame does) so
        # window events are still processed while no new frame is queued,
        # instead of re-polling the queue in a tight loop.
        cv2.waitKey(1)
    cv2.destroyAllWindows()

def get_default_parser():
    """Build the argument parser shared by the Hailo apps.

    Returns:
        An ``argparse.ArgumentParser`` with ``--input/-i`` (default
        ``/dev/video0``) and the boolean switches ``--use-frame/-u``,
        ``--show-fps/-f``, ``--disable-sync`` and ``--dump-dot``.
    """
    input_help = "Input source. Can be a file, USB or RPi camera (CSI camera module). \
                        For RPi camera use '-i rpi' (Still in Beta). \
                        Defaults to /dev/video0"
    # (option strings, help text) for every store_true switch, in display order.
    switches = (
        (("--use-frame", "-u"), "Use frame from the callback function"),
        (("--show-fps", "-f"), "Print FPS on sink"),
        (("--disable-sync",), "Disables display sink sync, will run as fast possible. Relevant when using file source."),
        (("--dump-dot",), "Dump the pipeline graph to a dot file pipeline.dot"),
    )

    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument("--input", "-i", type=str, default="/dev/video0", help=input_help)
    for option_strings, description in switches:
        parser.add_argument(*option_strings, action="store_true", help=description)
    return parser

def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    """Return a ``queue`` element description that ends with the `` ! `` link operator.

    Args:
        name: Value for the element's ``name`` property.
        max_size_buffers: Value for ``max-size-buffers``.
        max_size_bytes: Value for ``max-size-bytes``.
        max_size_time: Value for ``max-size-time``.
    """
    properties = (
        f"name={name}",
        f"max-size-buffers={max_size_buffers}",
        f"max-size-bytes={max_size_bytes}",
        f"max-size-time={max_size_time}",
    )
    return "queue " + " ".join(properties) + " ! "

def get_source_type(input_source):
    """Classify the input string as ``'usb'``, ``'rpi'`` or ``'file'``.

    Strings starting with ``/dev/video`` are ``'usb'``, strings starting with
    ``rpi`` are ``'rpi'``, and everything else is treated as ``'file'``.
    """
    prefix_table = (
        ("/dev/video", 'usb'),
        ("rpi", 'rpi'),
    )
    for prefix, kind in prefix_table:
        if input_source.startswith(prefix):
            return kind
    return 'file'

def USER_CALLBACK_PIPELINE(name='identity_callback'):
    """Return a pipeline fragment: a queue named ``<name>_q`` linked to an ``identity`` element.

    Args:
        name: Name given to the ``identity`` element; the queue in front of it
            is named ``f"{name}_q"``.

    Returns:
        The fragment as a string, without a trailing link operator.
    """
    # QUEUE() already terminates its fragment with " ! ", so the identity
    # element is appended directly; adding another "!" here would produce
    # "... !  ! identity ...", which is not a valid pipeline description.
    return f'{QUEUE(name=f"{name}_q")}identity name={name}'

# -----------------------------------------------------------------------------------------------
# GStreamerApp class
# -----------------------------------------------------------------------------------------------
class GStreamerApp:
    """Base class owning the GStreamer pipeline, its bus handling and the GLib main loop.

    Subclasses override ``get_pipeline_string`` (the base version returns an
    empty string) and call ``create_pipeline`` once their configuration is set.
    """

    def __init__(self, args, lss_data: lss_callback_class):
        """Store the options and derive the source type, sink and sync settings.

        Args:
            args: Parsed command-line options; this class reads ``input``,
                ``use_frame``, ``disable_sync``, ``dump_dot`` and ``show_fps``.
            lss_data: Shared state object passed to the pad probe and to the
                display process; its ``use_frame`` flag is set from ``args``.

        Exits the process with status 1 when ``TAPPAS_POST_PROC_DIR`` is unset.
        """
        # Set the process title
        setproctitle.setproctitle("Hailo Python App")

        # Keep the parsed command-line options
        self.options_menu = args

        # Initialize variables
        tappas_postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if tappas_postprocess_dir == '':
            print("TAPPAS_POST_PROC_DIR environment variable is not set. Please set it to by sourcing setup_env.sh")
            exit(1)
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = tappas_postprocess_dir
        self.video_source = self.options_menu.input
        self.source_type = get_source_type(self.video_source)
        self.lss_data = lss_data
        self.video_sink = "autovideosink"

        # Set Hailo parameters; these should be set based on the model used
        self.batch_size = 1
        self.network_width = 320
        self.network_height = 240
        self.network_format = "RGB"

        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        # NOTE(review): this placeholder is the state *class*, not a probe
        # function; subclasses are expected to replace it with a callable
        # before create_pipeline() attaches it as a pad probe.
        self.lss_callback = lss_callback_class

        # Set user data parameters
        lss_data.use_frame = self.options_menu.use_frame

        # Only file sources are synchronized to the clock, unless disabled.
        if self.options_menu.disable_sync or self.source_type != "file":
            self.sync = "false"
        else:
            self.sync = "true"

        if self.options_menu.dump_dot:
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def on_fps_measurement(self, sink, fps, droprate, avgfps):
        """Print the values delivered by the ``fps-measurements`` signal."""
        print(f"FPS: {fps:.2f}, Droprate: {droprate:.2f}, Avg FPS: {avgfps:.2f}")
        return True

    def create_pipeline(self):
        """Parse the pipeline string, attach the pad probe and create the main loop.

        Prints the error and the pipeline string, then exits with status 1,
        when parsing the pipeline or attaching the probe raises.
        """
        # Initialize GStreamer
        Gst.init(None)

        pipeline_string = self.get_pipeline_string()
        try:
            self.pipeline = Gst.parse_launch(pipeline_string)

            # Attach the callback to the src pad of the identity element, if present.
            identity = self.pipeline.get_by_name("identity_callback")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(Gst.PadProbeType.BUFFER, self.lss_callback, self.lss_data)

        except Exception as e:
            print(e)
            print(pipeline_string)
            exit(1)

        # connect to hailo_display fps-measurements
        if self.options_menu.show_fps:
            print("Showing FPS")
            fps_sink = self.pipeline.get_by_name("hailo_display")
            if fps_sink is not None:
                fps_sink.connect("fps-measurements", self.on_fps_measurement)
            else:
                # Without this guard a pipeline lacking the element would fail
                # with an AttributeError on None.
                print("Warning: hailo_display element not found, FPS measurements are unavailable.")

        # Create a GLib Main Loop
        self.loop = GLib.MainLoop()

    def bus_call(self, bus, message, loop):
        """Handle bus messages: quit ``loop`` on EOS or ERROR, report QOS messages."""
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err}, {debug}")
            loop.quit()
        elif t == Gst.MessageType.QOS:
            qos_element = message.src.get_name()
            print(f"QoS message received from {qos_element}")
        return True

    def get_pipeline_string(self):
        """Return the pipeline description; the base implementation is empty."""
        return ""

    def dump_dot_file(self):
        """Dump the pipeline graph to a dot file named "pipeline".

        Returns ``False`` so that, when scheduled with
        ``GLib.timeout_add_seconds``, it runs only once.
        """
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):
        """Start the pipeline, run the main loop until it quits, then clean up.

        When ``use_frame`` is set, a separate process running
        ``display_lss_data_frame`` is started and terminated on shutdown.
        """
        # Add a watch for messages on the pipeline's bus
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)

        # get xvimagesink element and disable qos
        # xvimagesink is instantiated by fpsdisplaysink
        hailo_display = self.pipeline.get_by_name("hailo_display")
        if hailo_display is None:
            print(
                "Warning: hailo_display element not found, add <fpsdisplaysink name=hailo_display> to your pipeline to support fps display.")
        else:
            xvimagesink = hailo_display.get_by_name("xvimagesink0")
            if xvimagesink is not None:
                xvimagesink.set_property("qos", False)

        # Disable QoS to prevent frame drops
        disable_qos(self.pipeline)

        # start a sub process to run the display function
        display_process = None
        if self.options_menu.use_frame:
            display_process = multiprocessing.Process(target=display_lss_data_frame, args=(self.lss_data,))
            display_process.start()

        # Set pipeline to PLAYING state
        self.pipeline.set_state(Gst.State.PLAYING)

        # dump dot file
        if self.options_menu.dump_dot:
            GLib.timeout_add_seconds(3, self.dump_dot_file)

        # Run the GLib event loop. Errors are reported rather than silently
        # swallowed, and cleanup runs regardless of how the loop ended.
        try:
            self.loop.run()
        except KeyboardInterrupt:
            print("Interrupted by user, shutting down.")
        except Exception as exc:
            print(f"Main loop stopped unexpectedly: {exc}")
        finally:
            self.lss_data.running = False
            self.pipeline.set_state(Gst.State.NULL)
            if display_process is not None:
                display_process.terminate()
                display_process.join()
# -----------------------------------------------------------------------------------------------
# User defined callback function
# -----------------------------------------------------------------------------------------------
def lss_callback(pad, info, lss_data):
    """Pad probe: count the frame, read detections and optionally queue an annotated copy.

    When ``lss_data.use_frame`` is set and the pad caps provide format, width
    and height, the buffer is converted to a numpy array, detections and
    overlays are drawn on it, and the result is queued via
    ``lss_data.set_frame``. The array is a copy of the buffer contents, so
    drawing on it changes only the frame shown by the display process, not the
    buffer that continues through the pipeline.

    Args:
        pad: The probed pad; its current caps supply format and dimensions.
        info: Probe info object providing ``get_buffer()``.
        lss_data: Shared state object (frame counter, ``use_frame``, frame queue).

    Returns:
        ``Gst.PadProbeReturn.OK`` in every case, so the buffer is passed on.
    """
    print(f"lss_data id: {id(lss_data)}")
    print(f"lss_data.use_frame: {lss_data.use_frame}")
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Count this frame; without this the reported frame count stays at 0.
    lss_data.increment()
    string_to_print = f"Frame count: {lss_data.get_count()}\n"

    format, width, height = get_caps_from_pad(pad)
    print(f"width: {width}")
    print(f"height: {height}")

    caps_ok = bool(format and width and height)
    if not caps_ok:
        print("Error: Format, width, or height not retrieved from pad caps.")

    frame = None
    if lss_data.use_frame and caps_ok:
        try:
            frame = get_numpy_from_buffer(buffer, format, width, height)
        except ValueError as exc:
            # Mapping failure or unsupported format: skip drawing for this
            # buffer instead of raising out of the probe.
            print(f"Failed to convert buffer to NumPy array: {exc}")
            frame = None

        # The drawing and RGB->BGR conversion below need one 3-channel image;
        # other handler results (e.g. the NV12 plane tuple) are not drawable.
        drawable = isinstance(frame, np.ndarray) and frame.ndim == 3 and frame.shape[2] == 3
        if frame is not None and not drawable:
            print(f"Frame format {format} is not a 3-channel image; skipping overlays.")
            frame = None
        if frame is not None:
            print(f"Frame shape: {frame.shape}")

    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    detection_count = 0

    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        detection_count += 1

        if frame is None:
            continue

        # Bounding-box coordinates are treated as normalized to [0, 1] and
        # scaled to pixels; casting them to int directly would collapse every
        # box to the top-left corner.
        x1 = int(bbox.xmin() * width)
        y1 = int(bbox.ymin() * height)
        x2 = int(bbox.xmax() * width)
        y2 = int(bbox.ymax() * height)
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(
            frame,
            f"{label}: {confidence:.2f}",
            (x1, y1 - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 255, 0),
            2
        )

    if detection_count > 0:
        string_to_print += f"Total detections: {detection_count}\n"

    if frame is not None:
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # Custom overlays are drawn before the frame is queued (and only when
        # a frame exists), so they are part of what the display process shows.
        cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.circle(frame, (width // 2, height // 2), 30, (0, 255, 255), 3)
        lss_data.set_frame(frame)

    print(string_to_print)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
class GStreamerDetectionApp(GStreamerApp):
    """Detection application: selects the network files and builds the Hailo pipeline."""

    def __init__(self, args, lss_data):
        """Configure the network from ``args.network`` and create the pipeline.

        Args:
            args: Parsed command-line options; ``network`` selects the HEF file.
            lss_data: Shared state object forwarded to the base class.

        Raises:
            ValueError: If ``args.network`` is not one of the known networks.
        """
        super().__init__(args, lss_data)
        self.batch_size = 2

        self.network_width = 320
        self.network_height = 240

        self.network_format = "RGB"
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
        # Extra hailonet properties. Must exist for every network because
        # get_pipeline_string() always interpolates it; only yolov8n sets it.
        self.thresholds_str = ""

        hef_by_network = {
            "yolov6n": './hailomodel/yolov6n.hef',
            "yolov8s": './hailomodel/yolov8s_h8l.hef',
            "yolov8n": './hailomodel/eeg_employee_2.hef',
            "yolox_s_leaky": './hailomodel/yolox_s_leaky_h8l_mz.hef',
        }
        if args.network not in hef_by_network:
            # Explicit raise: an assert would be stripped when running with -O.
            raise ValueError(f"Invalid network type: {args.network!r}")
        self.hef_path = os.path.join(self.current_path, hef_by_network[args.network])

        if args.network == "yolov8n":
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1
            self.thresholds_str = f"nms-score-threshold={nms_score_threshold} nms-iou-threshold={nms_iou_threshold} output-format-type=HAILO_FORMAT_TYPE_FLOAT32"

        self.lss_callback = lss_callback
        setproctitle.setproctitle("Hailo Detection App")
        self.create_pipeline()

    def get_pipeline_string(self):
        """Build, print and return the full pipeline description.

        The source part depends on ``self.source_type`` ("rpi", "usb" or a
        file). The stream is then scaled/converted to the network format, sent
        through ``hailonet``/``hailofilter`` on one tee branch and re-merged by
        ``hailomuxer``, passed through the ``identity_callback`` element, and
        finally overlaid and displayed by ``fpsdisplaysink``.
        """
        if self.source_type == "rpi":
            source_element = "libcamerasrc name=src_0 auto-focus-mode=2 ! "
            source_element += f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
            source_element += QUEUE("queue_src_scale")
            source_element += "videoscale ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "

        elif self.source_type == "usb":
            source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            # Request JPEG from the camera at the given resolution and framerate
            source_element += f"image/jpeg, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
            source_element += "jpegdec ! "  # Decode JPEG back to raw video
            source_element += f"video/x-raw, format={self.network_format} ! "  # Ensure correct format after decoding

        else:
            source_element = f"filesrc location={self.video_source} name=src_0 ! "
            source_element += QUEUE("queue_dec264")
            source_element += " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
            source_element += " video/x-raw,format=I420 ! "
        source_element += QUEUE("queue_scale")
        source_element += " videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += " videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "

        pipeline_string = "hailomuxer name=hmux "
        pipeline_string += source_element
        pipeline_string += "tee name=t ! "
        # Branch 1: bypass the network and feed the muxer directly.
        pipeline_string += QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 "
        # Branch 2: inference and post-processing.
        pipeline_string += "t. ! " + QUEUE("queue_hailonet")
        pipeline_string += "videoconvert n-threads=3 ! "

        pipeline_string += f"capsfilter caps=\"video/x-raw,format=RGB,width={self.network_width},height={self.network_height}\" ! "

        pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
        pipeline_string += QUEUE("queue_hailofilter")

        self.json_config_path = "./resource/eeg_employee_2.json"
        pipeline_string += f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} config-path={self.json_config_path} qos=false ! "

        pipeline_string += QUEUE("queue_hmuc") + " hmux.sink_1 "
        # Merged stream: user callback, overlay, display.
        pipeline_string += "hmux. ! " + QUEUE("queue_hailo_python")
        pipeline_string += QUEUE("queue_user_callback")
        pipeline_string += "identity name=identity_callback ! "
        pipeline_string += QUEUE("queue_hailooverlay")
        pipeline_string += "hailooverlay ! "
        pipeline_string += QUEUE("queue_videoconvert")
        pipeline_string += "videoconvert n-threads=3 qos=false ! "
        pipeline_string += QUEUE("queue_hailo_display")
        pipeline_string += f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        print(pipeline_string)
        return pipeline_string

if __name__ == "__main__":
    # Script entry point: parse options, build the detection app, run it and report timing.
    parser = get_default_parser()
    # Add additional arguments here.
    # 'yolov8n' must be listed in choices: it is the default and is handled by
    # GStreamerDetectionApp, but was rejected when passed explicitly.
    parser.add_argument("--network", default="yolov8n",
                        choices=['yolov6n', 'yolov8s', 'yolov8n', 'yolox_s_leaky'],
                        help="Which Network to use, default is yolov8n")
    args = parser.parse_args()
    lss_data = lss_callback_class()
    app = GStreamerDetectionApp(args, lss_data)
    print("Starting GStreamerDetectionApp")
    begin = time.time()
    app.run()
    print("Application run completed")
    end = time.time()
    elapsed = end - begin
    print(f"Total time: {elapsed:.2f} s")
    # NOTE(review): 733 presumably is the frame count of a reference clip, so
    # this ratio is a frames-per-second figure rather than a time; confirm the
    # constant before relying on it for other inputs.
    if elapsed > 0:
        print("Average FPS (assuming 733 frames): ", 733 / elapsed)