Hello,
This has been getting more difficult the longer I have tried to resolve this error.
-
This is my custom code. It is almost identical to the example code, but I connected my custom HEF. I can then see inference through live streaming from a USB camera.
There is no problem. -
I want to edit its main window. Below is its code.
# Standard library imports
import argparse
import json
import multiprocessing
import os
import sys
import time

# Third-party imports
import cv2
import numpy as np
import setproctitle

import gi
gi.require_version('Gst', '1.0')  # must run before importing Gst
from gi.repository import Gst, GObject, GLib

# Try to import hailo python module.
# BUG FIX: the original ALSO imported hailo unconditionally before this guard,
# so a missing module raised ImportError before the friendly message below
# could ever be printed. The import now lives only inside the guard.
try:
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")
# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    """Copy a packed RGB buffer into an (H, W, 3) uint8 array."""
    pixels = np.frombuffer(map_info.data, dtype=np.uint8, count=width * height * 3)
    return pixels.reshape(height, width, 3).copy()
def handle_nv12(map_info, width, height):
    """Split an NV12 buffer into its planes and return copies of both.

    Returns:
        (y_plane, uv_plane): uint8 arrays shaped (H, W) and (H/2, W/2, 2) —
        the UV plane is interleaved at half resolution per NV12 layout.
    """
    luma_bytes = width * height
    y_plane = np.frombuffer(
        map_info.data[:luma_bytes], dtype=np.uint8, count=luma_bytes
    ).reshape(height, width).copy()
    uv_plane = np.frombuffer(
        map_info.data[luma_bytes:], dtype=np.uint8, count=luma_bytes // 2
    ).reshape(height // 2, width // 2, 2).copy()
    return y_plane, uv_plane
def handle_yuyv(map_info, width, height):
    """Copy a YUYV (YUY2) buffer into an (H, W, 2) uint8 array."""
    packed = np.frombuffer(map_info.data, dtype=np.uint8, count=width * height * 2)
    return packed.reshape(height, width, 2).copy()
# Dispatch table: caps format string -> buffer-to-ndarray handler.
FORMAT_HANDLERS = dict(
    RGB=handle_rgb,
    NV12=handle_nv12,
    YUYV=handle_yuyv,
)
def get_numpy_from_buffer(buffer, format, width, height):
    """Map a Gst.Buffer read-only and convert it with the handler for *format*.

    Raises:
        ValueError: when the buffer cannot be mapped or *format* has no handler.
    """
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        raise ValueError("Buffer mapping failed")
    try:
        try:
            handler = FORMAT_HANDLERS[format]
        except KeyError:
            raise ValueError(f"Unsupported format: {format}") from None
        return handler(map_info, width, height)
    finally:
        # Always release the mapping, even when conversion raises.
        buffer.unmap(map_info)
# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    """Disable the 'qos' property on every element of *pipeline* that has one.

    Turning QoS off stops elements from dropping buffers when the pipeline
    falls behind real time.

    Args:
        pipeline: a Gst.Pipeline; anything else is reported and ignored.
    """
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return
    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:
            break
        # BUG FIX: the original tested "'qos' in GObject.list_properties(element)",
        # which compares the string against a list of GParamSpec objects and
        # never matches, so qos was never disabled. find_property() looks the
        # property up by name.
        if element.find_property('qos') is not None:
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")
# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    """Shared state handed to the GStreamer pad-probe callback.

    Holds a frame counter, a flag controlling whether frames are extracted,
    and a small inter-process queue used to ship frames to the display
    process.
    """

    def __init__(self):
        self.frame_count = 0    # number of processed frames
        self.use_frame = False  # extract frames inside the callback?
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True     # display loop keeps running while True

    def increment(self):
        """Advance the frame counter by one."""
        self.frame_count = self.frame_count + 1

    def get_count(self):
        """Return the current frame count."""
        return self.frame_count

    def set_frame(self, frame):
        """Queue *frame* for display; silently drop it when the queue is full."""
        if self.frame_queue.full():
            return
        self.frame_queue.put(frame)

    def get_frame(self):
        """Pop and return the oldest queued frame, or None when empty."""
        return self.frame_queue.get() if not self.frame_queue.empty() else None
# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    """Return (format, width, height) from the pad's current caps.

    Returns (None, None, None) when the pad has no negotiated caps or the
    caps carry no structure.
    """
    caps = pad.get_current_caps()
    if not caps:
        return None, None, None
    structure = caps.get_structure(0)
    if not structure:
        return None, None, None
    return (
        structure.get_value('format'),
        structure.get_value('width'),
        structure.get_value('height'),
    )
def display_user_data_frame(lss_data: lss_callback_class):
    """Show frames queued by the callback in an OpenCV window until stopped."""
    while lss_data.running:
        next_frame = lss_data.get_frame()
        if next_frame is None:
            continue
        cv2.imshow("User Frame", next_frame)
        cv2.waitKey(1)
    cv2.destroyAllWindows()
def display_lss_data_frame(lss_data: lss_callback_class):
    """Like display_user_data_frame, but stamps extra overlays on each frame."""
    while lss_data.running:
        shown = lss_data.get_frame()
        if shown is None:
            continue
        # Add custom overlays
        cv2.putText(shown, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        center = (shown.shape[1] // 2, shown.shape[0] // 2)
        cv2.circle(shown, center, 50, (0, 255, 0), 3)
        cv2.imshow("User Frame", shown)
        cv2.waitKey(1)
    cv2.destroyAllWindows()
def get_default_parser():
    """Build the argument parser shared by the Hailo example apps."""
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument(
        "--input", "-i", type=str, default="/dev/video0",
        help="Input source. Can be a file, USB or RPi camera (CSI camera module). \
For RPi camera use '-i rpi' (Still in Beta). \
Defaults to /dev/video0",
    )
    parser.add_argument(
        "--use-frame", "-u", action="store_true",
        help="Use frame from the callback function",
    )
    parser.add_argument(
        "--show-fps", "-f", action="store_true",
        help="Print FPS on sink",
    )
    parser.add_argument(
        "--disable-sync", action="store_true",
        help="Disables display sink sync, will run as fast possible. Relevant when using file source.",
    )
    parser.add_argument(
        "--dump-dot", action="store_true",
        help="Dump the pipeline graph to a dot file pipeline.dot",
    )
    return parser
def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    """Return a gst-launch queue element fragment (note the trailing ' ! ')."""
    props = (
        f"name={name} "
        f"max-size-buffers={max_size_buffers} "
        f"max-size-bytes={max_size_bytes} "
        f"max-size-time={max_size_time}"
    )
    return f"queue {props} ! "
def get_source_type(input_source):
    """Classify *input_source* as 'usb' (/dev/video*), 'rpi', or 'file'."""
    if input_source.startswith("/dev/video"):
        return 'usb'
    if input_source.startswith("rpi"):
        return 'rpi'
    return 'file'
def USER_CALLBACK_PIPELINE(name='identity_callback'):
    """Return a queue + identity fragment used to host the user buffer probe.

    BUG FIX: QUEUE() already ends with " ! ", but the original appended a
    second " ! " before "identity", producing "… ! ! identity …" — an empty
    element between two separators that gst_parse_launch rejects. The identity
    element is now appended directly after the queue fragment.
    """
    return f'{QUEUE(name=f"{name}_q")}identity name={name}'
# -----------------------------------------------------------------------------------------------
# GStreamerApp class
# -----------------------------------------------------------------------------------------------
class GStreamerApp:
    """Base wrapper around a GStreamer pipeline with a Hailo inference branch.

    Subclasses supply the actual pipeline via get_pipeline_string(); this
    class owns pipeline creation, the buffer-probe hookup, the GLib main
    loop, bus handling, and run/teardown of the display subprocess.
    """
    def __init__(self, args, lss_data: lss_callback_class):
        # def __init__(self, args, process_frame_callback: lss_callback_class):
        # Set the process title
        setproctitle.setproctitle("Hailo Python App")
        # Create an empty options menu
        self.options_menu = args
        # Initialize variables
        tappas_postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if tappas_postprocess_dir == '':
            print("TAPPAS_POST_PROC_DIR environment variable is not set. Please set it to by sourcing setup_env.sh")
            exit(1)
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = tappas_postprocess_dir
        self.video_source = self.options_menu.input
        self.source_type = get_source_type(self.video_source)
        # self.user_data = user_data
        self.lss_data = lss_data
        self.video_sink = "xvimagesink"
        # self.video_sink = "autovideosink"
        # self.video_sink = "fakesink"
        # Set Hailo parameters; these should be set based on the model used
        self.batch_size = 1
        # self.network_width = 1280
        # self.network_height = 720
        self.network_width = 640
        self.network_height = 320
        self.network_format = "RGB"
        # self.network_format = "YUYV"
        self.default_postprocess_so = None
        self.hef_path = None
        self.app_callback = None
        # NOTE(review): this stores the CLASS lss_callback_class as the probe
        # callback. GStreamerDetectionApp overwrites it with the lss_callback
        # function before create_pipeline() runs, but using this base class
        # directly would install a non-callable-probe — confirm intentional.
        self.lss_callback = lss_callback_class
        # Set user data parameters
        lss_data.use_frame = self.options_menu.use_frame
        # Sync the sink to the clock only for file playback (unless disabled);
        # live sources must run unsynced or frames back up.
        if (self.options_menu.disable_sync or self.source_type != "file"):
            self.sync = "false"
        else:
            self.sync = "true"
        if (self.options_menu.dump_dot):
            os.environ["GST_DEBUG_DUMP_DOT_DIR"] = self.current_path

    def on_fps_measurement(self, sink, fps, droprate, avgfps):
        """Handler for fpsdisplaysink's 'fps-measurements' signal: print stats."""
        print(f"FPS: {fps:.2f}, Droprate: {droprate:.2f}, Avg FPS: {avgfps:.2f}")
        return True

    def create_pipeline(self):
        """Parse the pipeline string, attach the buffer probe, build the loop."""
        # Initialize GStreamer
        Gst.init(None)
        pipeline_string = self.get_pipeline_string()
        try:
            self.pipeline = Gst.parse_launch(pipeline_string)
            # Attach the user callback as a buffer probe on the identity
            # element's src pad, passing lss_data as the probe's user data.
            identity = self.pipeline.get_by_name("identity_callback")
            if identity:
                identity_pad = identity.get_static_pad("src")
                identity_pad.add_probe(Gst.PadProbeType.BUFFER, self.lss_callback, self.lss_data)
        except Exception as e:
            print(e)
            print(pipeline_string)
            exit(1)
        # connect to hailo_display fps-measurements
        if (self.options_menu.show_fps):
            print("Showing FPS")
            self.pipeline.get_by_name("hailo_display").connect("fps-measurements", self.on_fps_measurement)
        # Create a GLib Main Loop
        self.loop = GLib.MainLoop()

    # Bus watch handler: GStreamer posts messages here when events occur.
    def bus_call(self, bus, message, loop):
        """Handle bus messages: quit on EOS/ERROR, log QoS notifications."""
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err}, {debug}")
            loop.quit()
        # QOS
        elif t == Gst.MessageType.QOS:
            # Handle QoS message here
            qos_element = message.src.get_name()
            print(f"QoS message received from {qos_element}")
        return True

    def get_pipeline_string(self):
        """Return the gst-launch description; subclasses must override this."""
        return ""

    def dump_dot_file(self):
        """Dump the pipeline graph; returns False so the GLib timeout fires once."""
        print("Dumping dot file...")
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
        return False

    def run(self):
        """Start the pipeline, run the main loop, then tear everything down."""
        # Add a watch for messages on the pipeline's bus
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)
        # get xvimagesink element and disable qos
        # xvimagesink is instantiated by fpsdisplaysink
        hailo_display = self.pipeline.get_by_name("hailo_display")
        if hailo_display is None:
            print(
                "Warning: hailo_display element not found, add <fpsdisplaysink name=hailo_display> to your pipeline to support fps display.")
        else:
            xvimagesink = hailo_display.get_by_name("xvimagesink0")
            if xvimagesink is not None:
                xvimagesink.set_property("qos", False)
        # Disable QoS to prevent frame drops
        disable_qos(self.pipeline)
        # start a sub process to run the display_user_data_frame function
        if (self.options_menu.use_frame):
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.user_data,))
            # display_process = multiprocessing.Process(target=display_user_data_frame, args=(self.lss_data,))
            display_process = multiprocessing.Process(target=display_lss_data_frame, args=(self.lss_data,))
            display_process.start()
        # Set pipeline to PLAYING state
        self.pipeline.set_state(Gst.State.PLAYING)
        # dump dot file
        if (self.options_menu.dump_dot):
            GLib.timeout_add_seconds(3, self.dump_dot_file)
        # Run the GLib event loop and wait for events.
        try:
            self.loop.run()
        except:
            # NOTE(review): bare except swallows KeyboardInterrupt so the
            # teardown below always runs — consider narrowing it.
            pass
        # Clean up
        self.lss_data.running = False
        self.pipeline.set_state(Gst.State.NULL)
        if (self.options_menu.use_frame):
            display_process.terminate()
            display_process.join()
# -----------------------------------------------------------------------------------------------
# User defined callback function
# -----------------------------------------------------------------------------------------------
def lss_callback(pad, info, lss_data):
    """Pad-probe callback: counts frames, draws detections, queues frames.

    Args:
        pad: src pad of the identity element the probe is attached to.
        info: Gst.PadProbeInfo carrying the buffer.
        lss_data: shared lss_callback_class state (counter, flags, queue).

    Returns:
        Gst.PadProbeReturn.OK normally; DROP when caps could not be read.
    """
    print(f"lss_data id: {id(lss_data)}")
    print(f"lss_data.use_frame: {lss_data.use_frame}")
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    print(f"width: {width}")
    print(f"height: {height}")
    if not format or not width or not height:
        print("Error: Format, width, or height not retrieved from pad caps.", file=sys.stderr)  # Print to stderr
        return Gst.PadProbeReturn.DROP  # Drop the buffer
    # BUG FIX: lss_data.increment() was never called anywhere, so the
    # reported frame count stayed at 0 forever.
    lss_data.increment()
    string_to_print = f"Frame count: {lss_data.get_count()}\n"
    frame = None  # Initialize frame to None
    if lss_data.use_frame:
        # Caps are guaranteed valid here (guard above returned DROP otherwise).
        frame = get_numpy_from_buffer(buffer, format, width, height)
        if frame is not None:
            print(f"Frame shape: {frame.shape}")
        else:
            print("Failed to convert buffer to NumPy array.")
    else:
        print("Frame not retrieved due to missing format or dimensions.")
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    detection_count = 0
    for detection in detections:
        # BUG FIX: detection_count was initialized but never incremented,
        # so the total was always reported as 0.
        detection_count += 1
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        if frame is not None:
            # NOTE(review): Hailo bboxes are often normalized to [0, 1]; if
            # every rectangle collapses near the origin, scale by width/height
            # here — confirm against your postprocess .so output.
            cv2.rectangle(
                frame,
                (int(bbox.xmin()), int(bbox.ymin())),
                (int(bbox.xmax()), int(bbox.ymax())),
                (0, 255, 0),
                2
            )
            cv2.putText(
                frame,
                f"{label}: {confidence:.2f}",
                (int(bbox.xmin()), int(bbox.ymin()) - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2
            )
    if detection_count > 0:
        string_to_print += f"Total detections: {detection_count}\n"
    if lss_data.use_frame and frame is not None:
        # BUG FIX: the original queued the frame with set_frame() BEFORE
        # drawing "Custom Text", the circle, and (a duplicate of) the
        # detections counter, so the display process could receive the frame
        # without those overlays. All drawing now happens first, the frame is
        # queued last, and the duplicated putText was removed.
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.circle(frame, (width // 2, height // 2), 30, (0, 255, 255), 3)
        lss_data.set_frame(frame)
    print(string_to_print)
    return Gst.PadProbeReturn.OK
# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
class GStreamerDetectionApp(GStreamerApp):
    """Detection app: source -> tee -> (bypass | hailonet+filter) -> hmux -> overlay -> display."""

    def __init__(self, args, lss_data):
        super().__init__(args, lss_data)
        # Set batch size (default to 1 for single-frame processing)
        self.batch_size = 1
        # Network input dimensions fed to hailonet
        self.network_width = 640
        self.network_height = 360
        # Set network format (default to RGB for Hailo compatibility)
        self.network_format = "RGB"
        # Set default post-processing shared object path
        self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
        # NMS thresholds string (filled in per-network below)
        self.thresholds_str = ""
        # Set HEF path based on the network type
        if args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolov8s_h8l.hef')
        elif args.network == "yolov8n":
            # NMS thresholds for the custom YOLOv8n-based HEF
            nms_score_threshold = 0.1
            nms_iou_threshold = 0.1
            self.thresholds_str = f"nms-score-threshold={nms_score_threshold} nms-iou-threshold={nms_iou_threshold} output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
            self.hef_path = os.path.join(self.current_path, './hailomodel/eeg_employee_2.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, './hailomodel/yolox_s_leaky_h8l_mz.hef')
        else:
            # BUG FIX: "assert False" is stripped under python -O; raise instead.
            raise ValueError(f"Invalid network type: {args.network}")
        # Install the pad-probe callback used by create_pipeline()
        self.lss_callback = lss_callback
        # Set process title for easier identification in system monitoring
        setproctitle.setproctitle("Hailo Detection App")
        # Create the pipeline
        self.create_pipeline()

    def get_pipeline_string(self):
        """Assemble and return the gst-launch pipeline description."""
        # Define source element based on the source type
        if self.source_type == "rpi":
            # Use libcamerasrc for Raspberry Pi camera
            source_element = f"libcamerasrc name=src_0 auto-focus-mode=2 ! "
            source_element += f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
            source_element += QUEUE("queue_src_scale")
            source_element += f"videoscale ! "
            source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
        elif self.source_type == "usb":
            # Use v4l2src for USB camera.
            # BUG FIX: the original USB branch converted to I420 and then ran
            # jpegenc with NO decoder afterwards (the multifilesink it served
            # was commented out), so downstream elements were offered
            # image/jpeg and caps negotiation failed. Raw YUY2 now feeds the
            # shared scale/convert chain below instead.
            source_element = f"v4l2src device={self.video_source} name=src_0 ! "
            source_element += f"capsfilter caps=\"video/x-raw,format=YUY2\" ! "
        else:
            # Use filesrc for video file input
            source_element = f"filesrc location={self.video_source} name=src_0 ! "
            source_element += QUEUE("queue_dec264")
            source_element += f" qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
            source_element += f" video/x-raw,format=I420 ! "
        # Common scaling and conversion for ALL source types, so every source
        # reaches the tee as {network_format} at network resolution.
        # (BUG FIX: the original skipped this chain for USB input, leaving the
        # stream at camera resolution and in the wrong format.)
        source_element += QUEUE("queue_scale")
        source_element += f" videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += f" videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "
        # Construct the full pipeline string
        pipeline_string = "hailomuxer name=hmux "
        pipeline_string += source_element
        pipeline_string += "tee name=t ! "
        # Bypass branch keeps the original frames for the muxer
        pipeline_string += QUEUE("bypass_queue", max_size_buffers=20) + "hmux.sink_0 "
        pipeline_string += "t. ! " + QUEUE("queue_hailonet")
        # Convert to RGB format before hailonet (required for Hailo processing)
        pipeline_string += f"videoconvert n-threads=3 ! video/x-raw, format=RGB, width={self.network_width}, height={self.network_height} ! "
        # Add hailonet element with specified HEF path and batch size
        pipeline_string += f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
        # Add post-processing with hailofilter
        pipeline_string += QUEUE("queue_hailofilter")
        self.json_config_path = "./resource/eeg_employee_2.json"
        pipeline_string += f"hailofilter function-name=\"filter\" so-path={self.default_postprocess_so} config-path={self.json_config_path} qos=false ! "
        # Merge processed and bypass streams
        pipeline_string += QUEUE("queue_hmuc") + " hmux.sink_1 "
        pipeline_string += "hmux. ! " + QUEUE("queue_hailo_python")
        # Add user callback (identity element for debugging or custom processing)
        pipeline_string += QUEUE("queue_user_callback")
        pipeline_string += f"identity name=identity_callback ! "
        # Add hailooverlay for drawing inference results
        pipeline_string += QUEUE("queue_hailooverlay")
        pipeline_string += f"hailooverlay ! "
        # Convert video format for display
        pipeline_string += QUEUE("queue_videoconvert")
        pipeline_string += f"videoconvert n-threads=3 qos=false ! "
        # Add display sink with FPS overlay
        pipeline_string += QUEUE("queue_hailo_display")
        pipeline_string += f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        # Print the pipeline string for debugging
        print(pipeline_string)
        return pipeline_string
if __name__ == "__main__":
    parser = get_default_parser()
    # Add additional arguments here.
    # BUG FIX: 'yolov8n' is the default but was missing from choices, so
    # explicitly passing "--network yolov8n" was rejected by argparse.
    # (argparse does not validate the default against choices, which hid the
    # bug until the option was spelled out on the command line.)
    # Also fixed the help text: the default is yolov8n, not yolov6n.
    parser.add_argument("--network", default="yolov8n",
                        choices=['yolov6n', 'yolov8s', 'yolov8n', 'yolox_s_leaky'],
                        help="Which Network to use, default is yolov8n")
    args = parser.parse_args()
    lss_data = lss_callback_class()
    try:
        app = GStreamerDetectionApp(args, lss_data)  # Initialize within try block
        print("Starting GStreamerDetectionApp")
        begin = time.time()
        app.run()
        print("Application run completed")
        end = time.time()
        # NOTE(review): 733 looks like a hard-coded frame count, which makes
        # this an average-FPS figure rather than a time — confirm and rename.
        print("Total time: ", 733 / (end - begin))
    except Exception as e:  # Catch exceptions during initialization or running
        print(f"Error: {e}", file=sys.stderr)  # Print error to stderr
        sys.exit(1)  # Exit with error code
I only guessed that the error was related to the condition settings, and I tried many things, but all of them failed. Please help me.