# region imports
# Standard library imports
# Third-party imports
import gi
# Pin the GStreamer introspection version; this has to run before
# `from gi.repository import Gst` further down.
gi.require_version("Gst", "1.0")
import cv2
# Local application-specific imports
import hailo
from gi.repository import Gst
from hailo_apps.python.pipeline_apps.detection.detection_pipeline import GStreamerDetectionApp
from hailo_apps.python.core.common.buffer_utils import (
get_caps_from_pad,
get_numpy_from_buffer,
)
from hailo_apps.python.core.common.hailo_logger import get_logger
from hailo_apps.python.core.gstreamer.gstreamer_app import app_callback_class
# Module-level logger, named after this module.
hailo_logger = get_logger(__name__)
# endregion imports
# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class user_app_callback_class(app_callback_class):
    """Callback state object passed to the pipeline callback as ``user_data``.

    Adds no state of its own; it only runs the base-class initializer.
    """

    def __init__(self):
        super().__init__()
# Define your custom labels here.
# Maps the default COCO label strings to the custom waste categories.
# The integer keys are a fallback used when a detection's label string is
# not one of the string keys but its class ID is known.
MY_LABELS = {
    "person": "recyclable waste",  # Class 0
    "bicycle": "hazardous waste",  # Class 1
    "car": "kitchen waste",  # Class 2
    "motorcycle": "other waste",  # Class 3
    # Fallback IDs
    0: "recyclable waste",
    1: "hazardous waste",
    2: "kitchen waste",
    3: "other waste",
}
# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
def app_callback(element, buffer, user_data):
    """Handle one pipeline buffer: relabel, report and optionally draw detections.

    Args:
        element: Pipeline element the callback is attached to; its ``"src"``
            pad is queried for the frame caps.
        buffer: Buffer carrying the frame and the Hailo metadata. ``None`` is
            logged as a warning and otherwise ignored.
        user_data: Callback state object. This function reads ``use_frame``
            and calls ``get_count()`` and ``set_frame()`` on it.

    Returns:
        None.
    """
    if buffer is None:
        hailo_logger.warning("Received None buffer.")
        return

    # Detections scoring below this are neither reported nor drawn.
    min_confidence = 0.5

    # Note: Frame counting is handled automatically by the framework wrapper
    report_lines = [f"Frame count: {user_data.get_count()}"]

    pad = element.get_static_pad("src")
    frame_format, width, height = get_caps_from_pad(pad)

    # A frame is only extracted when the user asked for it AND the caps are
    # fully known; every drawing step below keys off `frame is not None`
    # so missing caps can no longer crash the cv2 calls.
    frame = None
    if (
        user_data.use_frame
        and frame_format is not None
        and width is not None
        and height is not None
    ):
        frame = get_numpy_from_buffer(buffer, frame_format, width, height)

    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    detection_count = 0
    for detection in detections:
        confidence = detection.get_confidence()
        # Filter based on confidence threshold (user requested 0.5)
        if confidence < min_confidence:
            continue

        # Get raw info for debugging
        raw_label = detection.get_label()
        class_id = -1
        try:
            class_id = detection.get_class_id()
        except AttributeError:
            # Best effort: keep the -1 sentinel when the detection object
            # does not provide get_class_id().
            pass

        # Smart mapping: try label string first, then class ID
        if raw_label in MY_LABELS:
            label = MY_LABELS[raw_label]
        elif class_id in MY_LABELS:
            label = MY_LABELS[class_id]
        else:
            label = raw_label

        # Get track ID (stays 0 unless exactly one unique-ID object is attached)
        track_id = 0
        track = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)
        if len(track) == 1:
            track_id = track[0].get_id()

        report_lines.append(
            f"Detection: ID: {track_id} Raw: {raw_label}({class_id}) "
            f"-> {label} Conf: {confidence:.2f}"
        )
        detection_count += 1

        if frame is not None:
            # Draw detection bounding box and label
            bbox = detection.get_bbox()
            # bbox coordinates are normalized [0, 1], scale to image size
            x_min = max(0, int(bbox.xmin() * width))
            y_min = max(0, int(bbox.ymin() * height))
            x_max = min(width, int(bbox.xmax() * width))
            y_max = min(height, int(bbox.ymax() * height))

            # Draw rectangle
            cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

            # Draw label
            label_text = f"{label} {confidence:.2f}"
            cv2.putText(
                frame,
                label_text,
                (x_min, y_min - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                (0, 255, 0),
                2,
            )

    if frame is not None:
        cv2.putText(
            frame,
            f"Detections: {detection_count}",
            (10, 30),
            cv2.FONT_HERSHEY_SIMPLEX,
            1,
            (0, 255, 0),
            2,
        )
        # The frame is converted RGB -> BGR before being handed to set_frame().
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    # Every report line ends with a newline, and print() appends one more.
    print("\n".join(report_lines) + "\n")
def main():
    """Create the callback state object and run the detection pipeline app."""
    hailo_logger.info("Starting Detection App.")
    callback_state = user_app_callback_class()
    GStreamerDetectionApp(app_callback, callback_state).run()
# Run the app only when this file is executed as a script.
if __name__ == "__main__":
    main()