[RPi5] how to access mask from instance_segmentation

Hi there,

I have successfully installed and configured the Hailo-8 AI kit on my RPi5. I am running the instance_segmentation.py example (from hailo-rpi5-examples) with an HD Pi camera as the input.

I would like to access the bit-mask data for a person so that I can cut out the background and replace it with a static image. I'm not too familiar with retraining datasets or custom GStreamer pipelines, so I'm wondering if someone could give me a hand with that, or at least point me in the right direction?

Best regards,
Radek

Hey @Radek ,

You don't need to change the GStreamer pipeline or the model. All you have to do is update the app_callback function in instance_segmentation.py to do the background replacement using the instance segmentation mask:

def app_callback(pad, info, user_data):
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Using the user_data to count the number of frames
    user_data.increment()
    string_to_print = f"Frame count: {user_data.get_count()}\n"

    # Get the caps from the pad
    format, width, height = get_caps_from_pad(pad)

    # If the user_data.use_frame is set to True, we can get the video frame from the buffer
    frame = None
    if user_data.use_frame and format is not None and width is not None and height is not None:
        # Get video frame
        frame = get_numpy_from_buffer(buffer, format, width, height)

    # Get the detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Load static background image and resize to match frame dimensions
    background = cv2.imread("background.jpg")
    background = cv2.resize(background, (width, height))

    # Parse the detections
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        if label == "person":
            string_to_print += (f"Detection: {label} {confidence:.2f}\n")
            if user_data.use_frame:
                # Get instance segmentation mask from detection
                masks = detection.get_objects_typed(hailo.HAILO_CONF_CLASS_MASK)
                if len(masks) > 0:
                    mask = masks[0]
                    mask_height = mask.get_height()
                    mask_width = mask.get_width()
                    mask_data = np.array(mask.get_data()).reshape((mask_height, mask_width))

                    # Resize mask to match bounding box size
                    mask_resized = cv2.resize(mask_data, (mask_width * 4, mask_height * 4), interpolation=cv2.INTER_NEAREST)

                    # Get coordinates to place mask on the frame
                    x_min, y_min = int(bbox.xmin() * width), int(bbox.ymin() * height)
                    x_max, y_max = min(x_min + mask_resized.shape[1], width), min(y_min + mask_resized.shape[0], height)

                    # Create a mask overlay for the person
                    mask_overlay = np.zeros_like(frame)
                    mask_overlay[y_min:y_max, x_min:x_max] = cv2.merge([mask_resized] * 3)

                    # Apply the mask to extract the person and background
                    person = cv2.bitwise_and(frame, mask_overlay)
                    background_mask = cv2.bitwise_not(mask_overlay)
                    background_masked = cv2.bitwise_and(background, background_mask)

                    # Combine the person and background
                    frame = cv2.add(person, background_masked)

    print(string_to_print)

    if user_data.use_frame:
        # Convert the frame to BGR for display
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    return Gst.PadProbeReturn.OK
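
Note that the frame variable is only populated when the app is started with frame output enabled. Assuming the standard hailo-rpi5-examples CLI, that means running it with something like:

python basic_pipelines/instance_segmentation.py --input rpi --use-frame

Otherwise user_data.use_frame stays False and the masking code above is skipped.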

Key Changes

  1. Loading and resizing the static background image (background.jpg) to match the frame dimensions.
  2. Retrieving the instance segmentation mask for detected persons and resizing it to match the bounding box size (see the scaling sketch after this list).
  3. Creating a mask overlay for the person using the resized mask.
  4. Applying the mask to extract the person from the original frame and the corresponding region from the static background.
  5. Combining the extracted person with the masked background to replace the original background.
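
On point 2, the mask_width * 4 / mask_height * 4 resize assumes the mask patch comes out at quarter resolution relative to the bounding box. If that assumption doesn't hold for your model, a safer sketch is to resize the mask directly to the bounding box size in pixels (this relies on HailoBBox's width()/height() accessors):

# Sketch: resize the mask straight to the bbox size instead of a fixed 4x factor.
# Assumption: the mask patch spans exactly the bounding box area.
box_w = max(1, int(bbox.width() * width))
box_h = max(1, int(bbox.height() * height))
mask_resized = cv2.resize(mask_data, (box_w, box_h), interpolation=cv2.INTER_NEAREST)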

Thank you @omria!

That helped me a lot. Unfortunately, the provided code doesn't produce any mask at all; I'm assuming it's something to do with how mask_overlay is created. I have managed to get it working, though: I had to add a binary mask and ensure it's applied to all three color channels:

  # Threshold the soft mask, then cast to uint8 so OpenCV's merge/bitwise ops accept it
  binary_mask = ((mask_resized > 0.2) * 255).astype(np.uint8)

  # Create a mask overlay for the person
  mask_overlay = np.zeros_like(frame)
  mask_overlay[y_min:y_max, x_min:x_max, :] = cv2.merge([binary_mask] * 3)

I'm not too satisfied with the results, though. Resizing the mask produces an extremely pixelated image:

So I've played around with the interpolation method, and INTER_CUBIC seems to work quite well:
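
One idea for even smoother edges (just a sketch, not from the original example; it assumes mask_resized is a float mask in [0, 1] that has already been cropped to match the y_min:y_max, x_min:x_max region) is to skip the hard threshold entirely and use the cubic-resized mask as a soft alpha matte:

# Sketch: soft alpha blending instead of hard bitwise masking
alpha = np.clip(mask_resized, 0.0, 1.0)[:, :, np.newaxis]
person_region = frame[y_min:y_max, x_min:x_max].astype(np.float32)
bg_region = background[y_min:y_max, x_min:x_max].astype(np.float32)
blended = alpha * person_region + (1.0 - alpha) * bg_region
frame[y_min:y_max, x_min:x_max] = blended.astype(np.uint8)

Edge pixels then get a weighted mix of person and background instead of the staircase look of a binary mask.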

The problem I'm still facing is the mediocre result in the final video. I am using this free video from Pexels (https://www.pexels.com/video/baseball-training-852399/) and here is what I got:


Obviously, ignore the perspective and lighting issues; I believe those, once set correctly, will add to the realism, but for now I'm focused on the mask itself.

More importantly, sometimes the mask doesn't appear at all:

Additionally, I see extreme CPU usage on my RPi5 when running the script with the Pi camera module:

I believe in the power of the community, so I would like to find out:

  1. How can I get a better quality mask?
  2. Why does the mask sometimes not appear on the buffer frame?
  3. How can I replace the main preview with the one from the buffer?

My full script for reference:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from instance_segmentation_pipeline import GStreamerInstanceSegmentationApp

# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
# Inheritance from the app_callback_class
class user_app_callback_class(app_callback_class):
    def __init__(self):
        super().__init__()

# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------

# This is the callback function that will be called when data is available from the pipeline
def app_callback(pad, info, user_data):
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Using the user_data to count the number of frames
    user_data.increment()
    string_to_print = f"Frame count: {user_data.get_count()}\n"

    # Get the caps from the pad
    format, width, height = get_caps_from_pad(pad)

    # If the user_data.use_frame is set to True, we can get the video frame from the buffer
    frame = None
    if user_data.use_frame and format is not None and width is not None and height is not None:
        # Get video frame
        frame = get_numpy_from_buffer(buffer, format, width, height)

    # Get the detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Load static background image and resize to match frame dimensions
    background = cv2.imread("background.jpg")
    background = cv2.resize(background, (width, height))

    # Parse the detections
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        if label == "person":
            string_to_print += (f"Detection: {label} {confidence:.2f}\n")
            if user_data.use_frame:
                # Get instance segmentation mask from detection
                masks = detection.get_objects_typed(hailo.HAILO_CONF_CLASS_MASK)
                if len(masks) > 0:
                    mask = masks[0]
                    mask_height = mask.get_height()
                    mask_width = mask.get_width()
                    mask_data = np.array(mask.get_data()).reshape((mask_height, mask_width))

                    # Resize mask to match bounding box size
                    mask_resized = cv2.resize(mask_data, (mask_width * 4, mask_height * 4), interpolation=cv2.INTER_CUBIC)

                    # Get coordinates to place the mask on the frame, clamped to the frame borders
                    x_min, y_min = max(0, int(bbox.xmin() * width)), max(0, int(bbox.ymin() * height))
                    x_max, y_max = min(x_min + mask_resized.shape[1], width), min(y_min + mask_resized.shape[0], height)

                    # Threshold the soft mask and cast to uint8 so OpenCV's merge/bitwise ops accept it
                    binary_mask = ((mask_resized > 0.5) * 255).astype(np.uint8)

                    # Create a mask overlay for the person; crop the mask in case the
                    # bounding box was clipped at the frame border
                    mask_overlay = np.zeros_like(frame)
                    mask_overlay[y_min:y_max, x_min:x_max, :] = cv2.merge(
                        [binary_mask[:y_max - y_min, :x_max - x_min]] * 3)

                    # Apply the mask to extract the person and background
                    person = cv2.bitwise_and(frame, mask_overlay)
                    background_mask = cv2.bitwise_not(mask_overlay)
                    background_masked = cv2.bitwise_and(background, background_mask)

                    # Combine the person and background
                    frame = cv2.add(person, background_masked)

    print(string_to_print)

    if user_data.use_frame and frame is not None:
        # Convert the frame to BGR for display
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Create an instance of the user app callback class
    user_data = user_app_callback_class()
    app = GStreamerInstanceSegmentationApp(app_callback, user_data)
    app.run()
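
A side note on the CPU usage: the callback above reloads and resizes background.jpg on every single frame, which is wasted work. A minimal sketch of caching it in the user class instead (get_background is my own hypothetical helper, not part of the examples):

class user_app_callback_class(app_callback_class):
    def __init__(self):
        super().__init__()
        self.background = None  # cached, resized background image

    def get_background(self, width, height):
        # Load and resize background.jpg once; reuse the cached copy on later frames
        if self.background is None:
            self.background = cv2.resize(cv2.imread("background.jpg"), (width, height))
        return self.background

The callback would then call background = user_data.get_background(width, height) instead of the per-frame cv2.imread / cv2.resize pair.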