Multicamera inference

Hi @omria, thank you very much for your response.
Following your recommendations, I wrote the code below, but this time managing only one camera.
I used the threading library to run capture and inference in concurrent threads. This is the best solution I have found so far, since my project's constraints only allow me to use one core of the Raspberry Pi's CPU; the rest is reserved for other tasks.

After testing, it turns out that capture and inference now run at almost the same frame rate. In other words, the significant delays that occurred when the process was sequential are no longer present.

On the other hand, I implemented Hailo's asynchronous inference, but since I'm doing real-time detection I need the results immediately after calling hailo.run_async, so the real benefits of asynchronous execution are not being exploited. So my question is: in my case, does it make sense to use Hailo's async API at all?
How could I implement it properly in my current code?
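To make the question more concrete, this is roughly the pipelined pattern I had in mind (just a rough sketch, not tested: handle_results is a placeholder for my drawing/queue logic, and I pass a one-element list to run_async the same way my batch code below does). The idea is to keep exactly one inference in flight, so frame N runs on the Hailo while frame N+1 is being captured and converted on the CPU:

import cv2

def pipelined_loop(picam2, hailo, handle_results, shutdown_event):
    """Keep one inference in flight: infer frame N while capturing frame N+1."""
    pending = None  # the single in-flight future returned by run_async
    while not shutdown_event.is_set():
        # Capture and convert the next frame while the previous one is still inferring
        frame = picam2.capture_array('lores')
        rgb = cv2.cvtColor(frame, cv2.COLOR_YUV420p2RGB)

        if pending is not None:
            # The previous frame's inference ran during the capture above,
            # so this wait should be short
            handle_results(pending.result())

        # Submit the new frame and immediately loop back to capture
        pending = hailo.run_async([rgb])

    if pending is not None:
        pending.result()  # drain the last in-flight inference on shutdown

Would that kind of single-future pipelining be the right way to use run_async here, or is there a better pattern for the real-time case?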

Thanks again — I look forward to your response.

import argparse
import cv2
import time
import threading
import queue
from picamera2 import MappedArray, Picamera2, Preview
from picamera2.devices import Hailo
import numpy as np
from control_settings_in_yaml import generate_controls_from_yaml


COLOR_MAP = {
    "person": (0, 0, 255),      # Blue
    "gun": (255, 0, 0),         # Red
    "cellphone": (0, 255, 0),   # Green
    "knife": (255, 255, 0),     # Yellow
    "backpack": (255, 0, 255),  # Magenta
    "bottle": (0, 255, 255),    # Cyan
}
DEFAULT_COLOR = (0, 0, 0)
FONT = cv2.FONT_HERSHEY_SIMPLEX
FPS_POSITION_CAPTURE = (20, 30)
FPS_POSITION_INFERENCE = (20, 60)
FPS_COLOR = (0, 255, 0)
FPS_TEXT_SIZE = 1.0
FPS_THICKNESS = 2
LABEL_TEXT_SIZE = 0.5
LABEL_THICKNESS = 1
BOX_THICKNESS = 2

# Threading control
shutdown_event = threading.Event()

# Shared state (protected by locks where needed)
detections = None
detections_lock = threading.Lock()
fps_inference = 0.0
fps_capture = 0.0
fps_lock = threading.Lock()

# Queue for processed detections
processed_detections_queue = queue.Queue(maxsize=50)


def extract_detections(hailo_output, w, h, class_names, threshold=0.3):
    """Extract detections from the HailoRT-postprocess output."""
    results = []

    for class_id, detections_array in enumerate(hailo_output):
        class_name = class_names[class_id]

        if len(detections_array) == 0:
            continue

        detection_arrays = np.array(detections_array)

        valid_indices = detection_arrays[:, 4] >= threshold
        valid_detections = detection_arrays[valid_indices]

        for detection in valid_detections:
            y0, x0, y1, x1, score = detection[:5]
            bbox = (int(x0 * w), int(y0 * h),
                    int(x1 * w), int(y1 * h))
            results.append([class_name, bbox, score])

    return results


def capture_thread(picam2, frame_queue):
    """Continuously capture frames and put them in the queue."""
    global fps_capture

    frame_count = 0
    last_fps_update = time.time()

    while not shutdown_event.is_set():
        try:
            frame = picam2.capture_array('lores')
            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV420p2RGB)

            try:
                frame_queue.put(rgb, block=False)
            except queue.Full:
                # Queue is full: drop the frame (it still counts towards capture FPS)
                pass
            frame_count += 1

            # Update capture FPS every 2 seconds
            current_time = time.time()
            if current_time - last_fps_update >= 2.0:
                time_elapsed = current_time - last_fps_update
                calculated_fps = frame_count / time_elapsed if time_elapsed > 0 else 0
                with fps_lock:
                    fps_capture = calculated_fps
                frame_count = 0
                last_fps_update = current_time

        except Exception as e:
            print(f"Error in capture thread: {e}")
            break


def inference_thread(hailo, frame_queue, video_w, video_h, class_names, score_thresh, batch_size):
    """Continuously process frames in batches through inference."""
    global fps_inference

    frame_count = 0
    last_fps_update = time.time()

    while not shutdown_event.is_set():
        batch_frames = []

        # Adaptive batch collection - don't wait too long for full batches
        start_batch_time = time.time()
        timeout_per_frame = 0.05  # 50ms max wait per frame

        try:
            for _ in range(batch_size):
                elapsed = time.time() - start_batch_time
                remaining_timeout = max(
                    0.01, timeout_per_frame - elapsed/batch_size)

                frame = frame_queue.get(timeout=remaining_timeout)
                batch_frames.append(frame)
        except queue.Empty:
            if not batch_frames:
                continue

        try:
            if batch_frames:
                # Run inference on the collected batch of frames
                future = hailo.run_async(batch_frames)
                results_batch = future.result()

                # Process ALL batch results, not just the last one
                batch_detections = []
                for results in results_batch:
                    new_detections = extract_detections(
                        results, video_w, video_h, class_names, score_thresh
                    )
                    batch_detections.append(new_detections)

                # Put all detection results in the processed queue
                for detection_result in batch_detections:
                    try:
                        processed_detections_queue.put(
                            detection_result, block=False)
                    except queue.Full:
                        # If queue is full, remove oldest detection and add new one
                        try:
                            processed_detections_queue.get_nowait()
                            processed_detections_queue.put(
                                detection_result, block=False)
                        except queue.Empty:
                            pass

                # Mark frames as processed
                for _ in batch_frames:
                    frame_queue.task_done()

                # Update FPS calculation
                frame_count += len(batch_frames)
                current_time = time.time()
                if current_time - last_fps_update >= 2.0:
                    time_elapsed = current_time - last_fps_update
                    final_fps = frame_count / time_elapsed if time_elapsed > 0 else 0
                    with fps_lock:
                        fps_inference = final_fps
                    frame_count = 0
                    last_fps_update = current_time

        except Exception as e:
            print(f"Error in inference thread: {e}")
            for _ in batch_frames:
                frame_queue.task_done()


def detection_update_thread():
    """Thread to continuously update the current detections from processed results."""
    global detections

    while not shutdown_event.is_set():
        try:
            # Get the latest detection result
            new_detection = processed_detections_queue.get(timeout=0.1)

            # Update the global detections
            with detections_lock:
                detections = new_detection

            time.sleep(0.01)

        except queue.Empty:
            continue
        except Exception as e:
            print(f"Error in detection update thread: {e}")


def draw_objects(request):
    """Draw detection results on the frame."""
    global detections, fps_inference, fps_capture

    with detections_lock:
        current_detections = detections

    with fps_lock:
        current_fps_inference = fps_inference
        current_fps_capture = fps_capture

    with MappedArray(request, "main") as m:
        # Display capture FPS
        fps_capture_text = f"Capture FPS: {current_fps_capture:.1f}"
        cv2.putText(
            m.array,
            fps_capture_text,
            FPS_POSITION_CAPTURE,
            FONT,
            FPS_TEXT_SIZE,
            FPS_COLOR,
            FPS_THICKNESS,
            cv2.LINE_AA,
        )

        # Display inference FPS
        fps_inference_text = f"Inference FPS: {current_fps_inference:.1f}"
        cv2.putText(
            m.array,
            fps_inference_text,
            FPS_POSITION_INFERENCE,
            FONT,
            FPS_TEXT_SIZE,
            FPS_COLOR,
            FPS_THICKNESS,
            cv2.LINE_AA,
        )

        if current_detections is not None and len(current_detections) > 0:
            for class_name, bbox, score in current_detections:
                x0, y0, x1, y1 = bbox
                label = f"{class_name} %{int(score * 100)}"
                color = COLOR_MAP.get(class_name, DEFAULT_COLOR)

                cv2.rectangle(m.array, (x0, y0), (x1, y1),
                              color, BOX_THICKNESS)
                cv2.putText(
                    m.array,
                    label,
                    (x0 + 5, max(15, y0 + 15)),
                    FONT,
                    LABEL_TEXT_SIZE,
                    color,
                    LABEL_THICKNESS,
                    cv2.LINE_AA,
                )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Record a video with Picamera2 and perform object detection using parallel processing."
    )
    parser.add_argument("--width", type=int, default=1920,
                        help="Width of the video")
    parser.add_argument("--height", type=int, default=1080,
                        help="Height of the video")
    parser.add_argument(
        "--config_file_path",
        type=str,
        default="config.yaml",
        help="Configuration file path",
    )
    parser.add_argument(
        "-m", "--model", help="Path for the HEF model.", default="yolov8n_D13_quantized_model.hef"
    )
    parser.add_argument(
        "-l",
        "--labels",
        default="coco_6.txt",
        help="Path to a text file containing labels.",
    )
    parser.add_argument(
        "-s",
        "--score_thresh",
        type=float,
        default=0.3,
        help="Score threshold, must be a float between 0 and 1.",
    )
    parser.add_argument(
        "--queue_size",
        type=int,
        default=12,
        help="Maximum size of the frame queue",
    )
    parser.add_argument(
        "-b", "--batch_size",
        type=int,
        default=2,
        help="Batch size for inference"
    )

    args = parser.parse_args()
    video_w = args.width
    video_h = args.height
    score_thresh = args.score_thresh
    labels = args.labels
    model = args.model
    batch_size = args.batch_size
    MAX_QUEUE_SIZE = max(args.queue_size, batch_size)

    with Hailo(model, batch_size) as hailo:
        model_h, model_w, _ = hailo.get_input_shape()

        with open(labels, "r", encoding="utf-8") as f:
            class_names = f.read().splitlines()

        frame_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)

        with Picamera2() as picam2:
            main = {'size': (video_w, video_h), 'format': 'XBGR8888'}
            lores = {'size': (model_w, model_h), 'format': 'YUV420'}
            config = picam2.create_preview_configuration(main, lores=lores)
            picam2.configure(config)

            camera_control_dict = generate_controls_from_yaml(
                args.config_file_path)
            picam2.set_controls(camera_control_dict)

            picam2.start_preview(Preview.QTGL, x=0, y=0,
                                 width=video_w, height=video_h)

            picam2.start()
            picam2.pre_callback = draw_objects

            # Start all worker threads
            capture_worker = threading.Thread(
                target=capture_thread,
                args=(picam2, frame_queue),
                daemon=True
            )

            inference_worker = threading.Thread(
                target=inference_thread,
                args=(hailo, frame_queue, video_w,
                      video_h, class_names, score_thresh, batch_size),
                daemon=True
            )

            detection_updater = threading.Thread(
                target=detection_update_thread,
                daemon=True
            )

            capture_worker.start()
            inference_worker.start()
            detection_updater.start()

            try:
                while True:
                    time.sleep(1)

            except KeyboardInterrupt:
                print("\nShutting down...")
                shutdown_event.set()

                capture_worker.join(timeout=2.0)
                inference_worker.join(timeout=2.0)
                detection_updater.join(timeout=2.0)

                picam2.stop()

                print("Shutdown complete.")