I run asynchronous inference via an input queue.
After a few hours of operation, HailoRT logs the following error and does not recover:
[HailoRT] [error] CHECK_SUCCESS failed with status=HAILO_NOT_FOUND(61)
Restarting my Python application fixes the issue.
Here is my entire inference class:
import cv2
from hailo_platform import (HEF, VDevice, FormatType, HailoSchedulingAlgorithm)
from functools import partial
import logging
import numpy as np
from typing import Dict, List, Optional, Tuple
import queue
from .config import PixelFormatEnum
from .util import SharedMemoryFrameManager
class HailoAsyncInference:
def __init__(
self,
hef_path: str,
input_queue: queue.Queue,
output_queue: queue.Queue,
batch_size: int = 1,
pixel_format: PixelFormatEnum = PixelFormatEnum.rgb,
input_type: Optional[str] = None,
output_type: Optional[str] = None
) -> None:
"""
Initialize the HailoAsyncInference class with the provided HEF model
file path and input/output queues.
Args:
hef_path (str): Path to the HEF model file.
input_queue (queue.Queue): Queue from which to pull input frames
for inference.
output_queue (queue.Queue): Queue to hold the inference results.
            batch_size (int): Batch size for inference. Defaults to 1.
            pixel_format (PixelFormatEnum): Color space the I420 frames are
                converted to before inference. Possible values: rgb, bgr.
input_type (Optional[str]): Format type of the input stream.
Possible values: 'UINT8', 'UINT16'.
output_type (Optional[str]): Format type of the output stream.
Possible values: 'UINT8', 'UINT16', 'FLOAT32'.
"""
self.frame_manager = SharedMemoryFrameManager()
if pixel_format == PixelFormatEnum.rgb:
self.pixel_format = cv2.COLOR_YUV2RGB_I420
elif pixel_format == PixelFormatEnum.bgr:
self.pixel_format = cv2.COLOR_YUV2BGR_I420
else:
raise ValueError(f"Pixel format {pixel_format} not supported")
self.input_queue = input_queue
self.output_queue = output_queue
params = VDevice.create_params()
# Set the scheduling algorithm to round-robin to activate the scheduler
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
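        # group_id "SHARED" and multi_process_service allow other processes to
        # share this physical device through the HailoRT multi-process service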
params.group_id = "SHARED"
params.multi_process_service = True
self.hef = HEF(hef_path)
self.target = VDevice(params)
self.infer_model = self.target.create_infer_model(hef_path)
self.infer_model.set_batch_size(batch_size)
if input_type is not None:
self._set_input_type(input_type)
if output_type is not None:
self._set_output_type(output_type)
self.output_type = output_type
    def _set_input_type(self, input_type: Optional[str] = None) -> None:
        """
        Set the input format type for the HEF model. If the model has
        multiple inputs, the same type is applied to all of them.
        Args:
            input_type (Optional[str]): Format type of the input stream.
        """
        for input_name in self.infer_model.input_names:
            self.infer_model.input(input_name).set_format_type(
                getattr(FormatType, input_type)
            )
    def _set_output_type(self, output_type: Optional[str] = None) -> None:
        """
        Set the output format type for the HEF model. If the model has
        multiple outputs, the same type is applied to all of them.
        Args:
            output_type (Optional[str]): Format type of the output stream.
        """
        for output_name in self.infer_model.output_names:
            self.infer_model.output(output_name).set_format_type(
                getattr(FormatType, output_type)
            )
def callback(
self, completion_info, bindings_list: list, processed_batch: list
) -> None:
"""
Callback function for handling inference results.
Args:
completion_info: Information about the completion of the
inference task.
bindings_list (list): List of binding objects containing input
and output buffers.
processed_batch (list): The processed batch of images.
"""
if completion_info.exception:
            logging.error(f'Inference error: {completion_info.exception}')
self.output_queue.put((None, []))
else:
for i, bindings in enumerate(bindings_list):
if len(bindings._output_names) == 1:
result = bindings.output().get_buffer()
else:
result = {
name: bindings.output(name).get_buffer()
for name in bindings._output_names
}
self.output_queue.put((processed_batch[i], result))
def _get_vstream_info(self) -> Tuple[list, list]:
"""
Get information about input and output stream layers.
Returns:
Tuple[list, list]: List of input stream layer information, List of
output stream layer information.
"""
return (
self.hef.get_input_vstream_infos(),
self.hef.get_output_vstream_infos()
)
def get_input_shape(self) -> Tuple[int, ...]:
"""
Get the shape of the model's input layer.
Returns:
Tuple[int, ...]: Shape of the model's input layer.
"""
return self.hef.get_input_vstream_infos()[0].shape # Assumes one input
def run(self) -> None:
"""
Run asynchronous inference on the Hailo device, processing batches
from the input queue.
Batches are fetched from the input queue until a sentinel value
(None) is encountered.
"""
        input_height, input_width = self.get_input_shape()[:2]
with self.infer_model.configure() as configured_infer_model:
job = None
while True:
try:
batch_info = self.input_queue.get()
if batch_info is None:
break # Sentinel value to stop the inference loop
batch_frames = []
bindings_list = []
for camera_name, frame_time, frame_shape in batch_info:
try:
frame = self.frame_manager.get(
f"{camera_name}{frame_time}",
(frame_shape[0], frame_shape[1])
)
if frame is None:
logging.info(f"{camera_name} frame {frame_time} is not in memory store.")
continue
                            frame = cv2.cvtColor(frame, self.pixel_format)
                            # frame.shape is (height, width, channels) after color
                            # conversion, so compare only the spatial dimensions;
                            # cv2.resize takes its size argument as (width, height)
                            if frame.shape[:2] != (input_height, input_width):
                                frame = cv2.resize(frame, (input_width, input_height), interpolation=cv2.INTER_AREA)
batch_frames.append(frame)
self.frame_manager.close(f"{camera_name}{frame_time}")
bindings = self._create_bindings(configured_infer_model)
bindings.input().set_buffer(np.array(frame))
bindings_list.append(bindings)
                        except Exception as e:
                            logging.error(f"Error preparing frame for inference: {e}")
                            continue
if not batch_frames:
self.output_queue.put((None, []))
continue
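                    # Block until the device can accept another async job
                    # (up to 10 s) so the number of in-flight jobs stays bounded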
configured_infer_model.wait_for_async_ready(timeout_ms=10000)
job = configured_infer_model.run_async(
bindings_list, partial(
self.callback,
processed_batch=batch_frames,
bindings_list=bindings_list
)
)
except Exception as e:
logging.error(f"Error in Hailo inference loop: {e}")
# In case of error, put None in output queue
self.output_queue.put((None, []))
continue
            if job is not None:
                job.wait(10000)  # Wait up to 10 s for the last in-flight job to finish
def _create_bindings(self, configured_infer_model) -> object:
"""
Create bindings for input and output buffers.
Args:
configured_infer_model: The configured inference model.
Returns:
object: Bindings object with input and output buffers.
"""
if self.output_type is None:
hef_output_type = str(
self.hef.get_output_vstream_infos()[0].format.type
).split(".")[1].lower()
output_type = getattr(np, hef_output_type)
else:
output_type = getattr(np, self.output_type.lower())
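        # Pre-allocate one output buffer per output layer; HailoRT writes the
        # results into these arrays and the callback reads them back out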
output_buffers = {
name: np.empty(
self.infer_model.output(name).shape, dtype=output_type
)
for name in self.infer_model.output_names
}
return configured_infer_model.create_bindings(
output_buffers=output_buffers
)
def extract_detections(
self,
output: List[np.ndarray],
original_frame_height: int,
original_frame_width: int,
threshold: float,
classes: Dict[int, str],
classes_to_detect: List[int]
) -> List[Dict]:
"""Extract detections from the HailoRT-postprocess output.
Args:
output (List[np.ndarray]): List of numpy arrays containing the detections.
original_frame_height (int): Original height of the frame.
original_frame_width (int): Original width of the frame.
threshold (float): Confidence threshold for detections.
classes (Dict[int, str]): Dictionary of class IDs and names.
classes_to_detect (List[int]): List of class ids to detect.
Returns:
List[Dict]: List of detections.
Each detection is a dictionary with the following keys:
- label (str): The class label of the detection.
            - confidence (int): The confidence score of the detection as an integer percentage (0-100).
- x_min (int): The x-coordinate of the top-left corner of the bounding box.
- y_min (int): The y-coordinate of the top-left corner of the bounding box.
- x_max (int): The x-coordinate of the bottom-right corner of the bounding box.
- y_max (int): The y-coordinate of the bottom-right corner of the bounding box.
"""
results = []
for i, detections in enumerate(output):
if len(detections) == 0:
continue
if i not in classes_to_detect:
continue
for detection in detections:
bbox, score = detection[:4], detection[4]
if score < threshold:
continue
# Convert bbox to xyxy absolute pixel values
y_min, x_min, y_max, x_max = (
bbox[0] * original_frame_height,
bbox[1] * original_frame_width,
bbox[2] * original_frame_height,
bbox[3] * original_frame_width,
)
results.append({
"label": classes[i],
"confidence": int(score * 100),
"x_min": int(x_min),
"y_min": int(y_min),
"x_max": int(x_max),
"y_max": int(y_max)
})
return results
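For completeness, this is roughly how the class is wired into the rest of the application (a simplified sketch, not my exact code: the HEF path, camera name, and frame shape are placeholders, and the real producer publishes frames through SharedMemoryFrameManager before queuing their metadata):

import queue
import threading

input_queue: queue.Queue = queue.Queue(maxsize=4)
output_queue: queue.Queue = queue.Queue()

hailo = HailoAsyncInference(
    hef_path="/path/to/model.hef",  # placeholder path
    input_queue=input_queue,
    output_queue=output_queue,
)

# run() blocks in its inference loop, so it gets its own thread
worker = threading.Thread(target=hailo.run, daemon=True)
worker.start()

# Producer: each batch is a list of (camera_name, frame_time, frame_shape)
# tuples; the frames themselves live in shared memory under the key
# f"{camera_name}{frame_time}"
input_queue.put([("cam_front", 1700000000.0, (480, 640))])

# Consumer: results arrive as (processed_frame, result) tuples;
# (None, []) signals an error or an empty batch
frame, result = output_queue.get()

# The None sentinel stops the inference loop
input_queue.put(None)
worker.join()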