Slow inference speed

I have been trying to run inference with a CSV file as input, but compared to when images are used as input, inference with CSV is very slow.

What could be the reason?

Below is the code I am using.


import os
import glob
import numpy as np
import hailo_platform  # binds the package name for the fully qualified HailoRTTimeout reference in recv()
from hailo_platform import __version__
from multiprocessing import Process, Queue, Manager
from hailo_platform import (HEF, Device, VDevice, HailoStreamInterface, ConfigureParams,
                            InputVStreamParams, OutputVStreamParams, InputVStreams, OutputVStreams, FormatType)
from zenlog import log
import time
import argparse
import csv
import cProfile
import pstats
from io import StringIO

# Set up argument parser
parser = argparse.ArgumentParser(description='Running a Hailo inference using CSV input')
parser.add_argument('hef', help="HEF file path")
parser.add_argument('--input-dir', help="Directory path containing CSV files for classification.")
parser.add_argument('--output-csv', default="inference_results.csv", help="Path to save all results as a single CSV file.")
args = parser.parse_args()

# ---------------- Post-processing function ----------------- #
def post_processing(inference_output, sample_name, results_list):
    class_probabilities = inference_output[0]  # Assuming a single output with class probabilities
    predicted_class = np.argmax(class_probabilities)  # Get the index of the highest probability
    confidence = class_probabilities[predicted_class]
    
    # Append the result to the results list
    results_list.append([sample_name, predicted_class, confidence])

# ---------------- Inference threads functions -------------- #
def send(configured_network, csv_data, num_samples, batch_size=100):
    vstreams_params = InputVStreamParams.make_from_network_group(configured_network, quantized=False, format_type=FormatType.FLOAT32)
    configured_network.wait_for_activation(100)
    print('Performing classification on CSV data...\n')
    with InputVStreams(configured_network, vstreams_params) as vstreams:
        for i in range(0, num_samples, batch_size):
            batch_data = csv_data[i:i + batch_size]
            for vstream in vstreams:
                # Send the whole batch in one call (expand_dims adds a leading batch axis)
                batch_data_expanded = np.expand_dims(batch_data, axis=0).astype(np.float32)
                vstream.send(batch_data_expanded)

def recv(configured_network, write_q, num_samples, batch_size=100, max_retries=5):
    vstreams_params = OutputVStreamParams.make_from_network_group(configured_network, quantized=False, format_type=FormatType.FLOAT32)
    configured_network.wait_for_activation(100)
    
    with OutputVStreams(configured_network, vstreams_params) as vstreams:
        for _ in range(0, num_samples, batch_size):
            curr_vstream_data_dict = {}
            for vstream in vstreams:
                retry_count = 0
                while retry_count <= max_retries:
                    try:
                        data = vstream.recv()
                        if data.ndim == 1:
                            curr_vstream_data_dict[vstream.name] = data
                        else:
                            raise ValueError(f"Unexpected data shape: {data.shape}")
                        break  # Exit the retry loop if successful
                    except hailo_platform.pyhailort.pyhailort.HailoRTTimeout:
                        retry_count += 1
                        if retry_count > max_retries:
                            log.error(f"Max retries exceeded for vstream {vstream.name}.")
                            raise
                        else:
                            log.warning(f"Timeout on vstream {vstream.name}. Retrying {retry_count}/{max_retries}...")
                            time.sleep(2 ** retry_count)  # Exponential backoff

            write_q.put(curr_vstream_data_dict)

def inference(read_q, sample_names, current_file_index, total_files, num_samples, results_list):
    # Block on the queue instead of busy-waiting: Queue.get(0) is a non-blocking
    # get (block=0), which can raise queue.Empty even right after empty()
    # returned False, and the empty() polling loop burns CPU for nothing.
    for i in range(num_samples):
        inference_dict = read_q.get()  # blocks until a result is available
        inference_output = list(inference_dict.values())
        print(f"Processing sample {sample_names[i]} (File {current_file_index}/{total_files})")
        post_processing(inference_output, sample_names[i], results_list)

# ---------------- Pre-processing CSV data ------------------ #
def preprocess_csv_data(csv_file_path, expected_size):
    csv_data = []
    sample_names = []
    
    with open(csv_file_path, newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        for i, row in enumerate(csvreader):
            if len(row) != 1:
                raise ValueError(f"Row {i+1} in CSV file should have exactly one value. Found: {len(row)} values.")
            
            sample_data = float(row[0].strip())  # Trim whitespace and convert to float
            csv_data.append(sample_data)
            sample_names.append(f'Sample_{i+1}_{os.path.basename(csv_file_path)}')

    csv_data = np.array(csv_data, dtype=np.float32).reshape(-1, expected_size)  # Reshape to (num_samples, expected_size)
    return csv_data, sample_names

# ---------------- Save results to CSV file ----------------- #
def save_results_to_csv(results_list, output_csv_path="inference_results.csv"):
    if results_list:
        with open(output_csv_path, mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(["Sample Name", "Predicted Class", "Confidence"])
            writer.writerows(results_list)
        print(f'Results saved to {output_csv_path}')
    else:
        print("No results to save.")

# ---------------- Start of the script --------------------- #
def main():
    hef = HEF(args.hef)
    height, width, channels = hef.get_input_vstream_infos()[0].shape
    num_features = 200  # Overriding to match expected input size

    # Collect CSV files, limiting to the first 1000
    csv_files = glob.glob(f"{args.input_dir}/*.csv")
    csv_files = csv_files[:1000]
    total_files = len(csv_files)

    # Use a Manager to create a shared list for results
    with Manager() as manager:
        results_list = manager.list()  # Use a managed list

        devices = Device.scan()

        with VDevice(device_ids=devices) as target:
            configure_params = ConfigureParams.create_from_hef(hef, interface=HailoStreamInterface.PCIe)
            network_group = target.configure(hef, configure_params)[0]

            queue = Queue()

            # Preprocess each CSV file
            for file_index, csv_file in enumerate(csv_files, start=1):  # Start the index at 1
                csv_data, sample_names = preprocess_csv_data(csv_file, num_features)
                num_samples = csv_data.shape[0]

                send_process = Process(target=send, args=(network_group, csv_data, num_samples, 100))
                recv_process = Process(target=recv, args=(network_group, queue, num_samples, 100))
                inference_process = Process(target=inference, args=(queue, sample_names, file_index, total_files, num_samples, results_list))

                start_time = time.time()
                recv_process.start()
                send_process.start()
                inference_process.start()

                with network_group.activate():
                    recv_process.join()
                    send_process.join()
                    inference_process.join()

                end_time = time.time()
                print(f'Processed file {csv_file} in {end_time - start_time:.3f} seconds')

            # Write all accumulated results once every file has been processed
            save_results_to_csv(list(results_list), args.output_csv)

if __name__ == '__main__':
    main()

Hey @bijen.mali

The slow performance when using CSV files for inference (compared to images) stems from several bottlenecks:

Key Issues:

  • File I/O overhead from reading multiple CSV files
  • Inefficient batching and data transfer
  • Processing overhead from CSV conversion
  • Synchronization delays between processes
  • Delays from retry mechanisms

Recommended Solutions:

  1. Preload all CSV data upfront to minimize I/O operations (combined with item 5 in the first sketch after this list)
  2. Optimize batch sizes for your specific model/hardware (a simple timing sweep is sketched below)
  3. Implement asynchronous data handling using queues or shared memory
  4. Streamline retry logic to reduce delays
  5. Parallelize CSV preprocessing using ThreadPoolExecutor (see the first sketch after this list)
  6. Profile code to identify specific bottlenecks (a cProfile sketch follows below)
  7. Adjust network activation timeout settings
  8. Reduce CSV data size where possible
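
For items 1 and 5, here is a minimal sketch of preloading every CSV up front with a thread pool. It reuses the preprocess_csv_data function from your script; max_workers=8 is an assumption you should tune for your host:

import glob
from concurrent.futures import ThreadPoolExecutor

def preload_all_csvs(input_dir, num_features, max_workers=8):
    # Read and reshape every CSV once, up front, using a thread pool.
    # Returns a list of (csv_data, sample_names) tuples in file order.
    csv_files = sorted(glob.glob(f"{input_dir}/*.csv"))[:1000]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() preserves input order, so results line up with csv_files
        return list(pool.map(lambda f: preprocess_csv_data(f, num_features), csv_files))

CSV parsing is largely I/O-bound, so threads (rather than extra processes) are usually enough here, and the device never has to wait on disk between files.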
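
For item 2, the best batch size depends on the model and the host, so it is worth measuring rather than guessing. A rough sketch, where run_one_file is a hypothetical wrapper around one file's send/recv/inference cycle (it is not part of your script):

import time

def sweep_batch_sizes(run_one_file, csv_data, candidates=(32, 64, 100, 256, 512)):
    # Time one representative file at several batch sizes and report the fastest.
    timings = {}
    for batch_size in candidates:
        start = time.time()
        run_one_file(csv_data, batch_size)  # hypothetical: runs the full pipeline once
        timings[batch_size] = time.time() - start
    for bs, t in sorted(timings.items()):
        print(f"batch_size={bs}: {t:.3f} s")
    return min(timings, key=timings.get)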
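
For item 6, your script already imports cProfile, pstats, and StringIO but never uses them. A minimal sketch of profiling one file's processing in the parent process (note that cProfile does not see inside the send/recv/inference child processes; each child would need its own profiler):

import cProfile
import pstats
from io import StringIO

profiler = cProfile.Profile()
profiler.enable()

# ... run one file's send/recv/inference cycle here ...

profiler.disable()
stream = StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(20)
print(stream.getvalue())  # top 20 entries by cumulative time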

These optimizations should significantly improve your inference pipeline’s performance with CSV inputs.