We are using a ConfiguredInferModel in async mode to run inference on input data with variable batch sizes. The code times out in wait_for_async_ready() when the batch size is larger than 7, presumably because the default input queue size (4) is too small. We can read the queue size of the ConfiguredInferModel, but how do we change it? (We have noticed that a queue size can be specified when creating InputVStreamParams, but we have not figured out how to use those with a ConfiguredInferModel.)
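For reference, this is roughly what we are looking at (the queue_size keyword below is our reading of the legacy streaming API, not something we have managed to connect to a ConfiguredInferModel):

    # Read the default async queue depth of the configured model (4 in our case)
    queue_size = configured_infer_model.get_async_queue_size()
    print(f"Async input queue size: {queue_size}")

    # The legacy streaming API appears to accept a queue_size argument, e.g.
    #   input_params = InputVStreamParams.make(configured_network_group, queue_size=16)
    # but we have not found an equivalent setting on a ConfiguredInferModel.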
Here’s how to implement asynchronous inference with queue management in Hailo:
Key Improvements for Async Processing:

Smart Queue Management
- Uses wait_for_async_ready to check queue availability before submitting
- Prevents HAILO_QUEUE_IS_FULL errors
- Configurable timeout settings

Flexible Batch Processing
- Handles variable batch sizes (3-10)
- Adapts to different workload requirements
- Includes completion feedback for each batch
Here’s the implementation:
import numpy as np
from functools import partial

from hailo_platform import (
    VDevice, FormatType, HailoSchedulingAlgorithm
)
# Note: the exception's import path may vary between HailoRT releases
from hailo_platform.pyhailort import HailoRTException


def async_inference_handler():
    # Completion callback, invoked once per submitted batch
    def completion_callback(completion_info, bindings_list, batch_size):
        if completion_info.exception:
            print(f"Error in batch of {batch_size}: {completion_info.exception}")
        else:
            print(f"Successfully processed batch of {batch_size}")

    # Initialize the virtual device with the round-robin scheduler
    device_params = VDevice.create_params()
    device_params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN

    with VDevice(device_params) as device:
        # Load the model (create_infer_model takes the HEF path)
        infer_model = device.create_infer_model("path/to/your_model.hef")

        # Format types are set on the InferModel itself; the legacy
        # InputVStreamParams/OutputVStreamParams API is not used with
        # a ConfiguredInferModel
        infer_model.input().set_format_type(FormatType.FLOAT32)
        infer_model.output().set_format_type(FormatType.UINT8)

        with infer_model.configure() as configured_model:
            last_job = None

            # Process different batch sizes
            for batch_size in range(3, 10):
                try:
                    # Wait for queue availability
                    if configured_model.wait_for_async_ready(
                        timeout_ms=10000,
                        frames_count=batch_size
                    ):
                        # Prepare one bindings object per frame in the batch;
                        # each input buffer must match the model's input shape
                        input_data = np.random.rand(
                            batch_size, 3, 192, 96).astype(np.float32)
                        bindings_list = []
                        for i in range(batch_size):
                            bindings = configured_model.create_bindings()
                            bindings.input().set_buffer(input_data[i])
                            bindings.output().set_buffer(
                                np.empty(infer_model.output().shape, dtype=np.uint8))
                            bindings_list.append(bindings)

                        # Submit the batch asynchronously
                        last_job = configured_model.run_async(
                            bindings_list,
                            partial(completion_callback,
                                    bindings_list=bindings_list,
                                    batch_size=batch_size)
                        )
                        print(f"Submitted batch size {batch_size}")
                    else:
                        print("Queue timeout - skipping batch")
                except HailoRTException as e:
                    print(f"Inference error: {e}")

            # Make sure the last submitted job has finished before tearing down
            if last_job is not None:
                last_job.wait(timeout_ms=10000)


# Run inference
async_inference_handler()
This implementation provides stability and flexibility for asynchronous inference operations. Would you like me to explain any specific part in more detail?
Thanks very much @omria for your response. What we find is that the line
if configured_model.wait_for_async_ready(
    timeout_ms=10000,
    frames_count=batch_size
):
will always time out when batch_size > 7, which unfortunately results in many of the batches being dropped.
The workaround we have adopted so far is to break the batch_size inference loop into an inner and an outer loop, so that the inner loop never exceeds maxQueue.
We are not sure how this impacts performance, and it would seem cleaner not to need the inner loop. Is there a way to increase the input queue size on a configured model?
In case it is useful to others, this is what the modified code looks like:
# Empirically this seems to be the maximum queue size that can be handled without error
maxQueue = 2 * configured_infer_model.get_async_queue_size() - 1

# inputData, batchSize, create_bindings() and callback() are defined elsewhere in our code
bindings_list = []
for ib in range(batchSize):
    bindings = create_bindings(infer_model, configured_infer_model)
    inp = np.expand_dims(np.array(inputData)[ib, ...], axis=0)
    bindings.input().set_buffer(inp)
    bindings_list.append(bindings)

    # Flush once the chunk reaches maxQueue, or on the final frame of the batch
    if len(bindings_list) >= maxQueue or (ib + 1) >= batchSize:
        if configured_infer_model.wait_for_async_ready(
                timeout_ms=10000, frames_count=len(bindings_list)):
            infer_model.set_batch_size(len(bindings_list))
            configured_infer_model.run_async(
                bindings_list, partial(
                    callback,
                    bindings_list=bindings_list,
                    bs=len(bindings_list)
                )
            )
            bindings_list = []
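For completeness, the create_bindings and callback helpers referenced above look roughly like this (simplified; the output dtype and shape depend on the model, and the exact bindings API may differ between HailoRT versions):

    def create_bindings(infer_model, configured_infer_model):
        # One bindings object per frame: allocate an output buffer and attach it
        bindings = configured_infer_model.create_bindings()
        bindings.output().set_buffer(
            np.empty(infer_model.output().shape, dtype=np.uint8))
        return bindings

    def callback(completion_info, bindings_list, bs):
        # Invoked by HailoRT when the submitted frames complete
        if completion_info.exception:
            print(f"Inference failed for chunk of {bs}: {completion_info.exception}")
        else:
            print(f"Completed chunk of {bs} frames")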