Multicamera inference

Hello,
I wrote this code to perform inference on frames from two Arducam cameras, running inference in batches of two images, one from each camera. The code works, but the frame rate suffers: without inference I get 54 FPS, and once I run inference on the two images it drops to 28 FPS.
My model is a custom YOLOv8n with two classes, and I'm running on a Raspberry Pi CM4. What do you suggest I do to increase the inference rate?
Thanks!

from picamera2 import Picamera2
from picamera2.devices import Hailo
import numpy as np
import time
import cv2


def extract_detections(hailo_output, w, h, class_names, cam_id, threshold=0.5):
    """Extract detections from the HailoRT-postprocess output."""
    results = []
    for class_id, detections in enumerate(hailo_output):
        for detection in detections:
            detection_array = np.array(detection)
            score = detection_array[4]
            if score >= threshold:
                y0, x0, y1, x1 = detection_array[:4]
                bbox = (int(x0 * w), int(y0 * h), int(x1 * w), int(y1 * h))
                results.append([class_names[class_id], bbox, score])
                print(
                    f"Detection(s) found for class '{class_names[class_id]}', Score: {score:.2f}, Camera: {cam_id}"
                )
    return results


if __name__ == "__main__":

    video_w, video_h = 1920, 1080
    model = "yolov8n_D4.hef"
    labels = "labels.txt"
    score_thresh = 0.5

    hailo = Hailo(model, 2)
    model_h, model_w, _ = hailo.get_input_shape()

    try:
        with open(labels, "r", encoding="utf-8") as f:
            class_names = f.read().splitlines()
    except FileNotFoundError as file_error:
        print(f"Error: The labels file '{labels}' was not found.")
        raise file_error
    except IOError as io_error:
        print(f"Error: There was an issue reading the labels file '{labels}'")
        raise io_error
    except Exception as e:
        print(f"Unexpected error while loading labels from '{labels}'")
        raise e

    detections = None

    picam2a = Picamera2(0)
    picam2b = Picamera2(1)

    # Configure camera A
    main_a = {'size': (video_w, video_h), 'format': 'XRGB8888'}
    lores_a = {'size': (model_w, model_h), 'format': 'YUV420'}
    controls_a = {
        'FrameRate': 60,
        "AfMode": 0,          # Manual AF mode
        "NoiseReductionMode": 0,  # Noise reduction off
        "LensPosition": 6.0,
    }
    config_a = picam2a.create_preview_configuration(
        main_a, lores=lores_a, controls=controls_a)
    picam2a.configure(config_a)

    # Configure camera B
    main_b = {'size': (video_w, video_h), 'format': 'XRGB8888'}
    lores_b = {'size': (model_w, model_h), 'format': 'YUV420'}
    controls_b = {
        'FrameRate': 60,
        "AfMode": 0,
        "NoiseReductionMode": 0,
        "LensPosition": 6.0,
    }
    config_b = picam2b.create_preview_configuration(
        main_b, lores=lores_b, controls=controls_b)
    picam2b.configure(config_b)

    picam2a.start()
    picam2b.start()

    input_data = np.empty((2, model_h, model_w, 3), dtype=np.uint8)

    start_time = time.time()

    while True:
        frame_a = picam2a.capture_array('lores')
        frame_b = picam2b.capture_array('lores')

        rgb_a = cv2.cvtColor(frame_a, cv2.COLOR_YUV420p2RGB)
        rgb_b = cv2.cvtColor(frame_b, cv2.COLOR_YUV420p2RGB)

        input_data[0] = rgb_a
        input_data[1] = rgb_b

        results = hailo.run(input_data)

        for cam_id, result in enumerate(results):
            detections = extract_detections(
                result, video_w, video_h, class_names, cam_id, score_thresh
            )

Hey @dgarrido,

You’re currently using blocking inference with:

hailo.run(input_data)

That means each inference call waits until the result is ready before continuing — which creates idle time and limits your FPS.

To improve performance, you should move to asynchronous inference. Hailo supports this in two main ways:

  1. Raspberry Pi Integration (Async with Picamera2 + Hailo module)
    The Raspberry Pi Hailo integration lets you run async inference natively:

    from picamera2 import Picamera2
    from picamera2.devices import Hailo
    
    picam2 = Picamera2()
    picam2.start()
    
    with Hailo("model.hef") as hailo:
        frame = picam2.capture_array()  # grab a frame to feed the model
        future = hailo.run_async(frame)  # non-blocking
        # do other processing...
        results = future.result()  # get the result when needed
    

    This approach allows you to keep your camera running while inference is being processed in the background.

  2. Native HailoRT Async API
    If you want more control, the lower-level HailoRT async API gives you full async handling. You can find an example here:
    Hailo-Application-Code-Examples/runtime/python/utils.py at main · hailo-ai/Hailo-Application-Code-Examples · GitHub


In your current setup, capture and inference are happening one after the other — frame capture, color conversion, then inference. This serial flow means the Hailo device often sits idle, waiting.

To fix this, you should separate capture and inference into parallel threads. One thread continuously captures frames into a queue, and another thread pulls from that queue and runs inference. This keeps both the camera and Hailo busy and improves overall throughput.
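
The pattern looks roughly like this (a minimal sketch, assuming picam2 and hailo are already set up as in your script; queue sizing and shutdown handling are up to you):

import queue
import threading

frame_queue = queue.Queue(maxsize=4)  # small queue so stale frames get dropped
stop_event = threading.Event()

def capture_loop(picam2):
    """Producer: grab frames as fast as the camera delivers them."""
    while not stop_event.is_set():
        frame = picam2.capture_array('lores')
        try:
            frame_queue.put(frame, block=False)
        except queue.Full:
            pass  # drop the frame rather than stall the camera

def inference_loop(hailo):
    """Consumer: run inference while capture continues in parallel."""
    while not stop_event.is_set():
        try:
            frame = frame_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        results = hailo.run(frame)
        # ...post-process results here...

threading.Thread(target=capture_loop, args=(picam2,), daemon=True).start()
threading.Thread(target=inference_loop, args=(hailo,), daemon=True).start()

Because both loops spend most of their time waiting on I/O (the camera and the Hailo device), they coexist well even under Python's GIL.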

Another thing to look at is image preprocessing. You’re using:

cv2.cvtColor(frame_a, cv2.COLOR_YUV420p2RGB)

This is expensive on the Pi. If your camera supports it, switch to direct RGB output (format='RGB888') to skip this step. If not, consider using OpenCV’s UMat for faster, hardware-accelerated conversion.
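
For camera A, the config change might look like this (a sketch, and an assumption to verify: on the Pi 4 / CM4 the lores stream is typically limited to YUV420, so direct RGB on lores may only be available on a Pi 5 or on the main stream):

# Sketch: request RGB directly so the cv2.cvtColor() step disappears.
lores_a = {'size': (model_w, model_h), 'format': 'RGB888'}
config_a = picam2a.create_preview_configuration(main_a, lores=lores_a, controls=controls_a)
picam2a.configure(config_a)

frame_a = picam2a.capture_array('lores')  # already RGB; check the channel order matches your model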

Also, logging every detection like:

print(f"Detection(s) found for class '{class_names[class_id]}', Score: {score:.2f}, Camera: {cam_id}")

can slow things down on a Pi. Try throttling the logs or disabling them in your hot loop.
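
A simple throttle would look like this (a sketch; LOG_INTERVAL is a made-up knob):

import time

LOG_INTERVAL = 1.0  # seconds between log bursts, tune to taste
last_log = 0.0

# Inside the hot loop, replacing the per-detection print:
if detections and time.time() - last_log >= LOG_INTERVAL:
    for class_name, bbox, score in detections:
        print(f"{class_name}: {score:.2f}")
    last_log = time.time()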

Lastly, you’re already using batch size 2 — which is solid. But depending on memory constraints, you might be able to push it to 4, 8, or even 16, and see a boost in FPS.


Hi @omria,
I am using the rpi5 + Hailo HAT for detection, with the video provided by an IP camera. Based on other posts I managed to modify the code for RTSP support, but I couldn't find anything about adding multiple RTSP streams for detection.

The most relevant thing I found is the QUEUE function in gstreamer_helper_pipelines.py, but I didn't see any examples using it. Is this function relevant to my case?

Also, from the community post on DeGirum/hailo_examples: does that library actually work better for my use case?

I am confused because it seems hailo-ai/hailo-rpi5-examples has clearer examples for use with the Raspberry Pi.

Hi @Wey
Welcome to the Hailo community. Can you clarify what you mean by "it seems hailo-ai/hailo-rpi5-examples has clearer examples for use with the Raspberry Pi"? Are there any examples in the degirum/hailo_examples repo that are unclear or not working for you?

Hi @shashi
Apologies for my confusing post. Let me rewrite.
For hailo-ai/hailo-rpi5-examples repo:

  1. hailo-apps-infra/doc/developement_guide.md mentions that GStreamer runs seamlessly on the rpi5, so I assumed the video pipeline is handled efficiently.
  2. After modifying the example to accept an RTSP stream (IP cam), I got 30 FPS for object detection.
  3. There were no blacked-out or stuttering frames during testing; everything good so far.
  4. There is no example available for inference on multiple camera views (a big problem for me).

For degirum/hailo_examples repo:

  1. I tried rtsp.ipynb; I managed to get Hailo running for object detection, but only at ~15 FPS.
  2. While running detection, I got blacked-out (grayed) frames on and off.
  3. I saw a similar example for multiple camera views (006_multi_threading.ipynb), but I'm worried about further FPS drops.

I am having difficulty deciding which repo to stick with. Ideally, I want an rpi5 + Hailo-8 to handle vehicle type detection (and then LPR) on 2-4 IP camera views. The FPS difference between the two repos on just a single view also makes me worry that a huge optimization effort may be needed in the future.
Of course, the other possibility is one rpi setup per IP camera view, but that would be very bad for cost effectiveness.
Newbie here, so I need some guidance. Much appreciated.

Hi @Wey
Thanks for the details. Much appreciated. PySDK has been tested to make sure there is no performance degradation compared to the native runtime. In fact, PySDK is just a wrapper and uses HailoRT underneath. We can troubleshoot why you are seeing an FPS drop. I will reach out to you via PM.

Hi @omria, thank you very much for your response.
Using your recommendations, I wrote this code, this time managing only one camera.
I used the threading library to run capture and inference concurrently. This is the best solution I've found so far since, per my project's constraints, I can use only one core of the Raspberry Pi's CPU; the rest are reserved for other tasks.

After testing, capture and inference now run at almost the same frame rate. In other words, the significant delays from the sequential version are gone.

On the other hand, I implemented Hailo's asynchronous inference, but since I'm doing real-time detection I need the results immediately after calling hailo.run_async, so the real benefits of asynchronous execution are not being leveraged. So my question is: in my case, does using Hailo async make sense?
How could I implement it in my current code?
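
One idea I had is to keep one batch "in flight": submit the next batch with run_async before collecting the previous batch's result, so inference overlaps with capture. A rough sketch of what I mean (collect_batch and handle_results are placeholders for my existing batching and post-processing code):

pending_future = None
pending_frames = None

while not shutdown_event.is_set():
    frames = collect_batch(frame_queue, batch_size)   # placeholder helper
    future = hailo.run_async(np.stack(frames))        # submit new batch, non-blocking

    if pending_future is not None:
        results_batch = pending_future.result()       # previous batch should be ready
        handle_results(results_batch, pending_frames)  # placeholder helper

    pending_future, pending_frames = future, frames

Would that actually overlap the Hailo work with capture, or does run_async already queue requests internally?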

Thanks again — I look forward to your response.

import argparse
import cv2
import time
import threading
import queue
from picamera2 import MappedArray, Picamera2, Preview
from picamera2.devices import Hailo
import numpy as np
from control_settings_in_yaml import generate_controls_from_yaml


COLOR_MAP = {
    "person": (0, 0, 255),      # Blue
    "gun": (255, 0, 0),         # Red
    "cellphone": (0, 255, 0),   # Green
    "knife": (255, 255, 0),     # Yellow
    "backpack": (255, 0, 255),  # Magenta
    "bottle": (0, 255, 255),    # Cyan
}
DEFAULT_COLOR = (0, 0, 0)
FONT = cv2.FONT_HERSHEY_SIMPLEX
FPS_POSITION_CAPTURE = (20, 30)
FPS_POSITION_INFERENCE = (20, 60)
FPS_COLOR = (0, 255, 0)
FPS_TEXT_SIZE = 1.0
FPS_THICKNESS = 2
LABEL_TEXT_SIZE = 0.5
LABEL_THICKNESS = 1
BOX_THICKNESS = 2

# Threading control
shutdown_event = threading.Event()

# Shared state (protected by locks where needed)
detections = None
detections_lock = threading.Lock()
fps_inference = 0.0
fps_capture = 0.0
fps_lock = threading.Lock()

# Queue for processed detections
processed_detections_queue = queue.Queue(maxsize=50)


def extract_detections(hailo_output, w, h, class_names, threshold=0.3):
    """Extract detections from the HailoRT-postprocess output - optimized version."""
    results = []

    for class_id, detections_array in enumerate(hailo_output):
        class_name = class_names[class_id]

        if len(detections_array) == 0:
            continue

        detection_arrays = np.array(detections_array)

        valid_indices = detection_arrays[:, 4] >= threshold
        valid_detections = detection_arrays[valid_indices]

        for detection in valid_detections:
            y0, x0, y1, x1, score = detection[:5]
            bbox = (int(x0 * w), int(y0 * h),
                    int(x1 * w), int(y1 * h))
            results.append([class_name, bbox, score])

    return results


def capture_thread(picam2, frame_queue):
    """Continuously capture frames and put them in the queue."""
    global fps_capture

    frame_count = 0
    last_fps_update = time.time()

    while not shutdown_event.is_set():
        try:
            frame = picam2.capture_array('lores')
            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV420p2RGB)

            try:
                frame_queue.put(rgb, block=False)
                frame_count += 1
            except queue.Full:
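                # Queue full: the frame is dropped but still counted toward capture FPS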
                frame_count += 1

            # Update capture FPS every 2 seconds
            current_time = time.time()
            if current_time - last_fps_update >= 2.0:
                time_elapsed = current_time - last_fps_update
                calculated_fps = frame_count / time_elapsed if time_elapsed > 0 else 0
                with fps_lock:
                    fps_capture = calculated_fps
                frame_count = 0
                last_fps_update = current_time

        except Exception as e:
            print(f"Error in capture thread: {e}")
            break


def inference_thread(hailo, frame_queue, video_w, video_h, class_names, score_thresh, batch_size):
    """Continuously process frames in batches through inference."""
    global fps_inference

    frame_count = 0
    last_fps_update = time.time()

    while not shutdown_event.is_set():
        batch_frames = []

        # Adaptive batch collection - don't wait too long for full batches
        start_batch_time = time.time()
        timeout_per_frame = 0.05  # 50ms max wait per frame

        try:
            for _ in range(batch_size):
                elapsed = time.time() - start_batch_time
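                # Shrink the per-frame wait as time passes so a partial
                # batch gets processed promptly instead of stalling here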
                remaining_timeout = max(
                    0.01, timeout_per_frame - elapsed/batch_size)

                frame = frame_queue.get(timeout=remaining_timeout)
                batch_frames.append(frame)
        except queue.Empty:
            if not batch_frames:
                continue

        try:
            # Convert list of frames to numpy array for batch processing
            if batch_frames:
                # Shape: (batch_size, height, width, channels)
                batch_array = np.stack(batch_frames)

                # Run batch inference on the stacked array
                future = hailo.run_async(batch_array)
                results_batch = future.result()

                # Process ALL batch results, not just the last one
                batch_detections = []
                for results in results_batch:
                    new_detections = extract_detections(
                        results, video_w, video_h, class_names, score_thresh
                    )
                    batch_detections.append(new_detections)

                # Put all detection results in the processed queue
                for detection_result in batch_detections:
                    try:
                        processed_detections_queue.put(
                            detection_result, block=False)
                    except queue.Full:
                        # If queue is full, remove oldest detection and add new one
                        try:
                            processed_detections_queue.get_nowait()
                            processed_detections_queue.put(
                                detection_result, block=False)
                        except queue.Empty:
                            pass

                # Mark frames as processed
                for _ in batch_frames:
                    frame_queue.task_done()

                # Update FPS calculation
                frame_count += len(batch_frames)
                current_time = time.time()
                if current_time - last_fps_update >= 2.0:
                    time_elapsed = current_time - last_fps_update
                    final_fps = frame_count / time_elapsed if time_elapsed > 0 else 0
                    with fps_lock:
                        fps_inference = final_fps
                    frame_count = 0
                    last_fps_update = current_time

        except Exception as e:
            print(f"Error in inference thread: {e}")
            for _ in batch_frames:
                frame_queue.task_done()


def detection_update_thread():
    """Thread to continuously update the current detections from processed results."""
    global detections

    while not shutdown_event.is_set():
        try:
            # Get the latest detection result
            new_detection = processed_detections_queue.get(timeout=0.1)

            # Update the global detections
            with detections_lock:
                detections = new_detection

            time.sleep(0.01)

        except queue.Empty:
            continue
        except Exception as e:
            print(f"Error in detection update thread: {e}")


def draw_objects(request):
    """Draw detection results on the frame."""
    global detections, fps_inference, fps_capture

    with detections_lock:
        current_detections = detections

    with fps_lock:
        current_fps_inference = fps_inference
        current_fps_capture = fps_capture

    with MappedArray(request, "main") as m:
        # Display capture FPS
        fps_capture_text = f"Capture FPS: {current_fps_capture:.1f}"
        cv2.putText(
            m.array,
            fps_capture_text,
            FPS_POSITION_CAPTURE,
            FONT,
            FPS_TEXT_SIZE,
            FPS_COLOR,
            FPS_THICKNESS,
            cv2.LINE_AA,
        )

        # Display inference FPS
        fps_inference_text = f"Inference FPS: {current_fps_inference:.1f}"
        cv2.putText(
            m.array,
            fps_inference_text,
            FPS_POSITION_INFERENCE,
            FONT,
            FPS_TEXT_SIZE,
            FPS_COLOR,
            FPS_THICKNESS,
            cv2.LINE_AA,
        )

        if current_detections is not None and len(current_detections) > 0:
            for class_name, bbox, score in current_detections:
                x0, y0, x1, y1 = bbox
                label = f"{class_name} %{int(score * 100)}"
                color = COLOR_MAP.get(class_name, DEFAULT_COLOR)

                cv2.rectangle(m.array, (x0, y0), (x1, y1),
                              color, BOX_THICKNESS)
                cv2.putText(
                    m.array,
                    label,
                    (x0 + 5, max(15, y0 + 15)),
                    FONT,
                    LABEL_TEXT_SIZE,
                    color,
                    LABEL_THICKNESS,
                    cv2.LINE_AA,
                )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Record a video with Picamera2 and perform object detection using parallel processing."
    )
    parser.add_argument("--width", type=int, default=1920,
                        help="Width of the video")
    parser.add_argument("--height", type=int, default=1080,
                        help="Height of the video")
    parser.add_argument(
        "--config_file_path",
        type=str,
        default="config.yaml",
        help="Configuration file path",
    )
    parser.add_argument(
        "-m", "--model", help="Path for the HEF model.", default="yolov8n_D13_quantized_model.hef"
    )
    parser.add_argument(
        "-l",
        "--labels",
        default="coco_6.txt",
        help="Path to a text file containing labels.",
    )
    parser.add_argument(
        "-s",
        "--score_thresh",
        type=float,
        default=0.3,
        help="Score threshold, must be a float between 0 and 1.",
    )
    parser.add_argument(
        "--queue_size",
        type=int,
        default=12,
        help="Maximum size of the frame queue",
    )
    parser.add_argument(
        "-b", "--batch_size",
        type=int,
        default=2,
        help="Batch size for inference"
    )

    args = parser.parse_args()
    video_w = args.width
    video_h = args.height
    score_thresh = args.score_thresh
    labels = args.labels
    model = args.model
    batch_size = args.batch_size
    MAX_QUEUE_SIZE = max(args.queue_size, batch_size)

    with Hailo(model, batch_size) as hailo:
        model_h, model_w, _ = hailo.get_input_shape()

        with open(labels, "r", encoding="utf-8") as f:
            class_names = f.read().splitlines()

        frame_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)

        with Picamera2() as picam2:
            main = {'size': (video_w, video_h), 'format': 'XBGR8888'}
            lores = {'size': (model_w, model_h), 'format': 'YUV420'}
            config = picam2.create_preview_configuration(main, lores=lores)
            picam2.configure(config)

            camera_control_dict = generate_controls_from_yaml(
                args.config_file_path)
            picam2.set_controls(camera_control_dict)

            picam2.start_preview(Preview.QTGL, x=0, y=0,
                                 width=video_w, height=video_h)

            picam2.start()
            picam2.pre_callback = draw_objects

            # Start all worker threads
            capture_worker = threading.Thread(
                target=capture_thread,
                args=(picam2, frame_queue),
                daemon=True
            )

            inference_worker = threading.Thread(
                target=inference_thread,
                args=(hailo, frame_queue, video_w,
                      video_h, class_names, score_thresh, batch_size),
                daemon=True
            )

            detection_updater = threading.Thread(
                target=detection_update_thread,
                daemon=True
            )

            capture_worker.start()
            inference_worker.start()
            detection_updater.start()

            try:
                while True:
                    time.sleep(1)

            except KeyboardInterrupt:
                print("\nShutting down...")
                shutdown_event.set()

                capture_worker.join(timeout=2.0)
                inference_worker.join(timeout=2.0)
                detection_updater.join(timeout=2.0)

                picam2.stop()

                print("Shutdown complete.")