Hi, omria
I tried it as you suggested, but I got an error like this:
(venv_hailo_rpi5_examples) root@iot:/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main# python hailo_app_combined_5.py --input /dev/video0 -f
hailomuxer name=hmux v4l2src device=/dev/video0 name=src_0 ! video/x-raw, format=RGB, width=640, height=640, framerate=30/1 ! queue name=queue_scale max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoscale n-threads=2 ! queue name=queue_src_convert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 name=src_convert qos=false ! video/x-raw, format=RGB, width=640, height=640, pixel-aspect-ratio=1/1 ! tee name=t ! queue name=bypass_queue max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! hmux.sink_0 t. ! queue name=queue_hailonet max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 ! hailonet hef-path=/home/test_camera/Benchmarking-YOLOv8-on-Raspberry-PI-reComputer-r1000-and-AIkit-Hailo-8L-main/./hailomodel/eeg_employee_2.hef batch-size=2 nms-score-threshold=0.1 nms-iou-threshold=0.1 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! queue name=queue_hailofilter max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailofilter function-name=“filter” so-path=/usr/lib/aarch64-linux-gnu/hailo/tappas/post_processes/libyolo_hailortpp_post.so config-path=./resource/eeg_employee_2.json qos=false ! queue name=queue_hmuc max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hmux.sink_1 hmux. ! queue name=queue_hailo_python max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! queue name=queue_user_callback max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! identity name=identity_callback ! queue name=queue_hailooverlay max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! hailooverlay ! queue name=queue_videoconvert max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! videoconvert n-threads=3 qos=false ! queue name=queue_hailo_display max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! fpsdisplaysink video-sink=autovideosink name=hailo_display sync=false text-overlay=True signal-fps-measurements=true
Showing FPS
Starting GStreamerDetectionApp
Error: gst-stream-error-quark: Internal data stream error. (1), ../libs/gst/base/gstbasesrc.c(3132): gst_base_src_loop (): /GstPipeline:pipeline0/GstV4l2Src:src_0:
streaming stopped, reason not-negotiated (-4)
Application run completed
Total time: 265.24232344324963
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import json
import supervision as sv

# Import the hailo python module. It is only installed inside the Hailo
# virtual environment, so fail with an actionable message instead of a raw
# ImportError traceback. (A bare `import hailo` before this guard would
# crash first and make the try/except dead code.)
try:
    import hailo
except ImportError:
    exit("Failed to import hailo python module. Make sure you are in hailo virtual environment.")
# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    """Copy a mapped RGB buffer into an independent (height, width, 3) uint8 array."""
    view = np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data)
    # .copy() detaches the array from the GStreamer mapping, which is
    # unmapped by the caller.
    return view.copy()
def handle_nv12(map_info, width, height):
    """Split a mapped NV12 buffer into independent copies of its two planes.

    Returns (y_plane, uv_plane): a (height, width) luma array and a
    (height // 2, width // 2, 2) interleaved chroma array, both uint8.
    """
    y_size = width * height
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8,
                         buffer=map_info.data[:y_size]).copy()
    uv_plane = np.ndarray(shape=(height // 2, width // 2, 2), dtype=np.uint8,
                          buffer=map_info.data[y_size:]).copy()
    return y_plane, uv_plane
def handle_yuyv(map_info, width, height):
    """Copy a mapped YUYV buffer into an independent (height, width, 2) uint8 array."""
    packed = np.ndarray(shape=(height, width, 2), dtype=np.uint8, buffer=map_info.data)
    return packed.copy()
# Dispatch table: raw-caps format string -> handler that copies the mapped
# GStreamer buffer into numpy array(s). Used by get_numpy_from_buffer().
FORMAT_HANDLERS = {
    'RGB': handle_rgb,
    'NV12': handle_nv12,
    'YUYV': handle_yuyv,
}
def get_numpy_from_buffer(buffer, format, width, height):
    """Map a Gst.Buffer read-only and convert it to numpy via FORMAT_HANDLERS.

    Raises ValueError when the buffer cannot be mapped or when `format` has
    no registered handler. The buffer is always unmapped before returning.
    """
    mapped, map_info = buffer.map(Gst.MapFlags.READ)
    if not mapped:
        raise ValueError("Buffer mapping failed")
    try:
        convert = FORMAT_HANDLERS.get(format)
        if convert is None:
            raise ValueError(f"Unsupported format: {format}")
        return convert(map_info, width, height)
    finally:
        # Unmap even on error so the buffer is never left mapped.
        buffer.unmap(map_info)
# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    """Walk every element of `pipeline` and set its 'qos' property to False.

    Prints a message and returns without touching anything when `pipeline`
    is not a Gst.Pipeline.
    """
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return
    it = pipeline.iterate_elements()
    # Manual GstIterator protocol: next() yields (result, element) until the
    # result is no longer OK.
    while True:
        result, element = it.next()
        if result != Gst.IteratorResult.OK:
            break
        # NOTE(review): this membership test compares the string 'qos' against
        # the GParamSpec list from list_properties(); it appears to rely on
        # pygobject matching pspecs by name — confirm on the target pygobject
        # version.
        if 'qos' in GObject.list_properties(element):
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")
# -----------------------------------------------------------------------------------------------
# User defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class lss_callback_class:
    """Mutable state shared with the GStreamer callback.

    Holds a frame counter, a `use_frame` flag, a bounded frame queue
    (multiprocessing.Queue, so frames can be handed to a display process),
    and a `running` flag that display loops poll to know when to stop.
    """

    def __init__(self):
        self.frame_count = 0          # number of buffers seen so far
        self.use_frame = False        # whether the callback should extract frames
        self.frame_queue = multiprocessing.Queue(maxsize=3)
        self.running = True           # cleared to stop display loops

    def increment(self):
        """Bump the processed-frame counter by one."""
        self.frame_count += 1

    def get_count(self):
        """Return the number of frames counted so far."""
        return self.frame_count

    def set_frame(self, frame):
        """Enqueue `frame` for display, dropping it if the queue is full.

        full() on a multiprocessing.Queue is only a racy snapshot, so
        check-then-put can still block; use a non-blocking put instead.
        """
        from queue import Full
        try:
            self.frame_queue.put_nowait(frame)
        except Full:
            pass  # drop the frame rather than stall the pipeline

    def get_frame(self):
        """Dequeue the next frame, or return None when none is available.

        empty() is likewise only a snapshot; a non-blocking get avoids the
        race where the queue drains between the check and the get().
        """
        from queue import Empty
        try:
            return self.frame_queue.get_nowait()
        except Empty:
            return None
# -----------------------------------------------------------------------------------------------
# Common functions
# -----------------------------------------------------------------------------------------------
def get_caps_from_pad(pad: Gst.Pad):
    """Return (format, width, height) from the pad's current caps.

    Falls back to (None, None, None) when the pad has no negotiated caps or
    the caps carry no structure.
    """
    caps = pad.get_current_caps()
    if not caps:
        return None, None, None
    structure = caps.get_structure(0)
    if not structure:
        return None, None, None
    return (
        structure.get_value('format'),
        structure.get_value('width'),
        structure.get_value('height'),
    )
def display_user_data_frame(lss_data: lss_callback_class):
    """Poll frames from `lss_data` and show them in an OpenCV window.

    Loops until `lss_data.running` goes False, then closes all windows.
    """
    while lss_data.running:
        latest = lss_data.get_frame()
        if latest is None:
            continue
        cv2.imshow("User Frame", latest)
        cv2.waitKey(1)
    cv2.destroyAllWindows()
def display_lss_data_frame(lss_data: lss_callback_class):
    """Poll frames from `lss_data`, draw demo overlays, and show them.

    Loops until `lss_data.running` goes False, then closes the OpenCV
    windows. (Removed the commented-out duplicate of display_user_data_frame
    that previously shadowed this body.)
    """
    while lss_data.running:
        frame = lss_data.get_frame()
        if frame is not None:
            # Demo overlays: a fixed label plus a circle at the frame centre.
            cv2.putText(frame, "Custom Overlay Text", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.circle(frame, (frame.shape[1] // 2, frame.shape[0] // 2), 50, (0, 255, 0), 3)
            cv2.imshow("User Frame", frame)
            cv2.waitKey(1)
    cv2.destroyAllWindows()
def get_default_parser():
    """Build the command-line parser shared by the Hailo example apps."""
    parser = argparse.ArgumentParser(description="Hailo App Help")
    parser.add_argument(
        "--input", "-i", type=str, default="/dev/video0",
        help="Input source. Can be a file, USB or RPi camera (CSI camera module). "
             "For RPi camera use '-i rpi' (Still in Beta). "
             "Defaults to /dev/video0",
    )
    parser.add_argument(
        "--use-frame", "-u", action="store_true",
        help="Use frame from the callback function",
    )
    parser.add_argument(
        "--show-fps", "-f", action="store_true",
        help="Print FPS on sink",
    )
    parser.add_argument(
        "--disable-sync", action="store_true",
        help="Disables display sink sync, will run as fast possible. Relevant when using file source.",
    )
    parser.add_argument(
        "--dump-dot", action="store_true",
        help="Dump the pipeline graph to a dot file pipeline.dot",
    )
    return parser
def QUEUE(name, max_size_buffers=3, max_size_bytes=0, max_size_time=0):
    """Return a gst-launch `queue` element fragment ending in ' ! '."""
    props = (
        f"name={name} "
        f"max-size-buffers={max_size_buffers} "
        f"max-size-bytes={max_size_bytes} "
        f"max-size-time={max_size_time}"
    )
    return f"queue {props} ! "