How to access hailotracker results in a callback?

Hi,

I have this GStreamer pipeline running on a Raspberry Pi 5 with the 26 TOPS Hailo HAT:

libcamerasrc name=source ! video/x-raw, format=RGB, width=1536, height=864 ! \
  queue name=source_scale_q leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  videoscale name=source_videoscale n-threads=2 ! \
  queue name=source_convert_q leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  videoconvert n-threads=3 name=source_convert qos=false ! \
  video/x-raw, format=RGB, pixel-aspect-ratio=1/1 ! \
  tee name=t ! \
    queue name=bypass_queue leaky=no max-size-buffers=20 max-size-bytes=0 max-size-time=0 ! \
    mux.sink_0 \
  t. ! \
    queue name=queue_hailonet leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
    videoconvert n-threads=3 ! \
    hailonet hef-path=yolov8m.hef batch-size=2 nms-score-threshold=0.3 nms-iou-threshold=0.45 output-format-type=HAILO_FORMAT_TYPE_FLOAT32 force-writable=true ! \
    queue name=queue_hailofilter leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
    hailofilter so-path=/home/jelsey/dev/cvml-at-the-edge/components/consumer-tracking/libyolo_hailortpp_postprocess.so qos=false ! \
    queue name=queue_hailotracker leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
    hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! \
    queue name=queue_hmuc leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
    mux.sink_1 \
hailomuxer name=mux ! \
  queue name=queue_hailo_python leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  queue name=queue_user_callback leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  identity name=identity_callback ! \
  queue name=queue_hailooverlay leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  hailooverlay ! \
  queue name=queue_videoconvert leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  videoconvert n-threads=3 qos=false ! \
  queue name=queue_textoverlay leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  textoverlay name=hailo_text text='test text' valignment=top halignment=center ! \
  queue name=queue_hailo_display leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
  fpsdisplaysink video-sink=xvimagesink name=hailo_display sync=false text-overlay=true signal-fps-measurements=true

I pieced this together from the detection with tracker example.

This runs OK on my Pi: it tracks objects such as a person or a book and gives each one an ID, incrementing from 1.

My question is, how can I have a callback after the hailotracker so I can act on the tracked objects? When person 1 is detected, I want to write some custom python code to emit to MQTT, and again for person 2 etc.

I have a callback that is executing on the detections, but I don’t want to keep emitting the same detected person, hence wanting to wait for a tracking ID to be assigned first.

I’m new to this so apologies if this is a dumb question, open to suggestions on other ways to do this.

Hey @james.elsey ,

Welcome to the Hailo Community!

Our examples in the dev branch have been updated with tracker functionality. Here’s how to implement custom Python processing with HailoTracker:

Processing Tracked Objects with Custom Python Callbacks

You can process tracked objects and trigger events (like MQTT messages) by implementing a callback system after HailoTracker assigns tracking IDs. Here’s how:

Pipeline Configuration

Add an identity element after HailoTracker for callback processing:

hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 ! \
    queue name=queue_hmuc leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 ! \
    identity name=tracker_callback signal-handoffs=true ! \
    mux.sink_1
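If you build the pipeline string in Python, a small helper can cut down on the repeated queue settings. This is only a sketch: the function names `queue` and `tracker_branch` are made up for illustration, and the element properties are copied from the pipeline in this thread.

```python
def queue(name, max_buffers=3):
    """Return a GStreamer queue description with the settings used in this thread."""
    return (
        f"queue name={name} leaky=no max-size-buffers={max_buffers} "
        "max-size-bytes=0 max-size-time=0"
    )


def tracker_branch(callback_name="tracker_callback", mux_pad="mux.sink_1"):
    """Build the hailotracker -> queue -> identity -> muxer part of the pipeline."""
    elements = [
        "hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3",
        queue("queue_hmuc"),
        f"identity name={callback_name} signal-handoffs=true",
        mux_pad,  # hypothetical default: the muxer pad name from the original pipeline
    ]
    return " ! ".join(elements)


if __name__ == "__main__":
    print(tracker_branch())
```

The returned string is only the tracker branch; it still has to be spliced into the full pipeline description before it is passed to Gst.parse_launch().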

Implementation Example

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst

class TrackerProcessor:
    def __init__(self):
        self.emitted_ids = set()  # tracking IDs that have already been published
        Gst.init(None)
        self.setup_pipeline()

    def setup_pipeline(self):
        # NOTE: abbreviated for readability. This fragment will not parse on its
        # own (there is no hailonet, hailofilter or muxer in it). Use your full
        # pipeline string here, with the identity element placed after
        # hailotracker as shown under "Pipeline Configuration".
        pipeline_str = '''
            libcamerasrc name=source ! video/x-raw, format=RGB, width=1536, height=864 !
            hailotracker keep-tracked-frames=3 keep-new-frames=3 keep-lost-frames=3 !
            queue name=queue_hmuc leaky=no max-size-buffers=3 max-size-bytes=0 max-size-time=0 !
            identity name=tracker_callback signal-handoffs=true !
            mux.sink_1
        '''
        self.pipeline = Gst.parse_launch(pipeline_str)

        # Connect the callback to the identity element's "handoff" signal
        identity = self.pipeline.get_by_name('tracker_callback')
        identity.connect('handoff', self.process_tracked_objects)

    def process_tracked_objects(self, element, buffer):
        # identity's "handoff" signal passes the element and the buffer only
        tracking_data = self.parse_buffer(buffer)
        self.handle_new_objects(tracking_data)

    def parse_buffer(self, buffer):
        # Placeholder: replace with code that reads the tracker metadata
        # attached to the buffer. The hardcoded list only shows the shape
        # that handle_new_objects() expects.
        return [{"id": 1, "type": "person"}, {"id": 2, "type": "car"}]

    def handle_new_objects(self, objects):
        # Publish each tracking ID once, the first time it is seen
        for obj in objects:
            if obj['id'] not in self.emitted_ids:
                self.publish_to_mqtt(obj)
                self.emitted_ids.add(obj['id'])

    def publish_to_mqtt(self, obj):
        # Implement your MQTT publishing logic here
        print(f"Publishing object to MQTT: {obj}")

    def run(self):
        loop = GLib.MainLoop()
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            loop.run()  # blocks until interrupted
        except KeyboardInterrupt:
            pass
        finally:
            self.pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    processor = TrackerProcessor()
    processor.run()
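One possible refinement, not part of the example above: the emitted_ids set keeps every ID for as long as the process runs. If you would rather forget IDs that have not been seen for a while, a small time-based helper could stand in for the set. The class name `SeenTracks` and the 30-second default are assumptions chosen for illustration.

```python
import time


class SeenTracks:
    """Remember which tracking IDs were already reported, forgetting stale ones."""

    def __init__(self, ttl_seconds=30.0, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so the helper can be tested without sleeping
        self.last_seen = {}  # tracking ID -> time it was last observed

    def is_new(self, track_id):
        """Return True the first time an ID is observed, or again after it expired."""
        now = self.clock()
        # Forget IDs that have not been observed for longer than the TTL.
        stale = [tid for tid, t in self.last_seen.items() if now - t > self.ttl]
        for tid in stale:
            del self.last_seen[tid]
        first_time = track_id not in self.last_seen
        self.last_seen[track_id] = now
        return first_time
```

In handle_new_objects() you would then call something like `if self.seen.is_new(obj['id']): self.publish_to_mqtt(obj)` instead of checking the set directly.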

Integration Notes

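  1. Buffer Parsing: Replace the placeholder parse_buffer() with code that reads the metadata HailoTracker attaches to each buffer. In the hailo-rpi5-examples callbacks this is done through the hailo Python module: hailo.get_roi_from_buffer(buffer) returns the ROI, roi.get_objects_typed(hailo.HAILO_DETECTION) lists the detections, and each detection's HAILO_UNIQUE_ID object carries the tracking ID.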

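  2. MQTT Integration: Create the MQTT client once (for example in __init__) and reuse it in publish_to_mqtt(), rather than reconnecting for every message:

    import json
    import paho.mqtt.client as mqtt

    # In __init__: connect once and keep the client around
    # (with paho-mqtt 2.x use mqtt.Client(mqtt.CallbackAPIVersion.VERSION2))
    self.mqtt_client = mqtt.Client()
    self.mqtt_client.connect("your-broker", 1883)
    self.mqtt_client.loop_start()  # handle network traffic in a background thread

    def publish_to_mqtt(self, obj):
        self.mqtt_client.publish("tracking/events", json.dumps(obj))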
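For the buffer-parsing note, here is a rough sketch of how the tracked objects might be pulled out of the Hailo metadata. It assumes the hailo Python bindings expose HAILO_DETECTION, HAILO_UNIQUE_ID, get_objects_typed(), get_label(), get_confidence() and get_id() as used in the hailo-rpi5-examples callbacks, so check this against your installed version. The function name `tracked_objects_from_roi` is made up, and the hailo module is passed in as a parameter so the logic can be tried without the hardware.

```python
def tracked_objects_from_roi(roi, hailo_module):
    """Return a dict for every detection that already has a tracking ID.

    roi is assumed to come from hailo.get_roi_from_buffer(buffer) and
    hailo_module is assumed to be the imported hailo module.
    """
    results = []
    for detection in roi.get_objects_typed(hailo_module.HAILO_DETECTION):
        unique_ids = detection.get_objects_typed(hailo_module.HAILO_UNIQUE_ID)
        if len(unique_ids) != 1:
            continue  # no tracking ID attached to this detection yet
        results.append({
            "id": unique_ids[0].get_id(),
            "type": detection.get_label(),
            "confidence": detection.get_confidence(),
        })
    return results
```

Inside parse_buffer() this would be used roughly as `roi = hailo.get_roi_from_buffer(buffer)` followed by `return tracked_objects_from_roi(roi, hailo)`, after `import hailo` at the top of the file.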