Extracting Inference Results from a TAPPAS Pipeline in Python

There are multiple ways of getting the inference results from a TAPPAS pipeline in Python, either before or after post-processing.

GstHailoPython Element

The HailoPython module offers a seamless entry point for user-defined Python scripts within the TAPPAS pipeline. It grants access to both raw and post-processed outputs, alongside the GStreamer buffer.

– To obtain raw outputs using the HailoPython element: replace the hailofilter element with hailopython module=$PATH_TO_MODULE/my_module.py.
– For accessing post-processed outputs: add hailopython module=$PATH_TO_MODULE/my_module.py after the hailofilter element.

Here’s an example of what the Python module would look like:

import hailo

# VideoFrame must be imported before Gst
from gsthailo import VideoFrame
from gi.repository import Gst


def run(video_frame: VideoFrame):
    """Print every raw output tensor name and the shape of one tensor.

    This is the entry point of a module loaded through
    ``hailopython module=...``.

    Args:
        video_frame: Frame wrapper whose ``roi`` attribute exposes the
            tensors attached to the buffer.

    Returns:
        ``Gst.FlowReturn.OK`` so the buffer keeps flowing downstream.
    """
    roi = video_frame.roi
    for tensor in roi.get_tensors():
        print(tensor.name())
    # Replace "output_layer_name" with one of the names printed above.
    my_tensor = roi.get_tensor("output_layer_name")
    print(f"shape is {my_tensor.height()}X{my_tensor.width()}X{my_tensor.features()}")
    return Gst.FlowReturn.OK

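HailoExportZMQ

HailoExportZMQ (the hailoexportzmq element) provides an access point in the pipeline to export HailoObjects meta to a ZMQ socket. The meta attached to the buffer is published as JSON, and then the buffer continues onwards in the pipeline. Its counterpart, HailoImportZMQ, works in the opposite direction: it imports HailoObjects meta from a ZMQ socket and adds it to any pre-existing ROI in the buffer.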

Here’s an example of implementing this feature using Python for the client-side:
Server-Side Pipeline

# Server-side pipeline: decodes detection.mp4, runs the yolov5m HEF on
# hailonet, applies the yolov5 post-process through hailofilter, then passes
# each buffer through hailooverlay and hailoexportzmq before displaying it.
# NOTE(review): the Python clients in this article connect to
# tcp://localhost:5555, so hailoexportzmq is presumably publishing on that
# port by default; confirm against the element's properties.
gst-launch-1.0 filesrc location=detection.mp4 name=src_0 ! \
  decodebin ! videoscale ! video/x-raw, pixel-aspect-ratio=1/1 ! \
  videoconvert ! queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! \
  hailonet hef-path=./resources/yolov5m_wo_spp_60p.hef is-active=true batch-size=1 ! \
  queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! \
  hailofilter function-name=yolov5 so-path=/local/workspace/tappas/apps/gstreamer/libs/post_processes/libyolo_post.so config-path=/local/workspace/tappas/apps/gstreamer/general/detection/resources/configs/yolov5.json qos=false ! \
  queue leaky=no max-size-buffers=30 max-size-bytes=0 max-size-time=0 ! \
  hailooverlay ! \
  hailoexportzmq ! \
  videoconvert ! \
  fpsdisplaysink video-sink=xvimagesink name=hailo_display sync=false text-overlay=false

Client-side (Python)

import zmq

# Minimal subscriber: prints every raw message published on the ZMQ socket.
port = "5555"

context = zmq.Context()
socket = context.socket(zmq.SUB)
# An empty subscription prefix means "receive every message".
socket.setsockopt_string(zmq.SUBSCRIBE, "")

print("Collecting updates from hailoexportzmq...")
socket.connect(f'tcp://localhost:{port}')

try:
    while True:
        # recv() blocks until the next message arrives and returns bytes.
        print(socket.recv())
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless loop.
    pass
finally:
    # Release the socket before terminating the context so term() cannot block.
    socket.close()
    context.term()

Here is another example of client-side Python code that parses the Hailo metadata and displays each detected object’s details:

import sys
import zmq
import time
import json
import numpy as np

def extract_detections(hailo_output, threshold=0.0, extract_track_id=True):
    """Extract detections from the HailoRT-postprocess output.

    Args:
        hailo_output: Iterable of sub-object dicts. Entries that carry a
            'HailoDetection' key are read; any other entry is skipped.
        threshold: Detections with a confidence below this value are dropped.
        extract_track_id: When true, also collect one tracker ID per
            detection.

    Returns:
        A dict with:
            'xyxy': array of shape (N, 4) holding [xmin, ymin, xmax, ymax].
            'confidence': array of N scores.
            'class_id': array of N class IDs.
            'label': list of N labels.
            'num_detections': N.
            'tracker_id': (only if ``extract_track_id``) array of N IDs,
                with -1 for detections that have no 'HailoUniqueID'
                sub-object, so it stays index-aligned with the other fields.
    """
    xyxy = []
    confidence = []
    class_id = []
    label = []
    track_id = [] if extract_track_id else None

    for entry in hailo_output:
        hailo_detection = entry.get('HailoDetection')
        if hailo_detection is None:
            # Not a detection (some other kind of sub-object): nothing to extract.
            continue

        score = hailo_detection['confidence']
        if score < threshold:
            continue

        box = hailo_detection['HailoBBox']
        xmin, ymin = box['xmin'], box['ymin']
        xyxy.append([xmin, ymin, xmin + box['width'], ymin + box['height']])
        confidence.append(score)
        class_id.append(hailo_detection['class_id'])
        label.append(hailo_detection['label'])

        if extract_track_id:
            # Always append exactly one value per detection; look through all
            # sub-objects instead of assuming the ID is the first one.
            sub_objects = hailo_detection.get('SubObjects') or []
            track_id.append(next(
                (sub['HailoUniqueID']['unique_id']
                 for sub in sub_objects if 'HailoUniqueID' in sub),
                -1,
            ))

    result = {
        # reshape keeps a consistent (N, 4) shape even when N == 0.
        'xyxy': np.array(xyxy).reshape(-1, 4),
        'confidence': np.array(confidence),
        'class_id': np.array(class_id),
        'label': label,
        'num_detections': len(label),
    }

    if extract_track_id:
        result['tracker_id'] = np.array(track_id)

    return result

# Port of the publishing pipeline; may be overridden by the first CLI argument.
port = "5555"
if len(sys.argv) > 1:
    port = sys.argv[1]
    # Validate up front so a bad argument gives a clear message, not a traceback.
    if not port.isdecimal() or not 0 < int(port) < 65536:
        sys.exit(f"Invalid port {port!r}: expected an integer between 1 and 65535")

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
# An empty subscription prefix means "receive every message".
socket.setsockopt_string(zmq.SUBSCRIBE, "")

print("Collecting updates from hailoexportzmq...")

# NOTE: ZMQ connects lazily, so a successful connect() only means the endpoint
# string was accepted, not that the publisher is already reachable.
try:
    socket.connect(f"tcp://localhost:{port}")
except zmq.ZMQError as e:
    print(e)
    print("Error connecting to the server")
    sys.exit(1)

print(f"Connection successful to port {port}")

try:
    while True:
        # Each message is one JSON document describing a single buffer.
        hailo_meta_dict = json.loads(socket.recv())

        frame_idx = hailo_meta_dict['buffer_offset']
        detections = hailo_meta_dict['HailoROI']['SubObjects']

        tracked_detections = extract_detections(detections)
        tracker_ids = tracked_detections['tracker_id']

        for idx in range(tracked_detections['num_detections']):
            class_id = tracked_detections['class_id'][idx]
            # Guard the lookup: not every detection is guaranteed a tracker ID.
            tracker_id = tracker_ids[idx] if idx < len(tracker_ids) else None
            print("Frame #", frame_idx, "Detection: ", tracked_detections['label'][idx], "(", class_id, ")"," with confidence: ", tracked_detections['confidence'][idx], " tracker id: ", tracker_id,  " and bbox:", tracked_detections['xyxy'][idx])
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless loop.
    pass
finally:
    # Release the socket before terminating the context so term() cannot block.
    socket.close()
    context.term()
1 Like