How to map my python streams object to the detection stream? (metadata access?)

Hi everyone,

I’m trying to build a test python app for my home to detect people and vehicles at night on certain cameras, which I can access via RTSP over my local network, and save frames on the filesystem.

Currently I’m facing the following issue.
I have a python dictionary that holds my RTSP stream and their config - what to detect on this stream (people, vehicles or both).
But when I get the detections in my pad probe, I can get only their stream id, which is an unknown string to my app, and instead I want to get the index of the stream (0,1,2,3) how it is defined in the pipeline. I need this to know what detections I want saved for this stream.

For this project I’m using a Raspberry Pi 5 with the Hailo8L AI hat 13TOPs. I have installed everything I believe and I have already made a working project to detect people, vehicles or both from one RTSP stream, so I believe the environment is setup properly.

However, I would like to do that on multiple cameras, so I used the multi_stream_detection_rtsp.sh as a base and modified it to fit my purposes.

I built a python application around this example, you can see the pipeline in my code below.

I read quite a lot about metadata and how to carry over an integer from the pipeline to the buffer, but it looks too complex. I thought it has to be a better way to do this.

So I read this page Hailo RoundRobin which says that a metadata is carried over to the hailostreamrouter.

Can I somehow retrieve that data with python to map the stream id to the index I have provided when generating the pipeline string?

See my code below:
(Ohh, and the pipeline is running fine, the error that I’m getting now when I run it, is inside the on_hailofilter_src_probe method on line 209, where I try to access my stream config dictionary)

import os
import sys
import gi

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

import hailo
import numpy as np
import cv2
from datetime import datetime
import signal


###############################################################################
# 1. Configuration
###############################################################################
STREAM_CONFIGS = [
    {
        "rtsp_url": "rtsp://admin:somepw@10.10.16.1/cam/realmonitor?channel=1&subtype=1",
        "detect_objects": "people",
        "name": "garden"
    },
    {
        "rtsp_url": "rtsp://admin:somepw@10.10.16.2/cam/realmonitor?channel=1&subtype=1",
        "detect_objects": "people",
        "name": "front_door"
    },
    {
        "rtsp_url": "rtsp://admin:somepw@10.10.16.3/cam/realmonitor?channel=1&subtype=1",
        "detect_objects": "both",
        "name": "side_road"
    },
    {
        "rtsp_url": "rtsp://admin:somepw@10.10.16.4/cam/realmonitor?channel=1&subtype=1",
        "detect_objects": "vehicles",
        "name": "back_road"
    },
]

LABEL_MAPPING = {
    0: "person",
    2: "car",
    3: "motorbike",
    5: "bus",
    7: "truck",
}


###############################################################################
# 2. Multi-Stream Class
###############################################################################
class MultiStreamHailoDetector:
    def __init__(self,
                 stream_configs,
                 model_hef,
                 postprocess_so,
                 network_width=640,
                 network_height=640,
                 detections_needed=5):
        self.stream_configs = stream_configs
        self.num_streams = len(stream_configs)
        self.model_hef = model_hef
        self.postprocess_so = postprocess_so
        self.net_width = network_width
        self.net_height = network_height
        self.detections_needed = detections_needed

        Gst.init(None)

        # Per-stream state
        self.stream_states = {
            i: {
                "config": cfg,
                "consecutive_detections": 0,
                "last_frame_np": None
            }
            for i, cfg in enumerate(stream_configs)
        }

        # Build pipeline string
        self.pipeline_str = self.build_pipeline()
        print("Final GStreamer Pipeline:\n", self.pipeline_str, "\n")

        # Parse/construct pipeline
        try:
            self.pipeline = Gst.parse_launch(self.pipeline_str)
        except GLib.Error as e:
            print("Error creating pipeline:", e)
            sys.exit(1)

        # Set up the bus
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self.on_bus_message)

        # Attach a probe to hailofilter's src pad to extract detections
        hailofilter = self.pipeline.get_by_name("hailofilter")
        if hailofilter:
            pad = hailofilter.get_static_pad("src")
            if pad:
                pad.add_probe(Gst.PadProbeType.BUFFER, self.on_hailofilter_src_probe)
            else:
                print("WARNING: hailofilter has no src pad.")
        else:
            print("WARNING: hailofilter not found in pipeline.")

        # Set up appsinks (one per stream)
        for i in range(self.num_streams):
            sink_name = f"appsink_{i}"
            appsink = self.pipeline.get_by_name(sink_name)
            if appsink:
                appsink.connect("new-sample", self.on_new_sample_factory(i))
            else:
                print(f"WARNING: No appsink named {sink_name} found.")

    def build_pipeline(self):
        batch_size = max(self.num_streams, 1)

        # Adjust hailonet parameters as needed
        hailonet_params = (
            f"hef-path={self.model_hef} "
            "is-active=true "
            f"batch-size={batch_size} "
            "output-format-type=HAILO_FORMAT_TYPE_FLOAT32 "
            "force-writable=true "
            #"nms-score-threshold=0.3 "
            #"nms-iou-threshold=0.45 "
        )

        sources_str = ""
        streamrouter_str = ""
        sinks_str = ""
        for i, cfg in enumerate(self.stream_configs):
            sources_str += f"""
                rtspsrc location=\"{cfg["rtsp_url"]}\" name=source{i} message-forward=true !
                    rtph265depay !
                    queue leaky=no max-size-buffers=2 !
                    h265parse !
                    avdec_h265 !
                    videoconvert ! videoscale !
                    video/x-raw,format=RGB,width=640,height=640 !
                    queue leaky=no max-size-buffers=1 !
                    fun.sink_{i}
            """
            sinks_str += f"""
                sid.src_{i} !
                    queue name=to_appsink_{i} leaky=no max-size-buffers=1 !
                    video/x-raw,format=RGB !
                    appsink name=appsink_{i} emit-signals=True drop=false
            """
            streamrouter_str += f"src_{i}::input-streams=\"<sink_{i}>\" "


        pipeline = f"""
            {sources_str}
            hailoroundrobin mode=1 name=fun !
                queue name=hailo_pre_tee_q_0 leaky=no max-size-buffers=5 max-size-bytes=0 max-size-time=0 !
                tee name=t hailomuxer name=hmux
            t. ! queue name=bypass leaky=no max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! hmux.
            t. ! queue name=hailo_pre_infer_q_0 leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 !
                hailonet {hailonet_params} !
                queue name=hailo_postprocess0 leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 !
                hailofilter name=hailofilter so-path={self.postprocess_so} qos=false !
            hmux.
            hmux. !
                hailostreamrouter name=sid {streamrouter_str}
            {sinks_str}
        """

        pipeline = " ".join(pipeline.split())
        return pipeline

    ###################################################
    # 3. GStreamer Callbacks
    ###################################################
    def on_bus_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("Got EOS - pipeline ended.")
            self.stop()
        elif t == Gst.MessageType.ERROR:
            err, dbg = message.parse_error()
            print(f"Error from {message.src.get_name()}: {err.message}")
            if dbg:
                print("Debug info:", dbg)
            self.stop()
        elif t == Gst.MessageType.WARNING:
            warn, dbg = message.parse_warning()
            print(f"WARNING from {message.src.get_name()}: {warn.message}")
            if dbg:
                print("Debug info:", dbg)
        return True

    def on_hailofilter_src_probe(self, pad, info):
        buffer = info.get_buffer()
        if not buffer:
            return Gst.PadProbeReturn.OK

        roi = hailo.get_roi_from_buffer(buffer)
        detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

        for det in detections:
            stream_id = det.get_stream_id()  # which roundrobin input
            label_id = det.get_label()
            confidence = det.get_confidence()

            # I want to use the stream index here to get the detection mode
            detect_mode = self.stream_states[stream_id]["config"]["detect_objects"]
            wanted_labels = self.get_wanted_labels(detect_mode)

            label_name = LABEL_MAPPING.get(label_id, f"unknown_{label_id}")
            if label_name in wanted_labels:
                self.stream_states[stream_id]["consecutive_detections"] += 1
                # Check threshold
                if self.stream_states[stream_id]["consecutive_detections"] >= self.detections_needed:
                    self.stream_states[stream_id]["consecutive_detections"] = 0
                    self.save_detection_snapshot(stream_id, det)

        return Gst.PadProbeReturn.OK

    def on_new_sample_factory(self, stream_idx):
        def callback(sink):
            sample = sink.emit("pull-sample")
            if not sample:
                return Gst.FlowReturn.ERROR

            buffer = sample.get_buffer()
            ok, mapinfo = buffer.map(Gst.MapFlags.READ)
            if ok:
                caps = sample.get_caps()
                struct = caps.get_structure(0)
                width = struct.get_value("width")
                height = struct.get_value("height")

                # If the pipeline is producing raw RGB:
                frame_data = np.frombuffer(mapinfo.data, dtype=np.uint8)
                frame_np = frame_data.reshape((height, width, 3))
                self.stream_states[stream_idx]["last_frame_np"] = frame_np.copy()
                buffer.unmap(mapinfo)

            return Gst.FlowReturn.OK
        return callback

    ###################################################
    # 4. Helper Methods
    ###################################################
    def get_wanted_labels(self, detect_mode):
        # Adjust for your YOLO classes
        people_labels = ["person"]
        vehicle_labels = ["car", "truck", "bus", "motorbike"]
        if detect_mode == "people":
            return people_labels
        elif detect_mode == "vehicles":
            return vehicle_labels
        elif detect_mode == "both":
            return people_labels + vehicle_labels
        else:
            return []

    def save_detection_snapshot(self, stream_id, detection_obj):
        frame_np = self.stream_states[stream_id]["last_frame_np"]
        if frame_np is None:
            print("No frame data to save.")
            return

        bbox = detection_obj.get_bbox()
        x_min = bbox.xmin()
        y_min = bbox.ymin()
        x_max = bbox.xmax()
        y_max = bbox.ymax()

        h, w, _ = frame_np.shape
        x_min_pix = int(x_min * w)
        y_min_pix = int(y_min * h)
        x_max_pix = int(x_max * w)
        y_max_pix = int(y_max * h)

        cv2.rectangle(frame_np, (x_min_pix, y_min_pix), (x_max_pix, y_max_pix), (0,255,0), 2)
        label_id = detection_obj.get_label()
        confidence = detection_obj.get_confidence() * 100
        text = f"{LABEL_MAPPING.get(label_id, label_id)}: {confidence:.1f}%"
        cv2.putText(frame_np, text, (x_min_pix, y_min_pix - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)

        outdir = "detections"
        os.makedirs(outdir, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        cam_name = self.stream_states[stream_id]["config"]["name"]
        outpath = f"{outdir}/{cam_name}_{timestamp}.jpg"

        # Convert from RGB to BGR for saving
        frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)
        cv2.imwrite(outpath, frame_bgr)
        print(f"Saved detection snapshot: {outpath}")

    ###################################################
    # 5. Start/Stop
    ###################################################
    def run(self):
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Unable to set pipeline to PLAYING.")
            sys.exit(1)

        self.loop = GLib.MainLoop()
        try:
            print("Starting main loop. Press Ctrl+C to quit.")
            self.loop.run()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        self.pipeline.send_event(Gst.Event.new_eos())
        self.pipeline.get_bus().timed_pop_filtered(Gst.SECOND, Gst.MessageType.EOS)
        self.pipeline.set_state(Gst.State.NULL)
        if hasattr(self, "loop"):
            self.loop.quit()
        sys.exit(0)


###############################################################################
# 6. Main
###############################################################################
def main():
    current_dir = os.path.dirname(os.path.abspath(__file__))
    postprocess_so = os.path.join(current_dir, "resources", "libyolo_hailortpp_postprocess.so")
    model_hef = os.path.join(current_dir, "resources", "yolov8s_h8l.hef")

    if not os.path.exists(postprocess_so):
        print("Missing postprocess .so:", postprocess_so)
        sys.exit(1)
    if not os.path.exists(model_hef):
        print("Missing .hef file:", model_hef)
        sys.exit(1)

    detector = MultiStreamHailoDetector(
        stream_configs=STREAM_CONFIGS,
        model_hef=model_hef,
        postprocess_so=postprocess_so,
        network_width=640,
        network_height=640,
        detections_needed=5
    )

    def sigint_handler(sig, frame):
        print("SIGINT received, stopping.")
        detector.stop()

    signal.signal(signal.SIGINT, sigint_handler)
    detector.run()


if __name__ == "__main__":
    main()

Hey @hsq ,

Welcome to the Hailo Community!

This is an amazing project !

Here’s how you can map buffer callbacks to specific stream configurations:

Using HailoRoundRobin and HailoStreamRouter

  1. Configure your pipeline with proper naming:

    hailoroundrobin name=roundrobin funnel-mode=false !
    ...
    hailostreamrouter name=router src_0::input-streams='<sink_0, sink_1>' src_1::input-streams='<sink_2, sink_3>'
    
  2. Extract stream ID in your Python probe:

    def on_hailofilter_src_probe(pad, info):
        buffer = info.get_buffer()
        if not buffer:
            return Gst.PadProbeReturn.OK
            
        # Get stream ID from Hailo metadata
        roi = hailo.get_roi_from_buffer(buffer)
        pad_name = roi.get_source_pad_name()
        
        # Convert to stream index
        stream_index = int(pad_name.split("_")[1])
        stream_config = STREAM_CONFIGS[stream_index]
        
        print(f"Processing stream {stream_index}: {stream_config['name']}")
        return Gst.PadProbeReturn.OK
    

This approach lets you maintain your original STREAM_CONFIGS structure while properly mapping buffers to their source streams in your Python application.