How to edit/update the output window of the Hailo GStreamer app

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import time
from datetime import datetime
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
from pygame import mixer  # For alarm

# Output directories for the recorded clips: one for "human near a rip
# current" events and one for "human inside a rip current" events.
DATA_PATH_NEAR = "/home/safebeach/Downloads/Data_Path_Near"
DATA_PATH_INSIDE = "/home/safebeach/Downloads/Data_path_Inside"

# Create both directories up front; exist_ok avoids an error on re-runs.
for _capture_dir in (DATA_PATH_NEAR, DATA_PATH_INSIDE):
    os.makedirs(_capture_dir, exist_ok=True)

# Custom class for detecting rip currents and humans
class SafetyMonitor(app_callback_class):
    """Callback state for rip-current safety monitoring.

    Holds the alarm/warning state and the video capture currently in
    progress, and offers small geometry helpers for detection boxes given
    as ``(x1, y1, x2, y2)`` sequences.
    """

    def __init__(self, danger_threshold=150, warning_duration=5, capture_duration=6):
        """Initialise alarm state, capture state and the alarm sound.

        Args:
            danger_threshold: Pixel distance used by callers as the
                "too close" limit between box centres.
            warning_duration: Seconds the on-screen warning stays active
                after the alarm starts playing.
            capture_duration: Seconds a capture should last; stored here
                for the code that decides when to call ``stop_capture``.
        """
        super().__init__()
        self.warning_duration = warning_duration  # Seconds
        self.danger_threshold = danger_threshold  # Pixel distance
        self.warning_active = False
        self.last_detection_time = 0
        self.alarm_triggered = False
        self.capture_duration = capture_duration
        self.video_writer = None
        self.record_start_time = None
        self.recording_near = False
        self.recording_inside = False

        # Initialize audio
        mixer.init()
        self.alarm = mixer.Sound('/home/safebeach/hailo-rpi5-examples/resources/siren.wav')  # Load alarm sound

    def calculate_distance(self, box1, box2):
        """Return the Euclidean distance between the centres of two boxes."""
        center1 = [(box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2]
        center2 = [(box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2]
        return np.linalg.norm(np.array(center1) - np.array(center2))

    def check_intersection(self, box1, box2):
        """Return True if the two boxes overlap or touch."""
        return not (box1[2] < box2[0] or box1[0] > box2[2] or
                    box1[3] < box2[1] or box1[1] > box2[3])

    def start_capture(self, frame, path, is_near=True):
        """Start a new video capture sized to ``frame`` inside ``path``.

        Args:
            frame: Image array; only its height and width
                (``frame.shape[0]``, ``frame.shape[1]``) are used here.
            path: Directory in which the ``capture_<timestamp>.avi`` file
                is created.
            is_near: True marks the capture as a "near" recording, False
                as an "inside" recording.
        """
        # Release any capture already in progress. Without this, switching
        # between "near" and "inside" recording replaced self.video_writer
        # and leaked the previous, still-open writer.
        self.stop_capture()

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{path}/capture_{timestamp}.avi"
        self.video_writer = cv2.VideoWriter(
            filename, cv2.VideoWriter_fourcc(*'XVID'), 10, (frame.shape[1], frame.shape[0])
        )
        self.record_start_time = time.time()
        self.recording_near = is_near
        self.recording_inside = not is_near

    def stop_capture(self):
        """Release the active video writer, if any, and clear the recording flags."""
        if self.video_writer:
            self.video_writer.release()
            self.video_writer = None
        self.recording_near = False
        self.recording_inside = False

    def trigger_alarm(self, frame, alert_text):
        """Play the alarm if it is not already active and draw the warning.

        Args:
            frame: Image array that the warning text and border are drawn on.
            alert_text: Message rendered on the frame while the warning is
                within ``warning_duration`` seconds of the alarm start.

        Returns:
            The same ``frame`` object that was passed in.
        """
        current_time = time.time()
        if not self.alarm_triggered:
            self.alarm.play()
            self.last_detection_time = current_time
            self.alarm_triggered = True

        # Keep warning active for the specified duration
        if current_time - self.last_detection_time <= self.warning_duration:
            cv2.putText(frame, alert_text,
                        (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
            cv2.rectangle(frame, (0, 0), (frame.shape[1], frame.shape[0]),
                          (0, 0, 255), 1)
        else:
            # Warning window elapsed: silence the mixer and re-arm so the
            # next call starts a fresh alarm.
            self.alarm_triggered = False
            mixer.stop()

        return frame

def process_frame_callback(pad, info, monitor):
    """Pad-probe callback: annotate a frame and raise rip-current alerts.

    Draws a box and label for every "Human" and "Rip_Current" detection
    with confidence of at least 0.5, measures each human/rip-current pair,
    triggers the alarm and a capture when a pair overlaps or is closer
    than ``monitor.danger_threshold``, stamps the frame with the current
    time, then hands the frame to ``monitor.set_frame`` and to the active
    video writer.

    Args:
        pad: Pad the probe is attached to; used to read the frame caps.
        info: Probe info object providing the buffer.
        monitor: ``SafetyMonitor`` instance holding alarm and capture state.

    Returns:
        ``Gst.PadProbeReturn.OK`` in every case.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Named frame_format rather than `format` to avoid shadowing the builtin.
    frame_format, width, height = get_caps_from_pad(pad)
    # NOTE(review): presumably get_caps_from_pad yields None values when the
    # caps are not available yet -- confirm against hailo_rpi_common. Skip the
    # frame in that case instead of failing while building the array.
    if frame_format is None or width is None or height is None:
        return Gst.PadProbeReturn.OK
    frame = get_numpy_from_buffer(buffer, frame_format, width, height)

    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    humans, rip_currents = [], []

    # Parse detections for humans and rip currents
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        if confidence < 0.5:
            continue

        # Scale the bbox values by the frame size to get pixel corners.
        x1, y1, x2, y2 = [int(bbox.xmin() * width), int(bbox.ymin() * height),
                          int((bbox.xmin() + bbox.width()) * width),
                          int((bbox.ymin() + bbox.height()) * height)]

        if label == "Human":
            humans.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
            cv2.putText(frame, f"Human: {confidence:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 255, 0), 1)
        elif label == "Rip_Current":
            rip_currents.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 1)
            cv2.putText(frame, f"Rip Current: {confidence:.2f}",
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                        (0, 0, 255), 1)

    # Danger check and recording
    for human in humans:
        for rip in rip_currents:
            distance = monitor.calculate_distance(human, rip)
            print("distance: ", distance)
            cv2.line(frame, ((human[0] + human[2]) // 2, (human[1] + human[3]) // 2),
                     ((rip[0] + rip[2]) // 2, (rip[1] + rip[3]) // 2), (255, 255, 0), 1)
            cv2.putText(frame, f"Distance: {distance:.1f}px",
                        ((human[0] + rip[0]) // 2, (human[1] + rip[1]) // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)

            # Test the more severe condition first. With the distance test
            # first, an overlapping pair whose centres are close was always
            # reported as "near" and the "inside" branch was skipped.
            if monitor.check_intersection(human, rip):
                print("DANGER: Human inside rip current!")
                frame = monitor.trigger_alarm(frame, "DANGER: Human inside rip current!")
                if not monitor.recording_inside:
                    monitor.start_capture(frame, DATA_PATH_INSIDE, is_near=False)
            elif distance < monitor.danger_threshold:
                print("WARNING: Human near rip current!")
                frame = monitor.trigger_alarm(frame, "WARNING: Human near rip current!")
                if not monitor.recording_near:
                    monitor.start_capture(frame, DATA_PATH_NEAR, is_near=True)

    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Stamp the time before the frame is published or recorded; drawing it
    # afterwards meant the frames written to the video never carried it.
    cv2.putText(frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                (10, height - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    monitor.set_frame(frame)

    # Record the frame and stop once the capture has run long enough.
    if monitor.video_writer:
        monitor.video_writer.write(frame)
        if time.time() - monitor.record_start_time > monitor.capture_duration:
            monitor.stop_capture()

    return Gst.PadProbeReturn.OK

def main():
    """Create the safety monitor, run the detection app, and clean up on exit."""
    # Create the safety monitor
    monitor = SafetyMonitor(danger_threshold=150, warning_duration=5)

    # Create and run the detection app
    app = GStreamerDetectionApp(process_frame_callback, monitor)
    try:
        app.run()
    except KeyboardInterrupt:
        print("Application stopped by user")
    finally:
        # Release a capture that is still recording at shutdown; previously
        # the open video writer was never released on exit.
        monitor.stop_capture()
        cv2.destroyAllWindows()
        mixer.quit()

# Run the application only when this file is executed as a script.
if __name__ == "__main__":
    main()

This is my code for an object detection model that runs from a HEF file on the Raspberry Pi 5 AI Kit, which uses a Hailo accelerator. I’m using the hailo-rpi5-examples repository and modifying it from there. The problem is that the GStreamer application used in the detection pipeline opens a default window with basic overlays such as OpenCV rectangles and detection confidence. I want to add elements to this window, but I can’t, because I don’t have access to the internal OpenCV window object; creating a window in this code and running the script with the --use-frame option opens up a second window that shows the desired objects. How can I add these elements dynamically, through this script, to the original window?

Also where is the code that defines the default display pipeline and applies the opencv effects on top of the frames (like the default rectangles, text, etc.) that work out of the box?

Hey @2101030400090

When working with GStreamer pipelines, you’ll want to modify frames directly in the processing callback rather than creating separate windows. This ensures smooth integration with the existing pipeline while allowing dynamic updates to your display.

Here’s how you can modify the process_frame_callback function to achieve this:

def process_frame_callback(pad, info, monitor):
    """Example probe callback that draws custom overlays on each frame.

    Reads the frame from the probe buffer, draws a text label and a circle
    at the frame centre, converts the result with COLOR_RGB2BGR and passes
    it to ``monitor.set_frame``. Always returns ``Gst.PadProbeReturn.OK``.
    """
    gst_buffer = info.get_buffer()
    if gst_buffer is not None:
        pixel_format, width, height = get_caps_from_pad(pad)
        frame = get_numpy_from_buffer(gst_buffer, pixel_format, width, height)

        # Custom overlays: a caption and a circle centred in the frame.
        centre = (width // 2, height // 2)
        cv2.putText(frame, "Custom Text", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
        cv2.circle(frame, centre, 30, (0, 255, 255), 3)

        # Hand the converted frame to the monitor.
        monitor.set_frame(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

    return Gst.PadProbeReturn.OK

This approach ensures all visual elements appear in your original display window. Let me know if you need help with specific modifications!

I’ve already tried that. Firstly, it only works when I use the --use-frame option, which I mentioned in my original post. Secondly, it only pops up a new window.

I traced the cause to this piece of code in hailo_rpi_common.py:

def display_user_data_frame(user_data: app_callback_class):
    """Show frames from ``user_data`` in the "User Frame" OpenCV window.

    Loops while ``user_data.running`` is truthy, displaying each frame
    returned by ``user_data.get_frame()`` that is not None, and destroys
    all OpenCV windows once the loop ends.
    """
    while True:
        if not user_data.running:
            break
        latest = user_data.get_frame()
        if latest is not None:
            cv2.imshow("User Frame", latest)
        # Called on every iteration, whether or not a frame was shown.
        cv2.waitKey(1)
    cv2.destroyAllWindows()

The cv2.imshow() method is what pops up the new window, but what can I replace that line with so that the original window is edited instead?

The check for the --use-frame option is enforced in about three files of the hailo-rpi5-examples repo, namely detection.py, detection_pipeline.py and the aforementioned hailo_rpi_common.py.

The argparse line that adds the --use-frame option also says that it makes the pipeline use frames from the custom callback function, as seen in hailo_rpi_common.py: parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")

Is using the --use-frame option the only way to inject custom frames from the custom callback? If not then how can I do it without using the option? And if yes, then what am I supposed to change in the aforementioned display_user_data_frame() function?