Hi @omria, thank you very much for your response.
Following your recommendations, I wrote the code below, this time managing only one camera.
I used the threading library to run capture and inference concurrently in separate threads. This is the best solution I have found so far, since my project's constraints allow me to use only one core of the Raspberry Pi's CPU; the rest is reserved for other tasks.
After testing, capture and inference now run at almost the same frame rate. In other words, the significant delays that occurred when the processing was sequential are no longer present.
On the other hand, I implemented Hailo's asynchronous inference, but since I am doing real-time detection I need the results immediately after calling hailo.run_async, so the true benefits of asynchronous execution are not being leveraged. My question is: in my case, does it make sense to use Hailo async, and how could I implement it in my current code?
Thanks again — I look forward to your response.
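For context, what I have in mind is roughly the sketch below. It assumes that hailo.run_async returns a concurrent.futures.Future (my code already calls .result() on it); if that holds, a done-callback could hand the results over so the inference thread never blocks. The names on_inference_done and submit_batch are placeholders of mine, not Hailo API:

import numpy as np

def on_inference_done(future):
    # Runs in a background thread once the NPU finishes this batch;
    # the thread that submitted the batch never blocks on .result().
    results_batch = future.result()
    for results in results_batch:
        # extract_detections and the processed_detections_queue hand-off
        # from my code below would move here.
        pass

def submit_batch(hailo, batch_frames):
    # Returns immediately; inference completes in the background.
    future = hailo.run_async(np.stack(batch_frames))
    future.add_done_callback(on_inference_done)

Here is my current code: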
import argparse
import cv2
import time
import threading
import queue
from picamera2 import MappedArray, Picamera2, Preview
from picamera2.devices import Hailo
import numpy as np
from control_settings_in_yaml import generate_controls_from_yaml
COLOR_MAP = {
    "person": (0, 0, 255),      # Blue
    "gun": (255, 0, 0),         # Red
    "cellphone": (0, 255, 0),   # Green
    "knife": (255, 255, 0),     # Yellow
    "backpack": (255, 0, 255),  # Magenta
    "bottle": (0, 255, 255),    # Cyan
}
DEFAULT_COLOR = (0, 0, 0)
FONT = cv2.FONT_HERSHEY_SIMPLEX
FPS_POSITION_CAPTURE = (20, 30)
FPS_POSITION_INFERENCE = (20, 60)
FPS_COLOR = (0, 255, 0)
FPS_TEXT_SIZE = 1.0
FPS_THICKNESS = 2
LABEL_TEXT_SIZE = 0.5
LABEL_THICKNESS = 1
BOX_THICKNESS = 2
# Threading control
shutdown_event = threading.Event()
# Shared state (protected by locks where needed)
detections = None
detections_lock = threading.Lock()
fps_inference = 0.0
fps_capture = 0.0
fps_lock = threading.Lock()
# Queue for processed detections
processed_detections_queue = queue.Queue(maxsize=50)
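# Dataflow: capture_thread -> frame_queue -> inference_thread
# -> processed_detections_queue -> detection_update_thread -> detections,
# which draw_objects reads on every preview frame.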
def extract_detections(hailo_output, w, h, class_names, threshold=0.3):
    """Extract detections from the HailoRT-postprocess output (vectorized score filter)."""
    results = []
    for class_id, detections_array in enumerate(hailo_output):
        class_name = class_names[class_id]
        if len(detections_array) == 0:
            continue
        # Filter by score in one vectorized operation instead of per-detection checks.
        class_detections = np.array(detections_array)
        valid_detections = class_detections[class_detections[:, 4] >= threshold]
        for detection in valid_detections:
            y0, x0, y1, x1, score = detection[:5]
            # Boxes arrive normalized (0..1); scale them to the display stream size.
            bbox = (int(x0 * w), int(y0 * h), int(x1 * w), int(y1 * h))
            results.append([class_name, bbox, score])
    return results
def capture_thread(picam2, frame_queue):
    """Continuously capture frames and put them in the queue."""
    global fps_capture
    frame_count = 0
    last_fps_update = time.time()
    while not shutdown_event.is_set():
        try:
            frame = picam2.capture_array('lores')
            rgb = cv2.cvtColor(frame, cv2.COLOR_YUV420p2RGB)
            try:
                frame_queue.put(rgb, block=False)
            except queue.Full:
                pass  # Inference is behind; drop the frame rather than stall capture.
            # Count every captured frame, including dropped ones, so the
            # displayed capture FPS reflects the true sensor rate.
            frame_count += 1
            # Update capture FPS every 2 seconds
            current_time = time.time()
            if current_time - last_fps_update >= 2.0:
                time_elapsed = current_time - last_fps_update
                calculated_fps = frame_count / time_elapsed if time_elapsed > 0 else 0.0
                with fps_lock:
                    fps_capture = calculated_fps
                frame_count = 0
                last_fps_update = current_time
        except Exception as e:
            print(f"Error in capture thread: {e}")
            break
def inference_thread(hailo, frame_queue, video_w, video_h, class_names, score_thresh, batch_size):
    """Continuously process frames in batches through inference."""
    global fps_inference
    frame_count = 0
    last_fps_update = time.time()
    while not shutdown_event.is_set():
        batch_frames = []
        # Adaptive batch collection: wait a bounded time for each frame so a
        # partially filled batch still gets processed instead of stalling.
        start_batch_time = time.time()
        timeout_per_frame = 0.05  # 50 ms maximum wait per frame
        try:
            for _ in range(batch_size):
                elapsed = time.time() - start_batch_time
                remaining_timeout = max(0.01, timeout_per_frame - elapsed / batch_size)
                batch_frames.append(frame_queue.get(timeout=remaining_timeout))
        except queue.Empty:
            if not batch_frames:
                continue
        try:
            # Stack the frames into one (batch, height, width, channels) array
            # and run the whole batch through the accelerator in a single call.
            batch_array = np.stack(batch_frames)
            future = hailo.run_async(batch_array)
            results_batch = future.result()
            # Process ALL batch results, not just the last one.
            for results in results_batch:
                new_detections = extract_detections(
                    results, video_w, video_h, class_names, score_thresh)
                try:
                    processed_detections_queue.put(new_detections, block=False)
                except queue.Full:
                    # Queue is full: discard the oldest result to make room.
                    try:
                        processed_detections_queue.get_nowait()
                        processed_detections_queue.put(new_detections, block=False)
                    except queue.Empty:
                        pass
            # Mark frames as processed
            for _ in batch_frames:
                frame_queue.task_done()
            # Update FPS calculation
            frame_count += len(batch_frames)
            current_time = time.time()
            if current_time - last_fps_update >= 2.0:
                time_elapsed = current_time - last_fps_update
                final_fps = frame_count / time_elapsed if time_elapsed > 0 else 0.0
                with fps_lock:
                    fps_inference = final_fps
                frame_count = 0
                last_fps_update = current_time
        except Exception as e:
            print(f"Error in inference thread: {e}")
            for _ in batch_frames:
                frame_queue.task_done()
def detection_update_thread():
    """Continuously publish the latest processed result to the shared detections."""
    global detections
    while not shutdown_event.is_set():
        try:
            # Get the latest detection result and publish it for the draw callback.
            new_detection = processed_detections_queue.get(timeout=0.1)
            with detections_lock:
                detections = new_detection
            time.sleep(0.01)  # Small pause so this loop does not hog the single core.
        except queue.Empty:
            continue
        except Exception as e:
            print(f"Error in detection update thread: {e}")
def draw_objects(request):
    """Draw detection results and FPS counters on the frame."""
    global detections, fps_inference, fps_capture
    with detections_lock:
        current_detections = detections
    with fps_lock:
        current_fps_inference = fps_inference
        current_fps_capture = fps_capture
    with MappedArray(request, "main") as m:
        # Display capture FPS
        fps_capture_text = f"Capture FPS: {current_fps_capture:.1f}"
        cv2.putText(m.array, fps_capture_text, FPS_POSITION_CAPTURE, FONT,
                    FPS_TEXT_SIZE, FPS_COLOR, FPS_THICKNESS, cv2.LINE_AA)
        # Display inference FPS
        fps_inference_text = f"Inference FPS: {current_fps_inference:.1f}"
        cv2.putText(m.array, fps_inference_text, FPS_POSITION_INFERENCE, FONT,
                    FPS_TEXT_SIZE, FPS_COLOR, FPS_THICKNESS, cv2.LINE_AA)
        if current_detections:
            for class_name, bbox, score in current_detections:
                x0, y0, x1, y1 = bbox
                label = f"{class_name} {int(score * 100)}%"
                color = COLOR_MAP.get(class_name, DEFAULT_COLOR)
                cv2.rectangle(m.array, (x0, y0), (x1, y1), color, BOX_THICKNESS)
                cv2.putText(m.array, label, (x0 + 5, max(15, y0 + 15)), FONT,
                            LABEL_TEXT_SIZE, color, LABEL_THICKNESS, cv2.LINE_AA)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Record a video with Picamera2 and perform object detection using parallel processing."
    )
    parser.add_argument("--width", type=int, default=1920,
                        help="Width of the video")
    parser.add_argument("--height", type=int, default=1080,
                        help="Height of the video")
    parser.add_argument("--config_file_path", type=str, default="config.yaml",
                        help="Configuration file path")
    parser.add_argument("-m", "--model", default="yolov8n_D13_quantized_model.hef",
                        help="Path for the HEF model.")
    parser.add_argument("-l", "--labels", default="coco_6.txt",
                        help="Path to a text file containing labels.")
    parser.add_argument("-s", "--score_thresh", type=float, default=0.3,
                        help="Score threshold, must be a float between 0 and 1.")
    parser.add_argument("--queue_size", type=int, default=12,
                        help="Maximum size of the frame queue")
    parser.add_argument("-b", "--batch_size", type=int, default=2,
                        help="Batch size for inference")
    args = parser.parse_args()

    video_w = args.width
    video_h = args.height
    score_thresh = args.score_thresh
    labels = args.labels
    model = args.model
    batch_size = args.batch_size
    # The frame queue must hold at least one full batch.
    MAX_QUEUE_SIZE = max(args.queue_size, batch_size)

    with Hailo(model, batch_size) as hailo:
        model_h, model_w, _ = hailo.get_input_shape()
        with open(labels, "r", encoding="utf-8") as f:
            class_names = f.read().splitlines()
        frame_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)

        with Picamera2() as picam2:
            # Main stream for display; lores stream matched to the model's input size.
            main = {'size': (video_w, video_h), 'format': 'XBGR8888'}
            lores = {'size': (model_w, model_h), 'format': 'YUV420'}
            config = picam2.create_preview_configuration(main, lores=lores)
            picam2.configure(config)
            camera_control_dict = generate_controls_from_yaml(args.config_file_path)
            picam2.set_controls(camera_control_dict)
            picam2.start_preview(Preview.QTGL, x=0, y=0,
                                 width=video_w, height=video_h)
            picam2.start()
            # draw_objects is called for every camera request before it is displayed.
            picam2.pre_callback = draw_objects

            # Start all worker threads
            capture_worker = threading.Thread(
                target=capture_thread,
                args=(picam2, frame_queue),
                daemon=True
            )
            inference_worker = threading.Thread(
                target=inference_thread,
                args=(hailo, frame_queue, video_w,
                      video_h, class_names, score_thresh, batch_size),
                daemon=True
            )
            detection_updater = threading.Thread(
                target=detection_update_thread,
                daemon=True
            )
            capture_worker.start()
            inference_worker.start()
            detection_updater.start()

            try:
                while True:
                    time.sleep(1)
            except KeyboardInterrupt:
                print("\nShutting down...")
                shutdown_event.set()
                capture_worker.join(timeout=2.0)
                inference_worker.join(timeout=2.0)
                detection_updater.join(timeout=2.0)
                picam2.stop()
                print("Shutdown complete.")