Hi everyone,
I’m trying to build a test python app for my home to detect people and vehicles at night on certain cameras, which I can access via RTSP over my local network, and save frames on the filesystem.
Currently I’m facing the following issue.
I have a python dictionary that holds my RTSP stream and their config - what to detect on this stream (people, vehicles or both).
But when I get the detections in my pad probe, I can get only their stream id, which is an unknown string to my app, and instead I want to get the index of the stream (0,1,2,3) how it is defined in the pipeline. I need this to know what detections I want saved for this stream.
For this project I’m using a Raspberry Pi 5 with the Hailo8L AI hat 13TOPs. I have installed everything I believe and I have already made a working project to detect people, vehicles or both from one RTSP stream, so I believe the environment is setup properly.
However, I would like to do that on multiple cameras, so I used the multi_stream_detection_rtsp.sh as a base and modified it to fit my purposes.
I built a python application around this example, you can see the pipeline in my code below.
I read quite a lot about metadata and how to carry over an integer from the pipeline to the buffer, but it looks too complex. I thought it has to be a better way to do this.
So I read this page Hailo RoundRobin which says that a metadata is carried over to the hailostreamrouter.
Can I somehow retrieve that data with python to map the stream id to the index I have provided when generating the pipeline string?
See my code below:
(Ohh, and the pipeline is running fine, the error that I’m getting now when I run it, is inside the on_hailofilter_src_probe
method on line 209, where I try to access my stream config dictionary)
import os
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import hailo
import numpy as np
import cv2
from datetime import datetime
import signal
###############################################################################
# 1. Configuration
###############################################################################
STREAM_CONFIGS = [
{
"rtsp_url": "rtsp://admin:somepw@10.10.16.1/cam/realmonitor?channel=1&subtype=1",
"detect_objects": "people",
"name": "garden"
},
{
"rtsp_url": "rtsp://admin:somepw@10.10.16.2/cam/realmonitor?channel=1&subtype=1",
"detect_objects": "people",
"name": "front_door"
},
{
"rtsp_url": "rtsp://admin:somepw@10.10.16.3/cam/realmonitor?channel=1&subtype=1",
"detect_objects": "both",
"name": "side_road"
},
{
"rtsp_url": "rtsp://admin:somepw@10.10.16.4/cam/realmonitor?channel=1&subtype=1",
"detect_objects": "vehicles",
"name": "back_road"
},
]
LABEL_MAPPING = {
0: "person",
2: "car",
3: "motorbike",
5: "bus",
7: "truck",
}
###############################################################################
# 2. Multi-Stream Class
###############################################################################
class MultiStreamHailoDetector:
def __init__(self,
stream_configs,
model_hef,
postprocess_so,
network_width=640,
network_height=640,
detections_needed=5):
self.stream_configs = stream_configs
self.num_streams = len(stream_configs)
self.model_hef = model_hef
self.postprocess_so = postprocess_so
self.net_width = network_width
self.net_height = network_height
self.detections_needed = detections_needed
Gst.init(None)
# Per-stream state
self.stream_states = {
i: {
"config": cfg,
"consecutive_detections": 0,
"last_frame_np": None
}
for i, cfg in enumerate(stream_configs)
}
# Build pipeline string
self.pipeline_str = self.build_pipeline()
print("Final GStreamer Pipeline:\n", self.pipeline_str, "\n")
# Parse/construct pipeline
try:
self.pipeline = Gst.parse_launch(self.pipeline_str)
except GLib.Error as e:
print("Error creating pipeline:", e)
sys.exit(1)
# Set up the bus
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.on_bus_message)
# Attach a probe to hailofilter's src pad to extract detections
hailofilter = self.pipeline.get_by_name("hailofilter")
if hailofilter:
pad = hailofilter.get_static_pad("src")
if pad:
pad.add_probe(Gst.PadProbeType.BUFFER, self.on_hailofilter_src_probe)
else:
print("WARNING: hailofilter has no src pad.")
else:
print("WARNING: hailofilter not found in pipeline.")
# Set up appsinks (one per stream)
for i in range(self.num_streams):
sink_name = f"appsink_{i}"
appsink = self.pipeline.get_by_name(sink_name)
if appsink:
appsink.connect("new-sample", self.on_new_sample_factory(i))
else:
print(f"WARNING: No appsink named {sink_name} found.")
def build_pipeline(self):
batch_size = max(self.num_streams, 1)
# Adjust hailonet parameters as needed
hailonet_params = (
f"hef-path={self.model_hef} "
"is-active=true "
f"batch-size={batch_size} "
"output-format-type=HAILO_FORMAT_TYPE_FLOAT32 "
"force-writable=true "
#"nms-score-threshold=0.3 "
#"nms-iou-threshold=0.45 "
)
sources_str = ""
streamrouter_str = ""
sinks_str = ""
for i, cfg in enumerate(self.stream_configs):
sources_str += f"""
rtspsrc location=\"{cfg["rtsp_url"]}\" name=source{i} message-forward=true !
rtph265depay !
queue leaky=no max-size-buffers=2 !
h265parse !
avdec_h265 !
videoconvert ! videoscale !
video/x-raw,format=RGB,width=640,height=640 !
queue leaky=no max-size-buffers=1 !
fun.sink_{i}
"""
sinks_str += f"""
sid.src_{i} !
queue name=to_appsink_{i} leaky=no max-size-buffers=1 !
video/x-raw,format=RGB !
appsink name=appsink_{i} emit-signals=True drop=false
"""
streamrouter_str += f"src_{i}::input-streams=\"<sink_{i}>\" "
pipeline = f"""
{sources_str}
hailoroundrobin mode=1 name=fun !
queue name=hailo_pre_tee_q_0 leaky=no max-size-buffers=5 max-size-bytes=0 max-size-time=0 !
tee name=t hailomuxer name=hmux
t. ! queue name=bypass leaky=no max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! hmux.
t. ! queue name=hailo_pre_infer_q_0 leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 !
hailonet {hailonet_params} !
queue name=hailo_postprocess0 leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 !
hailofilter name=hailofilter so-path={self.postprocess_so} qos=false !
hmux.
hmux. !
hailostreamrouter name=sid {streamrouter_str}
{sinks_str}
"""
pipeline = " ".join(pipeline.split())
return pipeline
###################################################
# 3. GStreamer Callbacks
###################################################
def on_bus_message(self, bus, message):
t = message.type
if t == Gst.MessageType.EOS:
print("Got EOS - pipeline ended.")
self.stop()
elif t == Gst.MessageType.ERROR:
err, dbg = message.parse_error()
print(f"Error from {message.src.get_name()}: {err.message}")
if dbg:
print("Debug info:", dbg)
self.stop()
elif t == Gst.MessageType.WARNING:
warn, dbg = message.parse_warning()
print(f"WARNING from {message.src.get_name()}: {warn.message}")
if dbg:
print("Debug info:", dbg)
return True
def on_hailofilter_src_probe(self, pad, info):
buffer = info.get_buffer()
if not buffer:
return Gst.PadProbeReturn.OK
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
for det in detections:
stream_id = det.get_stream_id() # which roundrobin input
label_id = det.get_label()
confidence = det.get_confidence()
# I want to use the stream index here to get the detection mode
detect_mode = self.stream_states[stream_id]["config"]["detect_objects"]
wanted_labels = self.get_wanted_labels(detect_mode)
label_name = LABEL_MAPPING.get(label_id, f"unknown_{label_id}")
if label_name in wanted_labels:
self.stream_states[stream_id]["consecutive_detections"] += 1
# Check threshold
if self.stream_states[stream_id]["consecutive_detections"] >= self.detections_needed:
self.stream_states[stream_id]["consecutive_detections"] = 0
self.save_detection_snapshot(stream_id, det)
return Gst.PadProbeReturn.OK
def on_new_sample_factory(self, stream_idx):
def callback(sink):
sample = sink.emit("pull-sample")
if not sample:
return Gst.FlowReturn.ERROR
buffer = sample.get_buffer()
ok, mapinfo = buffer.map(Gst.MapFlags.READ)
if ok:
caps = sample.get_caps()
struct = caps.get_structure(0)
width = struct.get_value("width")
height = struct.get_value("height")
# If the pipeline is producing raw RGB:
frame_data = np.frombuffer(mapinfo.data, dtype=np.uint8)
frame_np = frame_data.reshape((height, width, 3))
self.stream_states[stream_idx]["last_frame_np"] = frame_np.copy()
buffer.unmap(mapinfo)
return Gst.FlowReturn.OK
return callback
###################################################
# 4. Helper Methods
###################################################
def get_wanted_labels(self, detect_mode):
# Adjust for your YOLO classes
people_labels = ["person"]
vehicle_labels = ["car", "truck", "bus", "motorbike"]
if detect_mode == "people":
return people_labels
elif detect_mode == "vehicles":
return vehicle_labels
elif detect_mode == "both":
return people_labels + vehicle_labels
else:
return []
def save_detection_snapshot(self, stream_id, detection_obj):
frame_np = self.stream_states[stream_id]["last_frame_np"]
if frame_np is None:
print("No frame data to save.")
return
bbox = detection_obj.get_bbox()
x_min = bbox.xmin()
y_min = bbox.ymin()
x_max = bbox.xmax()
y_max = bbox.ymax()
h, w, _ = frame_np.shape
x_min_pix = int(x_min * w)
y_min_pix = int(y_min * h)
x_max_pix = int(x_max * w)
y_max_pix = int(y_max * h)
cv2.rectangle(frame_np, (x_min_pix, y_min_pix), (x_max_pix, y_max_pix), (0,255,0), 2)
label_id = detection_obj.get_label()
confidence = detection_obj.get_confidence() * 100
text = f"{LABEL_MAPPING.get(label_id, label_id)}: {confidence:.1f}%"
cv2.putText(frame_np, text, (x_min_pix, y_min_pix - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1)
outdir = "detections"
os.makedirs(outdir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
cam_name = self.stream_states[stream_id]["config"]["name"]
outpath = f"{outdir}/{cam_name}_{timestamp}.jpg"
# Convert from RGB to BGR for saving
frame_bgr = cv2.cvtColor(frame_np, cv2.COLOR_RGB2BGR)
cv2.imwrite(outpath, frame_bgr)
print(f"Saved detection snapshot: {outpath}")
###################################################
# 5. Start/Stop
###################################################
def run(self):
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Unable to set pipeline to PLAYING.")
sys.exit(1)
self.loop = GLib.MainLoop()
try:
print("Starting main loop. Press Ctrl+C to quit.")
self.loop.run()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.pipeline.send_event(Gst.Event.new_eos())
self.pipeline.get_bus().timed_pop_filtered(Gst.SECOND, Gst.MessageType.EOS)
self.pipeline.set_state(Gst.State.NULL)
if hasattr(self, "loop"):
self.loop.quit()
sys.exit(0)
###############################################################################
# 6. Main
###############################################################################
def main():
current_dir = os.path.dirname(os.path.abspath(__file__))
postprocess_so = os.path.join(current_dir, "resources", "libyolo_hailortpp_postprocess.so")
model_hef = os.path.join(current_dir, "resources", "yolov8s_h8l.hef")
if not os.path.exists(postprocess_so):
print("Missing postprocess .so:", postprocess_so)
sys.exit(1)
if not os.path.exists(model_hef):
print("Missing .hef file:", model_hef)
sys.exit(1)
detector = MultiStreamHailoDetector(
stream_configs=STREAM_CONFIGS,
model_hef=model_hef,
postprocess_so=postprocess_so,
network_width=640,
network_height=640,
detections_needed=5
)
def sigint_handler(sig, frame):
print("SIGINT received, stopping.")
detector.stop()
signal.signal(signal.SIGINT, sigint_handler)
detector.run()
if __name__ == "__main__":
main()