How to increase input_queue_size on a ConfiguredInferModel

Hello,
This question is related to the earlier question "How do I increase the max queue size for HailoRT?", but the answer posted there does not work for our use case.

We are using a ConfiguredInferModel in async mode to run inference on input data with variable batch sizes. The code times out in wait_for_async_ready() when the batch size is larger than 7, presumably because the default input queue size (4) is too small. We can read the queue size of the ConfiguredInferModel, but how do we change it? (We have noticed that a queue size can be specified when creating InputVStreamParams, but we have not figured out how to use those with a ConfiguredInferModel.)
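For reference, the only place we have found a queue size exposed is the older vstream-style flow, roughly like the sketch below (the queue_size argument, the value 16, and the PCIe interface are our assumptions from the InputVStreamParams docs; as far as we can tell this path does not connect to a ConfiguredInferModel at all):

from hailo_platform import (VDevice, HEF, ConfigureParams, HailoStreamInterface,
                            InputVStreamParams, OutputVStreamParams, InferVStreams, FormatType)

params = VDevice.create_params()
with VDevice(params) as target:
  hef = HEF(hef_path)
  configure_params = ConfigureParams.create_from_hef(hef, interface=HailoStreamInterface.PCIe)
  network_group = target.configure(hef, configure_params)[0]

  # queue_size can be passed here, but these params feed the blocking
  # InferVStreams pipeline, not ConfiguredInferModel.run_async()
  input_vstreams_params = InputVStreamParams.make(network_group,
                                                  format_type=FormatType.FLOAT32,
                                                  queue_size=16)
  output_vstreams_params = OutputVStreamParams.make(network_group)

  with InferVStreams(network_group, input_vstreams_params, output_vstreams_params) as infer_pipeline:
    pass  # blocking (non-async) inference would go here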

Here is the test code we use:

import numpy as np
from functools import partial
from hailo_platform import VDevice, HEF, FormatType, HailoSchedulingAlgorithm

# create_bindings() and hef_path are defined elsewhere in our code
def callback(completion_info, bindings_list, bs):
  # Called when an async inference job completes
  if completion_info.exception:
    print("Callback exception {}".format(completion_info.exception))
  else:
    for idx, bindings in enumerate(bindings_list):
      print("BS {} {} output shape {}".format(bs, idx, bindings.output().get_buffer().shape))

def run(configured_infer_model, infer_model, batch_size, hef, input):
  bindings_list = []
  for ib in range(batch_size):
    bindings = create_bindings(configured_infer_model, infer_model, hef)
    inp = np.expand_dims(np.array(input)[ib, ...], axis=0)
    bindings.input().set_buffer(inp)
    bindings_list.append(bindings)

  qs = configured_infer_model.get_async_queue_size()
  print("Wait async BS {} queueSize {}".format(batch_size, qs ))
  configured_infer_model.wait_for_async_ready(timeout_ms=10000, frames_count=batch_size)
  infer_model.set_batch_size(batch_size)
  configured_infer_model.run_async(
      bindings_list, partial(
          callback,
          bindings_list=bindings_list,
          bs=batch_size
      )
  )

  print("Job Submitted") 

  

def main():
  params = VDevice.create_params()
  params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN

  hef = HEF(hef_path)

  target = VDevice(params)
  infer_model = target.create_infer_model(hef_path)
  infer_model.input().set_format_type(getattr(FormatType, 'FLOAT32'))
  with infer_model.configure() as configured_infer_model:

    for bs in range(3, 10):
      input = np.random.rand(bs, 3, 192, 96).astype(np.float32)
      run(configured_infer_model, infer_model, bs, hef, input)

Hey @mrevv

Welcome to the Hailo Community!

Here’s how to implement asynchronous inference with queue management in Hailo:

Key Improvements for Async Processing:

  1. Smart Queue Management
    • Uses wait_for_async_ready to check queue availability
    • Prevents HAILO_QUEUE_IS_FULL errors
    • Configurable timeout settings

  2. Flexible Batch Processing
    • Handles variable batch sizes (3-9)
    • Adapts to different workload requirements
    • Includes completion feedback for each batch

Here’s the implementation:

import numpy as np
from functools import partial
from hailo_platform import (
    VDevice, FormatType, HailoSchedulingAlgorithm
)
from hailo_platform.pyhailort import HailoRTException

def async_inference_handler():
    # Setup callback
    def completion_callback(completion_info, bindings_list, batch_size):
        if completion_info.exception:
            print(f"Error in batch {batch_size}: {completion_info.exception}")
        else:
            print(f"Successfully processed batch of {batch_size}")

    # Initialize device with the round-robin scheduler
    device_params = VDevice.create_params()
    device_params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
    device = VDevice(device_params)

    # Load the model (create_infer_model takes the HEF path)
    infer_model = device.create_infer_model("path/to/your_model.hef")
    
    # Set the I/O format types on the InferModel before configuring it
    infer_model.input().set_format_type(FormatType.FLOAT32)
    infer_model.output().set_format_type(FormatType.UINT8)

    with infer_model.configure() as configured_model:
        # Process different batch sizes
        for batch_size in range(3, 10):
            try:
                # Wait for queue availability
                if configured_model.wait_for_async_ready(
                    timeout_ms=10000, 
                    frames_count=batch_size
                ):
                    # Prepare one bindings object per frame in the batch
                    input_data = np.random.rand(batch_size, 3, 192, 96).astype(np.float32)
                    bindings_list = []
                    for frame in input_data:
                        bindings = configured_model.create_bindings()
                        bindings.input().set_buffer(np.expand_dims(frame, axis=0))
                        bindings.output().set_buffer(
                            np.empty(infer_model.output().shape, dtype=np.uint8))
                        bindings_list.append(bindings)

                    # Submit the whole batch asynchronously
                    configured_model.run_async(
                        bindings_list,
                        partial(completion_callback,
                                bindings_list=bindings_list,
                                batch_size=batch_size)
                    )
                    print(f"Submitted batch size {batch_size}")
                else:
                    print("Queue timeout - skipping batch")
            except HailoRTException as e:
                print(f"Inference error: {e}")

# Run inference
async_inference_handler()

This implementation provides stability and flexibility for asynchronous inference operations. Would you like me to explain any specific part in more detail?

Thanks very much @omria for your response. What we find is that the line

if configured_model.wait_for_async_ready(
        timeout_ms=10000,
        frames_count=batch_size
):

will always time out when batch_size > 7, which unfortunately results in many of the batches being dropped.

The hack we have adopted to date is to break the batch_size inference loop into an inner and outer loop, so that in the inner loop we never exceed maxQueue.

maxQueue = 2 * configured_infer_model.get_async_queue_size() - 1

Not sure how this impacts performance. It would seem cleaner to not have this inner loop. Is there a way to increase the input queue size on a configured model?

In case it is useful to others, this is what the modified code looks like:

# Empirically this seems to be the maximum queue size that can be handled without error
maxQueue = 2 * configured_infer_model.get_async_queue_size() - 1
bindings_list = []
for ib in range(batch_size):
  bindings = create_bindings(infer_model, configured_infer_model)
  inp = np.expand_dims(np.array(inputData)[ib, ...], axis=0)
  bindings.input().set_buffer(inp)
  bindings_list.append(bindings)

  # Submit whenever the pending chunk reaches maxQueue, or on the last frame of the batch
  if len(bindings_list) >= maxQueue or (ib + 1) >= batch_size:
    if configured_infer_model.wait_for_async_ready(timeout_ms=10000, frames_count=len(bindings_list)):
      infer_model.set_batch_size(len(bindings_list))
      configured_infer_model.run_async(
          bindings_list, partial(
              callback,
              bindings_list=bindings_list,
              bs=batch_size
          )
      )

    bindings_list = []
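For what it is worth, the same chunking can be wrapped in a small helper so the calling code stays flat. This is only a sketch that reuses the callback, the create_bindings() helper, and the imports from the snippets above, and the 2 * queue_size - 1 limit is just what we found empirically:

def run_chunked(configured_infer_model, infer_model, input_data, batch_size):
  # Submit frames in chunks that never exceed the empirically safe queue depth
  max_queue = 2 * configured_infer_model.get_async_queue_size() - 1
  for start in range(0, batch_size, max_queue):
    end = min(start + max_queue, batch_size)
    bindings_list = []
    for ib in range(start, end):
      bindings = create_bindings(infer_model, configured_infer_model)  # our own helper
      bindings.input().set_buffer(np.expand_dims(input_data[ib, ...], axis=0))
      bindings_list.append(bindings)

    if configured_infer_model.wait_for_async_ready(timeout_ms=10000, frames_count=len(bindings_list)):
      infer_model.set_batch_size(len(bindings_list))
      configured_infer_model.run_async(
          bindings_list,
          partial(callback, bindings_list=bindings_list, bs=batch_size)
      )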