import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import time
from datetime import datetime
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
from pygame import mixer # For alarm
# Paths for data collection
DATA_PATH_NEAR = "/home/safebeach/Downloads/Data_Path_Near"
DATA_PATH_INSIDE = "/home/safebeach/Downloads/Data_path_Inside"
os.makedirs(DATA_PATH_NEAR, exist_ok=True)
os.makedirs(DATA_PATH_INSIDE, exist_ok=True)
# Custom class for detecting rip currents and humans
class SafetyMonitor(app_callback_class):
    def __init__(self, danger_threshold=150, warning_duration=5, capture_duration=6):
        super().__init__()
        self.warning_duration = warning_duration  # Seconds
        self.danger_threshold = danger_threshold  # Pixel distance
        self.warning_active = False
        self.last_detection_time = 0
        self.alarm_triggered = False
        self.capture_duration = capture_duration
        self.video_writer = None
        self.record_start_time = None
        self.recording_near = False
        self.recording_inside = False
        # Initialize audio
        mixer.init()
        self.alarm = mixer.Sound('/home/safebeach/hailo-rpi5-examples/resources/siren.wav')  # Load alarm sound

    def calculate_distance(self, box1, box2):
        """Calculate distance between centers of two boxes."""
        center1 = [(box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2]
        center2 = [(box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2]
        return np.linalg.norm(np.array(center1) - np.array(center2))

    def check_intersection(self, box1, box2):
        """Check if two bounding boxes intersect."""
        return not (box1[2] < box2[0] or box1[0] > box2[2] or
                    box1[3] < box2[1] or box1[1] > box2[3])

    def start_capture(self, frame, path, is_near=True):
        """Start video capture for data collection."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{path}/capture_{timestamp}.avi"
        self.video_writer = cv2.VideoWriter(
            filename, cv2.VideoWriter_fourcc(*'XVID'), 10, (frame.shape[1], frame.shape[0])
        )
        self.record_start_time = time.time()
        self.recording_near = is_near
        self.recording_inside = not is_near

    def stop_capture(self):
        """Stop video capture."""
        if self.video_writer:
            self.video_writer.release()
            self.video_writer = None
        self.recording_near = False
        self.recording_inside = False

    def trigger_alarm(self, frame, alert_text):
        """Trigger alarm and display warning."""
        current_time = time.time()
        if not self.alarm_triggered:
            self.alarm.play()
            self.last_detection_time = current_time
            self.alarm_triggered = True
        # Keep warning active for the specified duration
        if current_time - self.last_detection_time <= self.warning_duration:
            cv2.putText(frame, alert_text,
                        (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
            cv2.rectangle(frame, (0, 0), (frame.shape[1], frame.shape[0]),
                          (0, 0, 255), 1)
        else:
            self.alarm_triggered = False
            mixer.stop()
        return frame
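
# Illustrative sanity checks for the two helpers above (hypothetical values,
# not part of the pipeline):
#   check_intersection((0, 0, 10, 10), (5, 5, 15, 15))   -> True  (boxes overlap)
#   calculate_distance((0, 0, 10, 10), (10, 0, 20, 10))  -> 10.0  (centers 10 px apart)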

def process_frame_callback(pad, info, monitor):
    """Callback to process video frames."""
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    format, width, height = get_caps_from_pad(pad)
    frame = get_numpy_from_buffer(buffer, format, width, height)
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    humans, rip_currents = [], []
    # Parse detections for humans and rip currents
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        if confidence < 0.5:
            continue
        x1, y1, x2, y2 = [int(bbox.xmin() * width), int(bbox.ymin() * height),
                          int((bbox.xmin() + bbox.width()) * width),
                          int((bbox.ymin() + bbox.height()) * height)]
        if label == "Human":
            humans.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
            cv2.putText(frame, f"Human: {confidence:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 255, 0), 1)
        elif label == "Rip_Current":
            rip_currents.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 1)
            cv2.putText(frame, f"Rip Current: {confidence:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 0, 255), 1)
    # Danger check and recording
    for human in humans:
        for rip in rip_currents:
            distance = monitor.calculate_distance(human, rip)
            print("distance: ", distance)
            cv2.line(frame, ((human[0] + human[2]) // 2, (human[1] + human[3]) // 2),
                     ((rip[0] + rip[2]) // 2, (rip[1] + rip[3]) // 2), (255, 255, 0), 1)
            cv2.putText(frame, f"Distance: {distance:.1f}px",
                        ((human[0] + rip[0]) // 2, (human[1] + rip[1]) // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)
            if distance < monitor.danger_threshold:
                print("WARNING: Human near rip current!")
                frame = monitor.trigger_alarm(frame, "WARNING: Human near rip current!")
                if not monitor.recording_near:
                    monitor.start_capture(frame, DATA_PATH_NEAR, is_near=True)
            elif monitor.check_intersection(human, rip):
                print("DANGER: Human inside rip current!")
                frame = monitor.trigger_alarm(frame, "DANGER: Human inside rip current!")
                if not monitor.recording_inside:
                    monitor.start_capture(frame, DATA_PATH_INSIDE, is_near=False)
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    monitor.set_frame(frame)
    # Add timestamp and process data collection stop timing
    if monitor.video_writer:
        monitor.video_writer.write(frame)
        if time.time() - monitor.record_start_time > monitor.capture_duration:
            monitor.stop_capture()
    cv2.putText(frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                (10, height - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    return Gst.PadProbeReturn.OK

def main():
    # Create the safety monitor
    monitor = SafetyMonitor(danger_threshold=150, warning_duration=5)
    # Create and run the detection app
    app = GStreamerDetectionApp(process_frame_callback, monitor)
    try:
        app.run()
    except KeyboardInterrupt:
        print("Application stopped by user")
    finally:
        cv2.destroyAllWindows()
        mixer.quit()

if __name__ == "__main__":
    main()
This is my code for an object detection model running from a HEF file on the Raspberry Pi 5 AI Kit, which uses the Hailo accelerator. I’m using the hailo-rpi5-examples repository and modifying it from there. The problem is that the GStreamer application used in the detection pipeline opens a default window with basic elements like OpenCV rectangles and detection overlays. I want to add elements to this window, but I can’t, since I don’t have access to the internal OpenCV window object; creating one in this code and running the script with the --use-frame option just opens another window with the desired elements. How can I add these elements dynamically through this script on the original window?
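For reference, the only pipeline-level route I can think of for drawing on the original window is a GStreamer overlay element such as cairooverlay inserted before the display sink, along the lines of the sketch below. This is only an idea, not code from the repo; the on_draw callback and the element name are my own placeholders, and I don’t know where (or whether) the detection pipeline lets me hook such an element in:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

def on_draw(_overlay, context, _timestamp, _duration):
    # Placeholder drawing: a red rectangle at fixed coordinates,
    # rendered with Cairo onto every frame passing through the element.
    context.set_source_rgb(1.0, 0.0, 0.0)
    context.rectangle(50, 50, 200, 100)
    context.set_line_width(2)
    context.stroke()

Gst.init(None)
overlay = Gst.ElementFactory.make("cairooverlay", "overlay")
overlay.connect("draw", on_draw)
# The element would still have to be added to the pipeline and linked
# in front of the video sink, which is the part I can't see how to do here.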
Also, where is the code that defines the default display pipeline and applies the OpenCV effects on top of the frames (the default rectangles, text, etc. that work out of the box)?
I’ve already tried using OpenCV from the custom callback process_frame_callback(), as you can see. Firstly, it only works when I use the --use-frame option, which I mentioned in my original post. Secondly, it only pops up a new window. I traced the cause to this piece of code in hailo_rpi_common.py:
def display_user_data_frame(user_data: app_callback_class):
    while user_data.running:
        frame = user_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)
        cv2.waitKey(1)
    cv2.destroyAllWindows()
The cv2.imshow() method pops up the new window, but what can I replace that line with to edit the original window?
The check for the --use-frame option is enforced in about three files from the hailo-rpi5-examples repo, namely detection.py, detection_pipeline.py, and the aforementioned hailo_rpi_common.py.
The argparse line that adds the --use-frame option also mentions that it allows the pipeline to use frames from the custom callback function, as seen in hailo_rpi_common.py:

parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")
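For what it’s worth, my reading of how that flag is meant to be used inside a callback is roughly the following (paraphrased from memory of the repo’s examples, not an exact quote):

# Paraphrased pattern, not a verbatim quote from the repo:
if user_data.use_frame:
    frame = get_numpy_from_buffer(buffer, format, width, height)
    # ... draw on this copy with OpenCV ...
    user_data.set_frame(frame)  # later shown by display_user_data_frame()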
Is using the --use-frame option the only way to inject custom frames from the custom callback? If not, then how can I do it without using the option? And if yes, then what am I supposed to change in the aforementioned display_user_data_frame() function?