import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import time
from datetime import datetime
from hailo_rpi_common import (
get_caps_from_pad,
get_numpy_from_buffer,
app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
from pygame import mixer # For alarm
# Directories where recorded clips are saved for later dataset building.
DATA_PATH_NEAR = "/home/safebeach/Downloads/Data_Path_Near"
DATA_PATH_INSIDE = "/home/safebeach/Downloads/Data_path_Inside"
# Make sure both capture directories exist before any recording starts.
for _capture_dir in (DATA_PATH_NEAR, DATA_PATH_INSIDE):
    os.makedirs(_capture_dir, exist_ok=True)
# Custom class for detecting rip currents and humans
class SafetyMonitor(app_callback_class):
    """Tracks human and rip-current detections, raises an audible/visual
    warning when they get too close, and records short video clips for
    data collection.
    """

    def __init__(self, danger_threshold=150, warning_duration=5, capture_duration=6):
        """
        Args:
            danger_threshold: pixel distance below which a human counts as
                "near" a rip current.
            warning_duration: seconds the on-screen warning stays active
                after a trigger.
            capture_duration: seconds of video recorded per capture.
        """
        super().__init__()
        self.warning_duration = warning_duration  # seconds
        self.danger_threshold = danger_threshold  # pixel distance
        self.warning_active = False
        self.last_detection_time = 0
        self.alarm_triggered = False
        self.capture_duration = capture_duration
        self.video_writer = None
        self.record_start_time = None
        self.recording_near = False
        self.recording_inside = False
        # Initialize audio and load the siren used for alerts.
        mixer.init()
        self.alarm = mixer.Sound('/home/safebeach/hailo-rpi5-examples/resources/siren.wav')

    def calculate_distance(self, box1, box2):
        """Return the Euclidean distance (pixels) between the centers of
        two (x1, y1, x2, y2) boxes."""
        center1 = [(box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2]
        center2 = [(box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2]
        return np.linalg.norm(np.array(center1) - np.array(center2))

    def check_intersection(self, box1, box2):
        """Return True if two (x1, y1, x2, y2) boxes overlap."""
        return not (box1[2] < box2[0] or box1[0] > box2[2] or
                    box1[3] < box2[1] or box1[1] > box2[3])

    def start_capture(self, frame, path, is_near=True):
        """Start recording a new clip under ``path``.

        Bug fix: the original opened a new cv2.VideoWriter without releasing
        an already-active one, so switching between a "near" and an "inside"
        recording leaked the writer and left the previous file unfinalized.
        We now finalize any open recording first.
        """
        if self.video_writer is not None:
            self.stop_capture()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{path}/capture_{timestamp}.avi"
        self.video_writer = cv2.VideoWriter(
            filename, cv2.VideoWriter_fourcc(*'XVID'), 10, (frame.shape[1], frame.shape[0])
        )
        self.record_start_time = time.time()
        self.recording_near = is_near
        self.recording_inside = not is_near

    def stop_capture(self):
        """Finalize and release the current video writer, if any."""
        if self.video_writer:
            self.video_writer.release()
            self.video_writer = None
        self.recording_near = False
        self.recording_inside = False

    def trigger_alarm(self, frame, alert_text):
        """Play the siren (once per trigger window), draw the warning text
        and a red border on ``frame``, and silence the alarm once
        ``warning_duration`` seconds have elapsed since the last trigger."""
        current_time = time.time()
        if not self.alarm_triggered:
            self.alarm.play()
            self.last_detection_time = current_time
            self.alarm_triggered = True
        # Keep the warning overlay active for the configured duration.
        if current_time - self.last_detection_time <= self.warning_duration:
            cv2.putText(frame, alert_text,
                        (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
            cv2.rectangle(frame, (0, 0), (frame.shape[1], frame.shape[0]),
                          (0, 0, 255), 1)
        else:
            self.alarm_triggered = False
            mixer.stop()
        return frame
def process_frame_callback(pad, info, monitor):
    """GStreamer pad-probe callback: draws detection overlays, checks the
    human/rip-current danger conditions, and drives alarm + clip recording.

    Args:
        pad: the GStreamer pad the probe is attached to.
        info: probe info carrying the Gst.Buffer for this frame.
        monitor: the shared SafetyMonitor instance.

    Returns:
        Gst.PadProbeReturn.OK so the pipeline keeps flowing.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK
    # NOTE: renamed from `format` to avoid shadowing the builtin.
    frame_format, width, height = get_caps_from_pad(pad)
    frame = get_numpy_from_buffer(buffer, frame_format, width, height)
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
    humans, rip_currents = [], []
    # Parse detections, keeping only confident humans / rip currents.
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        if confidence < 0.5:
            continue
        # Convert normalized bbox coordinates to pixel coordinates.
        x1 = int(bbox.xmin() * width)
        y1 = int(bbox.ymin() * height)
        x2 = int((bbox.xmin() + bbox.width()) * width)
        y2 = int((bbox.ymin() + bbox.height()) * height)
        if label == "Human":
            humans.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
            cv2.putText(frame, f"Human: {confidence:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 255, 0), 1)
        elif label == "Rip_Current":
            rip_currents.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 1)
            cv2.putText(frame, f"Rip Current: {confidence:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 0, 255), 1)
    # Danger check: distance-based warning, intersection-based danger.
    for human in humans:
        for rip in rip_currents:
            distance = monitor.calculate_distance(human, rip)
            print("distance: ", distance)
            cv2.line(frame, ((human[0] + human[2]) // 2, (human[1] + human[3]) // 2),
                     ((rip[0] + rip[2]) // 2, (rip[1] + rip[3]) // 2), (255, 255, 0), 1)
            cv2.putText(frame, f"Distance: {distance:.1f}px",
                        ((human[0] + rip[0]) // 2, (human[1] + rip[1]) // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)
            if distance < monitor.danger_threshold:
                print("WARNING: Human near rip current!")
                frame = monitor.trigger_alarm(frame, "WARNING: Human near rip current!")
                if not monitor.recording_near:
                    monitor.start_capture(frame, DATA_PATH_NEAR, is_near=True)
            elif monitor.check_intersection(human, rip):
                print("DANGER: Human inside rip current!")
                frame = monitor.trigger_alarm(frame, "DANGER: Human inside rip current!")
                if not monitor.recording_inside:
                    monitor.start_capture(frame, DATA_PATH_INSIDE, is_near=False)
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    # Bug fix: draw the timestamp BEFORE writing the frame. The original
    # drew it after video_writer.write(), so recorded clips never
    # contained the timestamp.
    cv2.putText(frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                (10, height - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    monitor.set_frame(frame)
    # Data-collection recording: write the frame and stop once the
    # configured capture duration has elapsed.
    if monitor.video_writer:
        monitor.video_writer.write(frame)
        if time.time() - monitor.record_start_time > monitor.capture_duration:
            monitor.stop_capture()
    return Gst.PadProbeReturn.OK
def main():
    """Entry point: build the safety monitor, then run the detection app
    until it finishes or the user interrupts it."""
    safety_monitor = SafetyMonitor(danger_threshold=150, warning_duration=5)
    detection_app = GStreamerDetectionApp(process_frame_callback, safety_monitor)
    try:
        detection_app.run()
    except KeyboardInterrupt:
        print("Application stopped by user")
    finally:
        # Release GUI windows and the audio mixer no matter how we exit.
        cv2.destroyAllWindows()
        mixer.quit()


if __name__ == "__main__":
    main()
This is my code for an object detection model (a HEF file) running on the Raspberry Pi 5 AI Kit, which uses the Hailo accelerator. I am working from the hailo-rpi5-examples repository and modifying it. The problem is that the GStreamer application used in the detection pipeline opens a default display window with basic elements such as OpenCV-style rectangles and detection overlays. I want to add my own elements to this window, but I can't, because I don't have access to the internal OpenCV window object. If I instead create my own window in this script and run it with the --use-frame
option, a second window opens with the elements I want — separate from the original one. How can I add these elements dynamically, from this script, onto the original window?
Also, where is the code that defines the default display pipeline and applies the OpenCV effects on top of the frames (the default rectangles, text, etc. that work out of the box)?