CHECK_SUCCESS failed with status=HAILO_NOT_FOUND(61) after a period of time

I run asynchronous inference via an input queue.
After some hours of operation, HailoRT logs the following error and does not recover:

[HailoRT] [error] CHECK_SUCCESS failed with status=HAILO_NOT_FOUND(61)

Restarting my Python application fixes the issue.
Here is my entire inference class:

import cv2
from hailo_platform import (HEF, VDevice, FormatType, HailoSchedulingAlgorithm)
from functools import partial
import logging
import numpy as np
from typing import Dict, List, Optional, Tuple
import queue

from .config import PixelFormatEnum
from .util import SharedMemoryFrameManager

class HailoAsyncInference:
    def __init__(
        self,
        hef_path: str,
        input_queue: queue.Queue,
        output_queue: queue.Queue,
        batch_size: int = 1,
        pixel_format: PixelFormatEnum = PixelFormatEnum.rgb,
        input_type: Optional[str] = None,
        output_type: Optional[str] = None
    ) -> None:
        """
        Initialize the HailoAsyncInference class with the provided HEF model 
        file path and input/output queues.

        Args:
            hef_path (str): Path to the HEF model file.
            input_queue (queue.Queue): Queue from which to pull input frames 
                                       for inference.
            output_queue (queue.Queue): Queue to hold the inference results.
            batch_size (int): Batch size for inference. Defaults to 1.
            input_type (Optional[str]): Format type of the input stream. 
                                        Possible values: 'UINT8', 'UINT16'.
            output_type (Optional[str]): Format type of the output stream. 
                                         Possible values: 'UINT8', 'UINT16', 'FLOAT32'.
        """
        self.frame_manager = SharedMemoryFrameManager()
        if pixel_format == PixelFormatEnum.rgb:
            self.pixel_format = cv2.COLOR_YUV2RGB_I420
        elif pixel_format == PixelFormatEnum.bgr:
            self.pixel_format = cv2.COLOR_YUV2BGR_I420
        else:
            raise ValueError(f"Pixel format {pixel_format} not supported")
        
        self.input_queue = input_queue
        self.output_queue = output_queue

        params = VDevice.create_params()
        
        # Set the scheduling algorithm to round-robin to activate the scheduler
        params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
        params.group_id = "SHARED"
        params.multi_process_service = True

        self.hef = HEF(hef_path)
        self.target = VDevice(params)
        self.infer_model = self.target.create_infer_model(hef_path)
        self.infer_model.set_batch_size(batch_size)

        if input_type is not None:
            self._set_input_type(input_type)
        if output_type is not None:
            self._set_output_type(output_type)

        self.output_type = output_type

    def _set_input_type(self, input_type: Optional[str] = None) -> None:
        """
        Set the input type for the HEF model. If the model has multiple inputs,
        it will set the same type for all of them.

        Args:
            input_type (Optional[str]): Format type of the input stream.
        """
        # Apply the same format type to every input so multi-input models
        # behave as the docstring describes.
        for input_name in self.infer_model.input_names:
            self.infer_model.input(input_name).set_format_type(
                getattr(FormatType, input_type)
            )
    
    def _set_output_type(self, output_type: Optional[str] = None) -> None:
        """
        Set the output type for the HEF model. If the model has multiple outputs,
        it will set the same type for all of them.

        Args:
            output_type (Optional[str]): Format type of the output stream.
        """
        # Apply the same format type to every output, mirroring _set_input_type.
        for output_name in self.infer_model.output_names:
            self.infer_model.output(output_name).set_format_type(
                getattr(FormatType, output_type)
            )

    def callback(
        self, completion_info, bindings_list: list, processed_batch: list
    ) -> None:
        """
        Callback function for handling inference results.

        Args:
            completion_info: Information about the completion of the 
                             inference task.
            bindings_list (list): List of binding objects containing input 
                                  and output buffers.
            processed_batch (list): The processed batch of images.
        """
        if completion_info.exception:
            logging.error(f'Inference error: {completion_info.exception}')
            self.output_queue.put((None, []))
        else:
            for i, bindings in enumerate(bindings_list):
                if len(bindings._output_names) == 1:
                    result = bindings.output().get_buffer()
                else:
                    result = {
                        name: bindings.output(name).get_buffer() 
                        for name in bindings._output_names
                    }
                self.output_queue.put((processed_batch[i], result))

    def _get_vstream_info(self) -> Tuple[list, list]:
        """
        Get information about input and output stream layers.

        Returns:
            Tuple[list, list]: List of input stream layer information, List of 
                               output stream layer information.
        """
        return (
            self.hef.get_input_vstream_infos(), 
            self.hef.get_output_vstream_infos()
        )

    def get_input_shape(self) -> Tuple[int, ...]:
        """
        Get the shape of the model's input layer.

        Returns:
            Tuple[int, ...]: Shape of the model's input layer.
        """
        return self.hef.get_input_vstream_infos()[0].shape  # Assumes one input

    def run(self) -> None:
        """
        Run asynchronous inference on the Hailo device, processing batches 
        from the input queue.

        Batches are fetched from the input queue until a sentinel value 
        (None) is encountered.
        """
        input_height, input_width = self.get_input_shape()[:2]

        with self.infer_model.configure() as configured_infer_model:
            job = None

            while True:
                try:
                    batch_info = self.input_queue.get()  
                    if batch_info is None:
                        break  # Sentinel value to stop the inference loop

                    batch_frames = []
                    bindings_list = []
                    for camera_name, frame_time, frame_shape in batch_info:
                        try:
                            frame = self.frame_manager.get(
                                f"{camera_name}{frame_time}",
                                (frame_shape[0], frame_shape[1])
                            )
                            if frame is None:
                                logging.info(f"{camera_name} frame {frame_time} is not in memory store.")
                                continue

                            frame = cv2.cvtColor(frame, self.pixel_format)
                            # After cvtColor the frame shape is (height, width, 3),
                            # so compare only the spatial dims; note that
                            # cv2.resize expects (width, height).
                            if frame.shape[:2] != (input_height, input_width):
                                frame = cv2.resize(frame, (input_width, input_height), interpolation=cv2.INTER_AREA)

                            batch_frames.append(frame)
                            
                            self.frame_manager.close(f"{camera_name}{frame_time}")
                            
                            bindings = self._create_bindings(configured_infer_model)
                            bindings.input().set_buffer(np.array(frame))
                            bindings_list.append(bindings)
                        except Exception as e:
                            logging.error(f"Failed to prepare frame for {camera_name}: {e}")
                            continue

                    if not batch_frames:
                        self.output_queue.put((None, []))
                        continue

                    configured_infer_model.wait_for_async_ready(timeout_ms=10000)
                    job = configured_infer_model.run_async(
                        bindings_list, partial(
                            self.callback,
                            processed_batch=batch_frames, 
                            bindings_list=bindings_list
                        )
                    )
                except Exception as e:
                    logging.error(f"Error in Hailo inference loop: {e}")
                    # In case of error, put None in output queue
                    self.output_queue.put((None, []))
                    continue

            if job is not None:
                job.wait(10000)  # Wait for the last job

    def _create_bindings(self, configured_infer_model) -> object:
        """
        Create bindings for input and output buffers.

        Args:
            configured_infer_model: The configured inference model.

        Returns:
            object: Bindings object with input and output buffers.
        """
        if self.output_type is None:
            hef_output_type = str(
                self.hef.get_output_vstream_infos()[0].format.type
            ).split(".")[1].lower()
            output_type = getattr(np, hef_output_type)
        else:
            output_type = getattr(np, self.output_type.lower())

        output_buffers = {
            name: np.empty(
                self.infer_model.output(name).shape, dtype=output_type
            )
            for name in self.infer_model.output_names
        }
        return configured_infer_model.create_bindings(
            output_buffers=output_buffers
        )

    def extract_detections(
        self,
        output: List[np.ndarray],
        original_frame_height: int,
        original_frame_width: int,
        threshold: float,
        classes: Dict[int, str],
        classes_to_detect: List[int]
    ) -> List[Dict]:
        """Extract detections from the HailoRT-postprocess output.
        
        Args:
            output (List[np.ndarray]): List of numpy arrays containing the detections.
            original_frame_height (int): Original height of the frame.
            original_frame_width (int): Original width of the frame.
            threshold (float): Confidence threshold for detections.
            classes (Dict[int, str]): Dictionary of class IDs and names.
            classes_to_detect (List[int]): List of class ids to detect.

        Returns:
            List[Dict]: List of detections.
            Each detection is a dictionary with the following keys:
                - label (str): The class label of the detection.
                - confidence (int): The confidence score of the detection as a percentage (0-100).
                - x_min (int): The x-coordinate of the top-left corner of the bounding box.
                - y_min (int): The y-coordinate of the top-left corner of the bounding box.
                - x_max (int): The x-coordinate of the bottom-right corner of the bounding box.
                - y_max (int): The y-coordinate of the bottom-right corner of the bounding box.
        """
        results = []

        for i, detections in enumerate(output):
            if len(detections) == 0:
                continue
            if i not in classes_to_detect:
                continue
            for detection in detections:
                bbox, score = detection[:4], detection[4]

                if score < threshold:
                    continue

                # Convert bbox to xyxy absolute pixel values
                y_min, x_min, y_max, x_max = (
                    bbox[0] * original_frame_height,
                    bbox[1] * original_frame_width,
                    bbox[2] * original_frame_height,
                    bbox[3] * original_frame_width,
                )

                results.append({
                    "label": classes[i],
                    "confidence": int(score * 100),
                    "x_min": int(x_min),
                    "y_min": int(y_min),
                    "x_max": int(x_max),
                    "y_max": int(y_max)
                })

        return results

Hey @axel.moller,

To better assist you, it would help if you could provide more details. Is there any indication of a memory leak in your application, e.g., steadily growing memory usage over the hours leading up to the error?

Possible Causes and Solutions for HAILO_NOT_FOUND(61):

  1. Resource Issues:

    • Bindings, configured models, or device handles that are never released can accumulate over extended use, as can memory leaked during asynchronous inference.
    • Solution: Explicitly release resources in the destructor (__del__) and abort incomplete jobs during shutdown (see the sketch after this list).
  2. Shared Resource Conflicts:

    • Conflicts arise when multiple processes or threads access shared resources such as device handles or shared memory, often due to a misconfigured multi_process_service or group_id.
    • Solution: Use a group_id unique to your application in multi-process setups, or disable multi_process_service when only a single process uses the device (also shown below).
  3. Timeouts or Long Inference Times:

    • Long-running inference jobs can cause device timeouts or leave the device unresponsive.
    • Solution: Increase timeout_ms in configured_infer_model.wait_for_async_ready (e.g., to 30,000 ms).
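
Below is a minimal sketch of points 1 and 2, reusing the hailo_platform names from your class. The group_id value and the __del__ cleanup are assumptions on my side rather than verified HailoRT requirements (VDevice.release() is part of the hailo_platform Python API, but check the rest against your HailoRT version):

from hailo_platform import VDevice, HailoSchedulingAlgorithm

params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
# Use a group_id unique to your application (hypothetical value) rather than
# a generic shared string, so unrelated processes cannot collide on the device.
params.group_id = "my_detector_app"
# Disable the multi-process service if only this process uses the device.
params.multi_process_service = False

class HailoAsyncInference:
    # ... __init__, run(), etc. as in your class ...

    def __del__(self):
        # Release the virtual device explicitly so device handles and buffers
        # are freed even when garbage collection is delayed.
        try:
            self.target.release()
        except Exception:
            pass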

Additionally, to enhance the robustness of your application, you can implement logic to restart specific components instead of the entire application when the error occurs. This can involve restarting just the inference class or re-initializing the necessary resources, as sketched below.
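
For illustration, here is a hypothetical supervisor loop that rebuilds only the inference worker when it fails. The function name and retry policy are mine, and run() would need to re-raise fatal HailoRT errors instead of swallowing every exception in its inner except block for this to trigger:

import logging
import time

def run_with_restart(hef_path, input_queue, output_queue, max_retries=5):
    for attempt in range(max_retries):
        inference = HailoAsyncInference(hef_path, input_queue, output_queue)
        try:
            inference.run()  # returns normally on the None sentinel
            return
        except Exception as e:
            logging.error(
                f"Inference worker died ({e}); restarting "
                f"(attempt {attempt + 1}/{max_retries})"
            )
            del inference  # drop device handles before re-creating them
            time.sleep(2 ** attempt)  # back off before re-opening the device
    raise RuntimeError("Hailo inference failed too many times; giving up")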