How do I edit/update the default output window of the Hailo GStreamer app?

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import time
from datetime import datetime
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp
from pygame import mixer  # For alarm

# Paths for data collection
DATA_PATH_NEAR = "/home/safebeach/Downloads/Data_Path_Near"
DATA_PATH_INSIDE = "/home/safebeach/Downloads/Data_path_Inside"
os.makedirs(DATA_PATH_NEAR, exist_ok=True)
os.makedirs(DATA_PATH_INSIDE, exist_ok=True)

# Custom class for detecting rip currents and humans
class SafetyMonitor(app_callback_class):
    def __init__(self, danger_threshold=150, warning_duration=5, capture_duration=6):
        super().__init__()
        self.warning_duration = warning_duration  # Seconds
        self.danger_threshold = danger_threshold  # Pixel distance
        self.warning_active = False
        self.last_detection_time = 0
        self.alarm_triggered = False
        self.capture_duration = capture_duration
        self.video_writer = None
        self.record_start_time = None
        self.recording_near = False
        self.recording_inside = False
        
        # Initialize audio
        mixer.init()
        self.alarm = mixer.Sound('/home/safebeach/hailo-rpi5-examples/resources/siren.wav')  # Load alarm sound
    
    def calculate_distance(self, box1, box2):
        """Calculate distance between centers of two boxes."""
        center1 = [(box1[0] + box1[2]) / 2, (box1[1] + box1[3]) / 2]
        center2 = [(box2[0] + box2[2]) / 2, (box2[1] + box2[3]) / 2]
        return np.linalg.norm(np.array(center1) - np.array(center2))

    def check_intersection(self, box1, box2):
        """Check if two bounding boxes intersect."""
        return not (box1[2] < box2[0] or box1[0] > box2[2] or 
                    box1[3] < box2[1] or box1[1] > box2[3])

    def start_capture(self, frame, path, is_near=True):
        """Start video capture for data collection."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{path}/capture_{timestamp}.avi"
        self.video_writer = cv2.VideoWriter(
            filename, cv2.VideoWriter_fourcc(*'XVID'), 10, (frame.shape[1], frame.shape[0])
        )
        self.record_start_time = time.time()
        self.recording_near = is_near
        self.recording_inside = not is_near

    def stop_capture(self):
        """Stop video capture."""
        if self.video_writer:
            self.video_writer.release()
            self.video_writer = None
        self.recording_near = False
        self.recording_inside = False

    def trigger_alarm(self, frame, alert_text):
        """Trigger alarm and display warning."""
        current_time = time.time()
        if not self.alarm_triggered:
            self.alarm.play()
            self.last_detection_time = current_time
            self.alarm_triggered = True
        
        # Keep warning active for the specified duration
        if current_time - self.last_detection_time <= self.warning_duration:
            cv2.putText(frame, alert_text, 
                        (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
            cv2.rectangle(frame, (0, 0), (frame.shape[1], frame.shape[0]), 
                          (0, 0, 255), 1)
        else:
            self.alarm_triggered = False
            mixer.stop()

        return frame

def process_frame_callback(pad, info, monitor):
    """Callback to process video frames."""
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    format, width, height = get_caps_from_pad(pad)
    if format is None or width is None or height is None:
        return Gst.PadProbeReturn.OK
    frame = get_numpy_from_buffer(buffer, format, width, height)

    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    humans, rip_currents = [], []

    # Parse detections for humans and rip currents
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        
        if confidence < 0.5:
            continue

        x1, y1, x2, y2 = [int(bbox.xmin() * width), int(bbox.ymin() * height),
                          int((bbox.xmin() + bbox.width()) * width),
                          int((bbox.ymin() + bbox.height()) * height)]
        
        if label == "Human":
            humans.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)
            cv2.putText(frame, f"Human: {confidence:.2f}", 
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 
                        (0, 255, 0), 1)
        elif label == "Rip_Current":
            rip_currents.append((x1, y1, x2, y2))
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 1)
            cv2.putText(frame, f"Rip Current: {confidence:.2f}", 
                        (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, 
                        (0, 0, 255), 1)

    # Danger check and recording
    for human in humans:
        for rip in rip_currents:
            distance = monitor.calculate_distance(human, rip)
            print("distance: ", distance)
            cv2.line(frame, ((human[0] + human[2]) // 2, (human[1] + human[3]) // 2),
                     ((rip[0] + rip[2]) // 2, (rip[1] + rip[3]) // 2), (255, 255, 0), 1)
            cv2.putText(frame, f"Distance: {distance:.1f}px", 
                        ((human[0] + rip[0]) // 2, (human[1] + rip[1]) // 2), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 1)

            if distance < monitor.danger_threshold:
                print("WARNING: Human near rip current!")
                frame = monitor.trigger_alarm(frame, "WARNING: Human near rip current!")
                if not monitor.recording_near:
                    monitor.start_capture(frame, DATA_PATH_NEAR, is_near=True)
            elif monitor.check_intersection(human, rip):
                print("DANGER: Human inside rip current!")
                frame = monitor.trigger_alarm(frame, "DANGER: Human inside rip current!")
                if not monitor.recording_inside:
                    monitor.start_capture(frame, DATA_PATH_INSIDE, is_near=False)
    # Convert RGB (GStreamer) to BGR for OpenCV display and recording
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Overlay a timestamp before the frame is displayed or recorded
    cv2.putText(frame, datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                (10, height - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    monitor.set_frame(frame)

    # Write recorded frames and stop data collection after the capture duration
    if monitor.video_writer:
        monitor.video_writer.write(frame)
        if time.time() - monitor.record_start_time > monitor.capture_duration:
            monitor.stop_capture()

    return Gst.PadProbeReturn.OK

def main():
    # Create the safety monitor
    monitor = SafetyMonitor(danger_threshold=150, warning_duration=5)
    
    # Create and run the detection app
    app = GStreamerDetectionApp(process_frame_callback, monitor)
    try:
        app.run()
    except KeyboardInterrupt:
        print("Application stopped by user")
    finally:
        cv2.destroyAllWindows()
        mixer.quit()

if __name__ == "__main__":
    main()

This is my code for an object detection model (a HEF file) running on the Raspberry Pi 5 AI Kit, which uses the Hailo accelerator. I’m working from the hailo-rpi5-examples repository and modifying it from there. The problem is that the GStreamer application used in the detection pipeline opens a default window with basic elements such as OpenCV rectangles and detection labels. I want to add elements to this window, but I can’t, since I don’t have access to the internal OpenCV window object; creating one in this code and running the script with the --use-frame option opens a second window with the desired elements. How can I add these elements dynamically from this script onto the original window?

Also, where is the code that defines the default display pipeline and applies the OpenCV effects on top of the frames (the default rectangles, text, etc.) that work out of the box?

I’ve already tried using OpenCV from the custom callback process_frame_callback(), as you can see. First, it only works when I use the --use-frame option, which I mentioned in my original post. Second, it only pops up a new window.

I traced the cause to this piece of code in hailo_rpi_common.py:

def display_user_data_frame(user_data: app_callback_class):
    while user_data.running:
        frame = user_data.get_frame()
        if frame is not None:
            cv2.imshow("User Frame", frame)
        cv2.waitKey(1)
    cv2.destroyAllWindows()

The cv2.imshow() method pops up the new window, but what can I replace that line with to edit the original window?

The check for the --use-frame option is enforced in three files from the hailo-rpi5-examples repo, namely detection.py, detection_pipeline.py, and the aforementioned hailo_rpi_common.py.

The argparse line that adds the --use-frame option also says that it lets the pipeline use frames from the custom callback function, as seen in hailo_rpi_common.py:

parser.add_argument("--use-frame", "-u", action="store_true", help="Use frame from the callback function")

Is using the --use-frame option the only way to inject custom frames from the custom callback? If not, how can I do it without the option? And if it is, what am I supposed to change in the aforementioned display_user_data_frame() function?

Hey @2101030400090,

Let’s explore this approach: by incorporating an identity element into the pipeline and updating the USER_CALLBACK_PIPELINE function, you can process and modify frames directly within the pipeline.

1. Modify the GStreamer Pipeline

  • Add the identity element to the USER_CALLBACK_PIPELINE function to enable custom callback functionality.
  • This approach allows inline frame processing in the GStreamer pipeline, removing the need for the --use-frame option.

Update the USER_CALLBACK_PIPELINE function in hailo_rpi_common.py as follows:

def USER_CALLBACK_PIPELINE(name='identity_callback'):  
    return f'{QUEUE(name=f"{name}_q")} ! identity name={name}'  
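
For orientation, here is a minimal sketch of how that fragment composes into the full pipeline string. The exact layout is assembled in detection_pipeline.py (look for the method that builds the pipeline string in your checkout), so the surrounding elements and file names below are illustrative, not exact:

# Illustrative sketch only -- the real pipeline string is built in
# detection_pipeline.py and contains more elements than shown here.
pipeline_string = (
    "libcamerasrc ! video/x-raw,format=RGB,width=1280,height=720 "
    "! hailonet hef-path=model.hef "          # inference on the Hailo device
    "! hailofilter so-path=postprocess.so "   # decodes tensors into detections
    "! " + USER_CALLBACK_PIPELINE() +         # the identity element your probe hooks
    " ! hailooverlay "                        # draws the default boxes and labels
    "! videoconvert ! autovideosink"          # the 'default window' is this sink
)

The point to take away is that the default window is a plain GStreamer video sink, and the out-of-the-box rectangles and labels come from the hailooverlay element, not from OpenCV. That is why cv2 drawing in the callback only becomes visible once the modified pixels are written back into the buffer, as shown in step 2 below.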

2. Update the Process Frame Callback

To handle modified frames, include the following code at the end of your process_frame_callback function:

# Write the modified frame back into the GStreamer buffer  
success, map_info = buffer.map(Gst.MapFlags.WRITE)  
if not success:  
    raise RuntimeError("Failed to map buffer for writing")  

try:  
    # Convert the frame to RGB (GStreamer format) and copy it back to the buffer  
    modified_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  
    np.copyto(np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=map_info.data), modified_frame)  
finally:  
    buffer.unmap(map_info)  
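
One caveat: depending on your PyGObject version, map_info.data may be exposed as read-only bytes, in which case the np.copyto() view above will refuse to write. A defensive variant (a sketch, assuming a tightly packed RGB frame with no row padding) is to copy the raw bytes back with Gst.Buffer.fill() instead:

# Sketch: write the frame back via Gst.Buffer.fill(), which copies raw
# bytes into the buffer. Assumes tightly packed RGB888 (height * width * 3
# bytes); if your caps negotiate padded row strides, this copy is wrong.
modified_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
data = np.ascontiguousarray(modified_frame, dtype=np.uint8).tobytes()
if buffer.get_size() >= len(data):
    buffer.fill(0, data)  # copy into the buffer starting at offset 0
else:
    print("Buffer smaller than frame; skipping write-back")

If GStreamer still rejects the write, the buffer itself is not writable at the point where the probe is attached, and you may need to attach the probe elsewhere in the pipeline.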

3. Attach the Callback to the Pipeline

Ensure your GStreamer pipeline includes the identity element, then connect the callback like this:

identity = pipeline.get_by_name("identity_callback")  
if identity:  
    identity_pad = identity.get_static_pad("src")  
    identity_pad.add_probe(Gst.PadProbeType.BUFFER, process_frame_callback, monitor)  
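
Where you get the pipeline handle from depends on how GStreamerDetectionApp is structured in your version of the repo. As a sketch, assuming the app object exposes its Gst.Pipeline as app.pipeline (verify the attribute name in detection_pipeline.py), the hookup in main() might look like this:

def main():
    monitor = SafetyMonitor(danger_threshold=150, warning_duration=5)
    app = GStreamerDetectionApp(process_frame_callback, monitor)

    # 'app.pipeline' is an assumption -- check how your version of
    # GStreamerDetectionApp names its Gst.Pipeline object.
    identity = app.pipeline.get_by_name("identity_callback")
    if identity is not None:
        identity_pad = identity.get_static_pad("src")
        identity_pad.add_probe(Gst.PadProbeType.BUFFER,
                               process_frame_callback, monitor)
    else:
        print("identity_callback element not found in pipeline")

    app.run()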

With these updates, your application will process and modify frames directly within the GStreamer pipeline using your custom callback.

I hope this helps clarify how to integrate UI changes into the RPi examples!

Best regards,
Omri
