Successfully counting COCO objects

Hi All,

I'm new to the Pi and the Hailo-8L. I'm mostly an Arduino & ESP32 guy, but after building my own Siren Detection System with Edge Impulse, I wanted to move into object recognition and thought it's about time I started to learn.

I have managed to completely setup the Hailo 8L and my new PI and everything is working. What I need to do now is count the different COCO elements I have listed in my code as follows:

# Define custom labels to detect (edit as needed)
# NOTE(review): this list is only effective if the callback actually filters
# detections by label — confirm it is referenced in app_callback.
MY_LABELS = ["person", "car", "truck", "motorcycle", "bicycle", "cell phone"]  # Modify this list with your chosen labels

# Define confidence threshold
# Detections scoring below this value are ignored by the callback.
CONFIDENCE_THRESHOLD = 0.95  # Adjust the confidence threshold as needed

Looking online, I am supposed to configure NMS (non-maximum suppression), which should allow me to count each COCO object detected, which led to this:

def app_callback(pad, info, user_data):
    """Per-frame pad-probe callback: count and annotate detections.

    Counts only labels listed in MY_LABELS whose confidence meets
    CONFIDENCE_THRESHOLD, accumulating per-label totals via
    user_data.update_detection_count() and printing a per-frame summary.

    Returns Gst.PadProbeReturn.OK so the pipeline keeps flowing.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    user_data.increment()
    string_to_print = f"Frame count: {user_data.get_count()}\n"

    # Get the video frame information
    format, width, height = get_caps_from_pad(pad)

    frame = None
    if user_data.use_frame and format is not None and width is not None and height is not None:
        frame = get_numpy_from_buffer(buffer, format, width, height)

    # Get detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    detection_count = 0
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        # Only count the labels the user asked for; previously MY_LABELS was
        # declared but never consulted, so every COCO class was counted.
        if label not in MY_LABELS:
            continue

        # Filter detections by confidence level
        if confidence >= CONFIDENCE_THRESHOLD:
            # Hailo bboxes appear to be normalized to [0, 1] (the line-crossing
            # variant scales them by width/height); int() on raw values would
            # truncate every coordinate to 0. Scale to pixels, falling back to
            # raw values when caps are unavailable.
            sx = width if width is not None else 1
            sy = height if height is not None else 1
            xmin = int(bbox.xmin() * sx)
            ymin = int(bbox.ymin() * sy)
            xmax = int(bbox.xmax() * sx)
            ymax = int(bbox.ymax() * sy)

            # Update detection count for the label
            user_data.update_detection_count(label)

            # Draw bounding box and label on the frame
            if user_data.use_frame:
                cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
                text = f"{label}: {confidence:.2f}"
                cv2.putText(frame, text, (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

            string_to_print += f"Detected {label} with confidence {confidence:.2f} at [{xmin}, {ymin}, {xmax}, {ymax}]\n"
            detection_count += 1

    # Print the detection counts
    if user_data.use_frame:
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    for label, count in user_data.detection_counts.items():
        string_to_print += f"Total {label} detections (>= {CONFIDENCE_THRESHOLD*100:.0f}% confidence): {count}\n"

    print(string_to_print)
    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Entry point: wire the per-frame callback and its state object into
    # the Hailo GStreamer detection pipeline and start it.
    user_data = user_app_callback_class()
    GStreamerDetectionApp(app_callback, user_data).run()

But I get dozens of counts when there is only one of me in the shot, and simply adjusting the confidence level isn't accurate enough.

Does anyone have any code that I could plagiarise please?

Hey @c.cooper

Welcome to the Hailo Community!

I understand you’re seeing multiple counts even when there’s just one object (like yourself) in the frame. This is happening because the detection is triggering multiple times across frames. Here’s how we can fix this:

  1. Implement object tracking to follow objects across frames instead of detecting them repeatedly. This helps maintain consistent counts by giving each object a unique ID.
  2. Use OpenCV’s built-in tracking capabilities to follow objects once they’re detected. This is more efficient than running detection on every frame.
  3. Keep a running tally of unique objects by their labels, which prevents the same object from being counted multiple times.
  4. For even better results, you can:
  • Use a more accurate (but slightly slower) tracking algorithm
  • Run detection less frequently (every few frames)
  • Adjust your confidence thresholds

Would you like me to provide the specific code implementation for any of these suggestions?

Dear Omria,

Thank you for your reply, which is very helpful and it would be ideal if you could provide the code implementation for your main recommendation.

Regards,

Christopher

Hey @c.cooper

import cv2

class ObjectTracker:
    """Wraps an OpenCV MultiTracker and keeps a per-label detection tally."""

    def __init__(self):
        # A single MultiTracker instance drives every per-object tracker.
        self.trackers = cv2.legacy.MultiTracker_create()
        # label -> number of times a tracker was started for that label
        self.detected_labels = {}

    def add_tracker(self, frame, bbox, label):
        """Start a KCF tracker on bbox and bump the tally for label."""
        kcf = cv2.legacy.TrackerKCF_create()  # Or CSRT for better accuracy
        self.trackers.add(kcf, frame, bbox)
        self.detected_labels[label] = self.detected_labels.get(label, 0) + 1

    def update_trackers(self, frame):
        """Advance every tracker by one frame; returns (success, boxes)."""
        return self.trackers.update(frame)

    def get_counts(self):
        """Return the current per-label tallies."""
        return self.detected_labels

def box_iou(box_a, box_b):
    """Intersection-over-union of two (x, y, w, h) boxes; 0.0 when disjoint."""
    ax, ay, aw, ah = [int(v) for v in box_a]
    bx, by, bw, bh = [int(v) for v in box_b]
    ix = max(0, min(ax + aw, bx + bw) - max(ax, bx))
    iy = max(0, min(ay + ah, by + bh) - max(ay, by))
    inter = ix * iy
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0

def app_callback(pad, info, user_data):
    """Per-frame probe: track objects across frames and draw running counts.

    Fixes the runaway-count problem: the previous version started a brand-new
    tracker for EVERY detection on EVERY frame, so a single person was counted
    dozens of times. Now a tracker is added only when a detection does not
    substantially overlap a box that is already being tracked.
    """
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    format, width, height = get_caps_from_pad(pad)
    if format is None or width is None or height is None:
        # Cannot decode the frame without caps; skip this buffer.
        return Gst.PadProbeReturn.OK
    frame = get_numpy_from_buffer(buffer, format, width, height)
    tracker = user_data.tracker

    # Advance existing trackers first so new detections are matched against
    # up-to-date box positions.
    success, tracked_boxes = tracker.update_trackers(frame)

    detections = hailo.get_roi_from_buffer(buffer).get_objects_typed(hailo.HAILO_DETECTION)

    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()

        if confidence < CONFIDENCE_THRESHOLD:
            continue

        # Hailo bboxes appear normalized to [0, 1] (other examples scale by
        # width/height); convert to pixel (x, y, w, h) — raw int() truncation
        # would collapse every box to zero size.
        new_box = (int(bbox.xmin() * width), int(bbox.ymin() * height),
                   int(bbox.width() * width), int(bbox.height() * height))

        # Only start a tracker for genuinely new objects: skip detections that
        # overlap a box we are already tracking (IoU threshold is heuristic).
        if not any(box_iou(new_box, tb) > 0.3 for tb in tracked_boxes):
            tracker.add_tracker(frame, new_box, label)

    # Draw the currently tracked boxes.
    for box in tracked_boxes:
        (x, y, w, h) = [int(v) for v in box]
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

    # Display counts on the frame
    counts = tracker.get_counts()
    y_offset = 30
    for label, count in counts.items():
        text = f"{label}: {count}"
        cv2.putText(frame, text, (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        y_offset += 30

    user_data.set_frame(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Build the callback state, attach the cross-frame tracker, then launch
    # the detection pipeline.
    user_data = user_app_callback_class()
    user_data.tracker = ObjectTracker()
    GStreamerDetectionApp(app_callback, user_data).run()

You can also look at the implementation of the tracker in : Hailo-Application-Code-Examples/runtime/python/detection_with_tracker/detection_with_tracker.py at main · hailo-ai/Hailo-Application-Code-Examples · GitHub

Hi,

Thank you for that, which I have tried already and I get an error on line 63, namely:

Traceback (most recent call last):
  File "/home/c.cooper/hailo-rpi5-examples/basic_pipelines/detection_tracker.py", line 63, in <module>
    user_data = user_app_callback_class()
                ^^^^^^^^^^^^^^^^^^^^^^^
NameError: name 'user_app_callback_class' is not defined

I have tried to get that particular example working, but couldn't, so I am now trying to get this post from Edge Impulse operational, but I need to build an Ubuntu machine first.

If there is an easy solution to the error, just implementing the example code would be easier for me to learn from.

Thanks.

Hi,

I have managed to get it working with the following script.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import supervision as sv
from hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from detection_pipeline import GStreamerDetectionApp

# Line crossing parameters
# A vertical counting line at x=340 spanning y=0..640; LineZone.trigger()
# bumps its in/out counts when tracked objects cross it.
# NOTE(review): the coordinates look like pixels — confirm they match your
# pipeline's actual frame size.
START = sv.Point(340, 0)  # Line start point (adjust to fit the actual frame)
END = sv.Point(340, 640)  # Line end point (adjust to fit the actual frame)
# Multiple triggering anchors (center, top-center, bottom-center) —
# presumably to make the crossing decision robust to bbox jitter; verify
# against the supervision LineZone documentation.
line_zone = sv.LineZone(start=START, end=END, triggering_anchors=(sv.Position.CENTER, sv.Position.TOP_CENTER, sv.Position.BOTTOM_CENTER))

# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
class user_app_callback_class(app_callback_class):
    """Callback state: line-crossing tallies plus the example demo members."""

    def __init__(self):
        super().__init__()
        # Example variable rendered in the on-frame overlay.
        self.new_variable = 42
        # Running tallies mirrored from line_zone after each trigger() call.
        self.in_count = 0
        self.out_count = 0

    def new_function(self):
        """Example method whose result is rendered in the on-frame overlay."""
        return "The meaning of life is: "

# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
def app_callback(pad, info, user_data):
    """Per-frame probe: count "person" objects crossing the counting line.

    Reads Hailo detections, keeps only persons, feeds them to the supervision
    LineZone, and mirrors the in/out totals onto user_data (and onto the
    frame overlay when use_frame is set).
    """
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Get the video frame
    format, width, height = get_caps_from_pad(pad)
    frame = None
    if user_data.use_frame and format is not None and width is not None and height is not None:
        frame = get_numpy_from_buffer(buffer, format, width, height)

    # Get the detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    hailo_detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Collect only "person" detections. The previous version pre-allocated
    # arrays of len(hailo_detections): every non-person detection left an
    # all-zero row ([0, 0, 0, 0] box, confidence 0) — and tracker_id used
    # np.empty(), i.e. uninitialized garbage — yet all rows were still fed
    # to line_zone.trigger(), which could register spurious crossings.
    boxes = []
    confidence = []
    class_id = []
    tracker_id = []
    for detection in hailo_detections:
        if detection.get_label() != "person":  # Only track "person" objects
            continue
        bbox = detection.get_bbox()
        # Hailo bboxes are normalized [0, 1]; scale to pixel coordinates.
        boxes.append([
            bbox.xmin() * width,
            bbox.ymin() * height,
            bbox.xmax() * width,
            bbox.ymax() * height,
        ])
        confidence.append(detection.get_confidence())
        class_id.append(detection.get_class_id())
        unique_ids = detection.get_objects_typed(hailo.HAILO_UNIQUE_ID)
        # -1 marks a detection the pipeline has not assigned a unique ID to.
        tracker_id.append(unique_ids[0].get_id() if unique_ids else -1)

    # Use supervision Detections for line crossing; reshape keeps the xyxy
    # array (n, 4) even when no person was detected this frame.
    detections = sv.Detections(
        xyxy=np.array(boxes, dtype=float).reshape(-1, 4),
        confidence=np.array(confidence, dtype=float),
        class_id=np.array(class_id, dtype=int),
        tracker_id=np.array(tracker_id, dtype=int),
    )

    # Check line crossings and mirror the running totals onto user_data.
    line_zone.trigger(detections)
    user_data.in_count = line_zone.in_count
    user_data.out_count = line_zone.out_count

    # Overlay text for in and out counts
    if user_data.use_frame:
        # Draw the line on the frame
        start_point = (START.x, START.y)
        end_point = (END.x, END.y)
        cv2.line(frame, start_point, end_point, (0, 255, 0), 2)

        # Display in and out counts
        cv2.putText(frame, f'IN: {user_data.in_count}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f'OUT: {user_data.out_count}', (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Display example variable and function result
        cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

        # Convert the frame to BGR and update it
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Entry point: launch the detection pipeline with the line-crossing
    # callback state.
    user_data = user_app_callback_class()
    GStreamerDetectionApp(app_callback, user_data).run()

Not the prettiest, but now I can improve it.

Thanks.