running hef with python on rpi5+hailo-8l+ai-kit

I want to have create a custom software that takes the realtime feed from the picamer2 and does inference using the yolov8-8l hef on the hailo8l ai kit, and shows the object ddetected output on the screen, similar to what this command does but from my code

rpicam-hello -t 0 --post-process-file /usr/share/rpi-camera-assets/hailo_yolov8_inference.json

I tried this code -

import cv2
import numpy as np
import picamera2
import libcamera
from hailo_platform import (
    HEF, VDevice, HailoStreamInterface, InferVStreams,
    ConfigureParams, InputVStreamParams, OutputVStreamParams, 
    InputVStreams, OutputVStreams, FormatType
)

class HailoYOLOInference:
    def __init__(self, hef_path):
        # Initialize Hailo device and model
        self.target = VDevice()
        self.hef = HEF(hef_path)
        
        # Configure network groups
        self.configure_params = ConfigureParams.create_from_hef(
            hef=self.hef, 
            interface=HailoStreamInterface.PCIe
        )
        self.network_groups = self.target.configure(self.hef, self.configure_params)
        self.network_group = self.network_groups[0]
        
        # Create network group params
        self.network_group_params = self.network_group.create_params()
        
        # Get input and output stream info
        self.input_vstream_info = self.hef.get_input_vstream_infos()[0]
        self.output_vstream_infos = self.hef.get_output_vstream_infos()
        
        # Print input and output stream information for debugging
        print(f"Input stream shape: {self.input_vstream_info.shape}")
        print(f"Input stream name: {self.input_vstream_info.name}")
        print(f"Number of output streams: {len(self.output_vstream_infos)}")
        for i, out_stream in enumerate(self.output_vstream_infos):
            print(f"Output stream {i} name: {out_stream.name}, shape: {out_stream.shape}")
        
        # Create input and output virtual stream parameters
        self.input_vstreams_params = InputVStreamParams.make(
            self.network_group, 
            format_type=FormatType.FLOAT32
        )
        self.output_vstreams_params = OutputVStreamParams.make(
            self.network_group, 
            format_type=FormatType.FLOAT32
        )
        
        # Input image dimensions
        self.input_height, self.input_width, self.input_channels = self.input_vstream_info.shape

    def preprocess_frame(self, frame):
        # Resize frame to model input size (640x640)
        resized_frame = cv2.resize(frame, (self.input_width, self.input_height))
        
        # Convert color space if needed (BGR to RGB)
        preprocessed_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
        
        # Normalize pixel values (0-1)
        normalized_frame = preprocessed_frame.astype(np.float32) / 255.0
        
        # Ensure the frame matches the expected input shape
        assert normalized_frame.shape == (self.input_height, self.input_width, self.input_channels), \
            f"Input shape mismatch. Expected {(self.input_height, self.input_width, self.input_channels)}, got {normalized_frame.shape}"
        
        return normalized_frame

    def run_inference(self, preprocessed_frame):
        # Prepare input data dictionary
        # Ensure input is a 4D tensor with batch dimension
        input_data = {
            self.input_vstream_info.name: np.expand_dims(preprocessed_frame, axis=0)
        }
        
        # Use InferVStreams for inference
        with InferVStreams(
            self.network_group, 
            self.input_vstreams_params, 
            self.output_vstreams_params
        ) as infer_pipeline:
            # Activate network group
            with self.network_group.activate(self.network_group_params):
                # Run inference
                infer_results = infer_pipeline.infer(input_data)
                
                # Return outputs for all output streams
                return {
                    out_stream.name: infer_results[out_stream.name] 
                    for out_stream in self.output_vstream_infos
                }

    def postprocess_results(self, outputs, original_frame):
    # Iterate over output streams and convert lists to NumPy arrays
        for stream_name, output in outputs.items():
            output_array = np.array(output)  # Convert to NumPy array

            # Now you can safely access .shape and .dtype
            print(f"Output stream {stream_name} shape: {output_array.shape}")
            print(f"Output stream {stream_name} dtype: {output_array.dtype}")
        
        # If you want to draw something on the frame
        result_frame = original_frame.copy()
        
        return result_frame


def main():
    # Specify the exact path to your HEF file
    HEF_PATH = '/usr/share/hailo-models/yolov8s_h8l.hef'

    # Setup Picamera2
    picam2 = picamera2.Picamera2()
    config = picam2.create_preview_configuration(
        main={"size": (1920, 1080)},
        lores={"size": (640, 480)},
        display="lores"
    )
    picam2.configure(config)
    
    # Initialize Hailo inference
    hailo_inference = HailoYOLOInference(hef_path=HEF_PATH)
    
    # Start camera
    picam2.start()
    
    try:
        while True:
            # Capture frame
            frame = picam2.capture_array()
            
            # Preprocess frame
            preprocessed_frame = hailo_inference.preprocess_frame(frame)
            
            # Run inference
            outputs = hailo_inference.run_inference(preprocessed_frame)
            
            # Postprocess and visualize results
            result_frame = hailo_inference.postprocess_results(outputs, frame)
            
            # Display frame
            cv2.imshow('Hailo YOLOv8 Inference', result_frame)
            
            # Exit on 'q' key press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    
    finally:
        # Cleanup
        picam2.stop()
        cv2.destroyAllWindows()

if __name__ == '__main__':
    main() 

The documentation is very unclear and there is no clear instructions on what code does what?

Welcome to the Hailo Community!

First, please use the </> Preformatted text feature for source code next time to make the post readable. I formatted it for you.

Our examples cater to different users. Some are designed for direct use with minimal modifications, while others are more complex and allow greater flexibility.

If you need help understanding unfamiliar code, ChatGPT is a great tool to try. It can suggest modifications and provide explanations, though debugging skills are still necessary, as it may occasionally make mistakes.

Did you have a look at our Raspberry Pi 5 examples?

GitHub - Hailo rpi5 examples

I did had a look at the hailo rpi5 examples, but if i do to the detection.py example in it, then it is preloading the video from the Gstreamer library itself, how do i customize this pipeline for my needs? Do you have any step by step tutorial on how to create a custom object detection pipeline, in the docs you have stated what each function does with its params, but there is no tutorial of in what order to use those functions.

I need to create a custom video processing piepline, can you help?

Hi @helter_skelter
You can use our PySDK to develop such a flow. In fact, a few other Hailo community users were able to do what you outlined. Please see Processing live video stream from opencv on raspberry pi 5 - General - Hailo Community for a discussion and solution.

Can you please provide direct link to the pySDK docs

Hi @helter_skelter
You can find PySDK docs at: DeGirum Docs. Here is a github repo with instructions to setup and some usage examples: Hailo examples