How to Run Dual Camera with Two AI Models in Parallel on Raspberry Pi 5 + Hailo-8?

Hi everyone,

I’m currently working with a Raspberry Pi 5 combined with a Hailo-8 AI HAT+, and I need to run two cameras simultaneously, each using a different AI model (e.g., YOLOv6n + YOLOv8s). My goal:

  • Camera 1 runs model A
  • Camera 2 runs model B
  • Both run in real-time, in parallel, using the Hailo NPU for inference

I’ve found relevant code in the hailo-rpi5-examples repository (e.g., detection.py, multi_detection), and I’m able to detect both cameras. However, I’m not sure how to configure two parallel pipelines, each with a different model. Should I launch them in separate processes, or is there a recommended way to manage both in a single application?

My current setup:

  • 2 USB cameras
  • Hailo SDK on Raspberry Pi 5
  • GStreamer pipelines using hailonet, hailofilter, hailooverlay, etc.
  • Models: YOLOv6n, YOLOv8s (.hef format)

Has anyone successfully run a similar multi-camera, multi-model pipeline using the Hailo SDK on RPi5? Any documentation, code samples, or tips would be greatly appreciated!

Thanks in advance!

Hi @The_Nguyen
You can use DeGirum PySDK for such use cases. Please see the Running multiple models independently thread for an example.

Hey @The_Nguyen,

Welcome to the Hailo Community!

I’m curious: what’s the reason for running two different models? There might be a more efficient approach depending on what you’re trying to achieve.

For a good starting point, I’d suggest checking out this example: tappas/apps/h8/gstreamer/general/multistream_detection at master · hailo-ai/tappas · GitHub

It should give you a solid foundation to work from.
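If you do go the separate-processes route from your question, note that two independent gst-launch processes can share the Hailo-8 as long as the HailoRT multi-process service/scheduler is enabled. Below is a rough, untested sketch (one terminal per camera; the hef-path and postprocess .so paths are placeholders, and the real multistream_detection app adds queues, scaling caps, and postprocess parameters, so adapt the exact elements from the TAPPAS example):

# Terminal 1: camera 1 with model A (placeholder paths)
gst-launch-1.0 v4l2src device=/dev/video0 ! videoconvert ! videoscale ! \
  hailonet hef-path=/path/to/yolov6n.hef ! \
  hailofilter so-path=/path/to/libyolo_post.so qos=false ! \
  hailooverlay ! videoconvert ! autovideosink sync=false

# Terminal 2: camera 2 with model B (placeholder paths)
gst-launch-1.0 v4l2src device=/dev/video2 ! videoconvert ! videoscale ! \
  hailonet hef-path=/path/to/yolov8s.hef ! \
  hailofilter so-path=/path/to/libyolo_post.so qos=false ! \
  hailooverlay ! videoconvert ! autovideosink sync=false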

Hi @shashi
Thank you for your response and guidance. I followed your instructions with the code below; however, when I run it, two windows appear showing the two USB camera streams, but they immediately freeze and no frames are captured. The process stops right away, and CPU usage spikes to over 200%. I’m not sure where the issue lies. I’m using a Raspberry Pi 5 with 8GB RAM and a Hailo-8 (26 TOPS). I would greatly appreciate your feedback so that I can continue my research and development. Here is my code:

import degirum as dg
import degirum_tools.streams as dgstreams

inference_host_address = "@local"
zoo_url = "degirum/models_hailort"
token = ""

configurations = [
    {
        "model_name": "yolov8n_relu6_coco--640x640_quant_hailort_hailo8_1",
        "source": 2,
        "display_name": "USB cam 1",
    },
    {
        "model_name": "yolov8n_relu6_face--640x640_quant_hailort_hailo8_1",
        "source": 0,
        "display_name": "USB cam 2",
    },
]

models = [
    dg.load_model(cfg["model_name"], inference_host_address, zoo_url, token)
    for cfg in configurations
]

sources = [dgstreams.VideoSourceGizmo(cfg["source"]) for cfg in configurations]
detectors = [dgstreams.AiSimpleGizmo(model) for model in models]
display = dgstreams.VideoDisplayGizmo(
    [cfg["display_name"] for cfg in configurations], show_ai_overlay=True, show_fps=True
)

pipeline = (
    (source >> detector for source, detector in zip(sources, detectors)),
    (detector >> display[di] for di, detector in enumerate(detectors)),
)

dgstreams.Composition(*pipeline).start()

Hi @The_Nguyen
Can you please try this simpler code and see what happens?

import degirum as dg, degirum_tools, cv2

# Camera sources: 0 and 2 represent connected webcam indices
sources = [2, 0]

# Hailo configuration
inference_host_address = "@local"  # Use IP if remote Hailo device
zoo_url = "degirum/models_hailort"
token = ""  # Leave empty if using public models

# Load the Hailo-compatible model using dg.load_model
model1 = dg.load_model(
    model_name="yolov8n_relu6_coco–640x640_quant_hailort_hailo8_1",
    inference_host_address=inference_host_address,
    zoo_url=zoo_url,
    token=token,    
)

model2 = dg.load_model(
    model_name="yolov8n_relu6_face–640x640_quant_hailort_hailo8_1",
    inference_host_address=inference_host_address
    zoo_url=zoo_url,
    token=token,
)

# Display and inference loop
with degirum_tools.Display("USB Cam 1") as display1, degirum_tools.Display("USB Cam 2") as display2:
    for result1, result2 in zip(
        degirum_tools.predict_stream(model1, sources[0]),
        degirum_tools.predict_stream(model2, sources[1])
    ):
        display1.show(result1)
        display2.show(result2)

Hi @omria,
Sorry for the late reply. I’m currently experimenting with using 5 USB cameras with 5 different models on a Raspberry Pi 5 with AI HAT+ for my project. For now, I’m starting with 2 cameras and 2 models to observe performance. I’m not sure if the 26 TOPS from the AI HAT+ and the 8GB RAM of the Pi 5 will be enough to handle it. Thank you for your attention. If you have any information related to running multiple cameras and models simultaneously, please share it with me. Thank you!

Hi @shashi
This is a great solution; thank you very much for your enthusiastic support. The code works as intended, but after running for a while, it stops and throws the error:
“[HailoRT] [critical] Executing pipeline terminate failed with status HAILO_RPC_FAILED(77)”.
I’m not sure how to resolve this issue on the Pi 5 with 8GB RAM and AI HAT+ (26 TOPS). I would really appreciate your feedback. Thank you!

Hi @The_Nguyen
Glad to hear it is working for you. Regarding the crash after a while: can you let us know the kernel version? In the past few weeks, we have seen users report that kernel 6.12.25 causes stability issues.

Hi @shashi
Thank you for your response. The kernel version is 6.12.25. If you have any suggestions for a more stable version, I will consider switching to ensure system stability. I look forward to your recommendation. Thank you so much!

Hi @The_Nguyen
This is the official thread on the kernel issue and instructions: Raspberry Pi Kernel Compatibility Issue - Temporary Fix - General - Hailo Community

Hi @shashi
Everything is working very well now. Thank you for your enthusiastic support.

Hi @The_Nguyen
Glad to hear. You can adapt the code to the 5-camera, 5-model use case you mentioned as well. Let us know if you encounter any issues. Please note that there will be some performance degradation due to switching between the models on the device. You may also run into the weak-host problem, as the RPi5 is not a very powerful host.

Hi @shashi,
Sorry to bother you, but I would like to deploy 5 cameras along with 5 different models running on ROS 2 Humble. I have built ROS 2 Humble entirely from source. However, I haven’t been able to find any documentation or resources about deploying the Hailo-8 with ROS 2. Could you please provide me with some information or guidance on this? Thank you very much.

Hi @The_Nguyen
We are not familiar with the ROS 2 Humble ecosystem. Maybe the Hailo team can help.

Hi @shashi,
I’m currently running 3 cameras and 3 AI models using the DeGirum PySDK, and everything is working fine. Now I plan to scale up to 5 cameras, but I want to reduce CPU usage. Do you have any suggestions for optimization or performance improvements?
Below are the 3-camera output and the CPU usage while running.

Hi @The_Nguyen,

It’s great to know that you were able to run 3 cameras with 3 AI models using DeGirum PySDK. Here’s an optimized approach to reduce CPU usage while you scale to 5 cameras, or even more.

You can use dgstreams gizmos, which are optimized for running multi-camera inference. You can find a detailed guide here: PySDKExamples/examples/dgstreams/multi_camera_multi_model_detection.ipynb at main · DeGirum/PySDKExamples

Below is a quick example to run gizmos and test CPU usage.

  1. Run the script below, changing the model names and video sources as per your requirements:
import degirum as dg, degirum_tools
from degirum_tools import streams as dgstreams
import time

# === Configuration ===
video_sources =  [
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/WalkingPeople.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/Parking.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/Traffic.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/Traffic2.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/TrafficHD.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/WalkingPeople2.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/example_video.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/faces_and_gender.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/person_face_hand.mp4",
    "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/person_pose.mp4",
]


model_name = "yolov8n_relu6_coco--640x640_quant_hailort_hailo8_1"
model_zoo_url = "degirum/models_hailort"
hw_location = "@local"
token = ""

# === Load the model once ===
model = dg.load_model(
    model_name=model_name,
    inference_host_address=hw_location,
    zoo_url=model_zoo_url,
    token=token,
    overlay_line_width=2
)

# === Create all pipeline components ===
sources = [dgstreams.VideoSourceGizmo(src, stop_composition_on_end=False) for src in video_sources]
resizers = [dgstreams.AiPreprocessGizmo(model) for _ in video_sources]
detectors = [dgstreams.AiSimpleGizmo(model) for _ in video_sources]
win_captions = [f"Stream #{i+1}" for i in range(len(video_sources))]

display = dgstreams.VideoDisplayGizmo(
    win_captions, show_ai_overlay=True, show_fps=True, multiplex=True
)

# === Compose full pipeline ===
pipeline = [
    source >> resizer >> detector >> display
    for source, resizer, detector in zip(sources, resizers, detectors)
]

composition = dgstreams.Composition(*pipeline)
composition.start()
print("Composition started. Running for 100 inference steps...")

# === Counter-based exit ===
max_steps = 100
counter = 0
while counter < max_steps:
    counter += 1
    dgstreams.pump_all(1)  # runs 1 inference step
    # You could print or log here if needed

# === Stop ===
composition.stop()
print(f"✅ Stopped after {counter} steps.")

  2. Open another terminal and run this script to measure CPU usage while the script above is running:
import csv
import os
import time
from datetime import datetime

import psutil

# Measurement settings (placeholder values – adjust as needed)
duration = 60                # seconds to sample CPU usage
label = "dgstreams gizmos"   # label written to the CSV row
output_file = "cpu_usage.csv"

cpu_samples = []
start_time = time.time()
while time.time() - start_time < duration:
    usage = psutil.cpu_percent(interval=1)
    cpu_samples.append(usage)

avg_cpu = round(sum(cpu_samples) / len(cpu_samples), 2)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

file_exists = os.path.exists(output_file)
with open(output_file, mode="a", newline="") as f:
    writer = csv.writer(f)
    if not file_exists:
        writer.writerow(["timestamp", "label", "avg_cpu"])
    writer.writerow([timestamp, label, avg_cpu])

print(f"Average CPU usage using {label}: {avg_cpu}%")

The script above samples CPU usage once per second and reports the average at the end of the specified duration.


Hi Shashi

I modified the source as follows:

"    },\n",
     "    {\n",
     "        \"model_name\": \"yolov8n_relu6_face--640x640_quant_hailort_multidevice_1\",\n",
-    "        \"source\": 0,  # Webcam index\n",
+    "        \"source\": \"v4l2src device=/dev/video0 ! video/x-raw,format=UYVY,width=1280,height=720,framerate=30/1 ! videoconvert ! appsink sync=0\",  # Webcam index\n",
     "        \"display_name\": \"Webcam Feed\",\n",
     "    },\n",
     "]\n",

I’m using the 006 example, and I have tested our camera with GStreamer, but I got an exception.
Does it support OpenCV?

export DISPLAY=:0
gst-launch-1.0 v4l2src device=/dev/video0 ! video/x-raw,format=UYVY,width=1280,height=720,framerate=30/1 ! videoconvert ! appsink sync=0  => Success

gst-launch-1.0 v4l2src device=/dev/video0 ! video/x-raw,format=UYVY,width=1280,height=720,framerate=30/1 ! videoconvert ! autovideosink sync=0  => Success

Hi @bernie_chen
The source as a GStreamer string is not natively supported yet in our degirum_tools, and we are currently working on it. In the meantime, we will provide you with a way to define the source as a generator outside and pass it to the predict function. @Darshil_Modi will help you.

Hi shashi

Thanks for your support.
I have successfully used the Hailo official example to implement dual cameras, but I still want to know how to use degirum_tools to implement them.

@Darshil_Modi
Would you mind giving me a hand? Thank you.

Hi Bernie,

You can create a custom video generator function that fetches frames using GStreamer and then yields them to the predict_batch function. You can find our detailed guide on how to use custom video generators here.
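As a quick point of reference before the GStreamer version: a custom video generator can be as simple as a wrapper around OpenCV’s VideoCapture. This is a minimal hypothetical sketch, not code from the guide:

import cv2

# Minimal hypothetical sketch: a custom video generator built on OpenCV.
# It yields BGR frames from the given webcam index until capture fails;
# the frames can be passed straight to model.predict_batch().
def opencv_video_generator(index=0):
    cap = cv2.VideoCapture(index)
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            yield frame
    finally:
        cap.release()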

For your use case, we will use PyGObject to fetch frames with GStreamer inside the custom video generator. This requires installing PyGObject, a Python package that provides bindings for GStreamer and other GObject-based libraries. To install PyGObject, follow the steps below:

sudo apt install python3-gi python3-gi-cairo
pip install PyGObject

Here’s an illustration of how to use GStreamer together with PyGObject in a custom video generator:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import numpy as np
import cv2
import degirum as dg

# Initialize GStreamer
Gst.init(None)

# Load Hailo model 
model = dg.load_model(
    model_name="yolov8n_coco--640x640_quant_hailort_multidevice_1",
    inference_host_address="@local",
    zoo_url="degirum/hailo"
)

# Custom generator using PyGObject
def custom_video_generator():
    pipeline_str = (
        "v4l2src device=/dev/video0 ! "
        "videoconvert ! video/x-raw, format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=1 drop=true"
    )
    pipeline = Gst.parse_launch(pipeline_str)
    sink = pipeline.get_by_name("sink")

    pipeline.set_state(Gst.State.PLAYING)

    try:
        while True:
            sample = sink.emit("pull-sample")
            if sample:
                buf = sample.get_buffer()
                caps = sample.get_caps()
                width = caps.get_structure(0).get_value("width")
                height = caps.get_structure(0).get_value("height")

                success, map_info = buf.map(Gst.MapFlags.READ)
                if not success:
                    continue

                frame = np.frombuffer(map_info.data, np.uint8).reshape((height, width, 3))
                buf.unmap(map_info)

                yield frame
            else:
                break
    finally:
        pipeline.set_state(Gst.State.NULL)

# Run prediction and display
for result in model.predict_batch(custom_video_generator()):
    cv2.imshow("Webcam Inference", result.image_overlay)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cv2.destroyAllWindows()

Please let me know if you face any issues in implementing this solution.
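And for the dual-camera case you asked about, here is a minimal sketch of one way to extend this. My assumptions: custom_video_generator() above is modified to accept the /dev/video* device path as a parameter, each camera/model pair runs in its own thread, and the model names are ones mentioned earlier in this thread.

import threading

import degirum as dg

# Hypothetical sketch: one inference loop per camera/model pair, each in
# its own thread. The HailoRT scheduler shares the device between the
# two models.
def run_stream(model_name, device):
    model = dg.load_model(
        model_name=model_name,
        inference_host_address="@local",
        zoo_url="degirum/hailo",
    )
    # Assumes custom_video_generator() is parameterized by device path.
    for result in model.predict_batch(custom_video_generator(device)):
        # cv2.imshow is not thread-safe: push results to a queue and
        # display them from the main thread instead of drawing here.
        pass

# Model names and device paths are examples; substitute your own.
threads = [
    threading.Thread(
        target=run_stream,
        args=("yolov8n_coco--640x640_quant_hailort_multidevice_1", "/dev/video0"),
    ),
    threading.Thread(
        target=run_stream,
        args=("yolov8n_relu6_face--640x640_quant_hailort_multidevice_1", "/dev/video2"),
    ),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

Keeping display out of the worker threads is deliberate: inference can run per-thread, but all drawing should happen in one place.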