Help with inference/post-processing for the SCRFD model for face detection

I’ve solved my issue by rescaling the network’s landmark outputs from uint8 → float32 and applying a denormalization.

I’m getting pretty good results, but they aren’t as good as the official insightface results using the ONNX file. I was wondering if this is because the model is quantized and therefore loses some accuracy, or if there’s some other reason.

@omria Do you have any ideas?

You can see here that the results are a few pixels off.
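
Incidentally, a quick back-of-the-envelope check suggests quantization alone shouldn’t account for the full offset. With the zero_point/scale guesses from my code below, landmark values are quantized in steps of 1/29, and the decoder scales them by stride / image_dim before mapping back to pixels, so the worst-case rounding error works out to about half a pixel at the coarsest stride:

# Rough worst-case landmark error from uint8 quantization alone,
# assuming my guessed scale (values step by 1/29) is close to the real one
quant_step = 1 / 29
for stride in [8, 16, 32]:
    # decoded pixel offset = value * (stride / 640) * 640 = value * stride
    max_px_error = 0.5 * quant_step * stride
    print(f"stride {stride}: up to ~{max_px_error:.2f} px")  # ~0.14 / 0.28 / 0.55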

Here is my detection code:

import os
import queue
import sys
import threading

import cv2
import numpy as np
from PIL import Image
# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, project_root)

from nvr.ai.inference.hailo_inference import HailoAsyncInference
from nvr.util.image import SharedMemoryFrameManager
from nvr.config import PixelFormatEnum
from nvr.ai.face_detection.face_postprocessing import SCRFDPostProc
from pathlib import Path

class FaceDetector:
    def __init__(
        self,
        hef_path,
        image_dims=(640, 640),
    ):
        self.image_dims = image_dims
        self.frame_manager = SharedMemoryFrameManager()
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()
        self.inference = HailoAsyncInference(
            hef_path=hef_path,
            input_queue=self.input_queue,
            output_queue=self.output_queue,
            batch_size=1,
        )

        self.inference_thread = threading.Thread(target=self.inference.run)
        self.inference_thread.start()
        print("Face detection thread started")

        config = {
            "nms_iou_thresh": 0.25,
            "score_threshold": 0.5,
            "anchors": {
                "steps": [8, 16, 32],
                "min_sizes": [
                    [16, 32],
                    [64, 128],
                    [256, 512]
                ]
            }
        }

        self.postprocessor = SCRFDPostProc(
            image_dims=image_dims,
            nms_iou_thresh=config["nms_iou_thresh"],
            score_threshold=config["score_threshold"],
            anchors=config["anchors"]
        )


    def run_network_inference(self, input_batch):
        # Create shared memory buffers for each input
        shm_names = []
        for batch_item in input_batch:
            shm_name = f"face_detection_{id(batch_item)}"
            buffer = self.frame_manager.create(shm_name, batch_item.nbytes)
            buffer[:] = batch_item.tobytes()
            shm_names.append((shm_name, batch_item.shape))

        # Send to inference queue
        self.input_queue.put(shm_names)

        # Collect results
        results = []
        for _ in input_batch:
            _, result = self.output_queue.get()
            results.append(result)

        # Clean up shared memory
        for shm_name, _ in shm_names:
            self.frame_manager.delete(shm_name)

        return results
    
    def preprocess_image(self, image: Image.Image) -> np.ndarray:
        image = image.convert("RGB")
        image = image.resize(self.image_dims)
        image = np.array(image)
        return image
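
    # NOTE (assumption, worth verifying): this resize does not preserve aspect
    # ratio, while the official insightface SCRFD pipeline, as far as I can tell,
    # resizes with the aspect ratio preserved and pads to det_size. For
    # non-square inputs that alone could shift boxes/landmarks by a few pixels
    # relative to the reference.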
    
    def rescale_network_outputs(self, outputs):
        box_layer_names = ["scrfd_2_5g/conv43", "scrfd_2_5g/conv50", "scrfd_2_5g/conv56"]
        class_layer_names = ["scrfd_2_5g/conv42", "scrfd_2_5g/conv49", "scrfd_2_5g/conv55"]
        landmark_layer_names = ["scrfd_2_5g/conv44", "scrfd_2_5g/conv51", "scrfd_2_5g/conv57"]
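        # NOTE: everything downstream assumes outputs[0] (a dict) yields these
        # nodes grouped per branch in the order the post-processor expects
        # (box, class, landmarks); if the dict ordering ever changes, the
        # reshapes in collect_box_class_predictions will silently mis-pair them.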

        rescaled_outputs = []
        for output_name, output in outputs[0].items():
            # Convert to float32 to avoid overflow
            output = output.astype(np.float32)

            if output_name in box_layer_names:
                downscale_factor = 32 # Magic number, but probably the maximum downscale factor
                output = output / downscale_factor
            elif output_name in class_layer_names:
                # From range UINT8 [0, 255] to FLOAT32 [0, 1]
                output = output / 255
            elif output_name in landmark_layer_names:
                # Convert from quantized UINT8 to FLOAT32.
                # These are approximate values, as I couldn't find the exact ones in the model;
                # the exact values are determined when the ONNX is compiled to a HEF
                # (see print_output_quant_params below for one way to try reading them).
                zero_point = 113
                scale = 29
                output = (output - zero_point) / scale
            else:
                raise ValueError(f"Unknown output name: {output_name}")
            
            reshaped = output.reshape(1, -1, output.shape[-1])
            rescaled_outputs.append(reshaped)

        return rescaled_outputs


    def detect(self, image):
        preprocessed_image = self.preprocess_image(image)
        net_outs = self.run_network_inference([preprocessed_image])
        rescaled_outputs = self.rescale_network_outputs(net_outs)
        results = self.postprocessor.main(rescaled_outputs)
        return results

    def stop(self):
        print("Stopping face detection")
        self.input_queue.put(None)
        self.inference_thread.join()
        print("Face detection thread stopped")


def plot_boxes_on_image(image, results, outpath):
    """Plot detection boxes and facial landmarks on the image."""
    image_with_boxes = image.copy()
    height, width = image.shape[:2]

    # Get first item in batch
    boxes = results['detection_boxes'][0]
    scores = results['detection_scores'][0]
    landmarks = results['face_landmarks'][0]

    # Draw each box and its landmarks
    for i, (box, score) in enumerate(zip(boxes, scores)):
        # Draw bounding box
        x1, y1, x2, y2 = box
        x1, x2 = int(x1 * width), int(x2 * width)
        y1, y2 = int(y1 * height), int(y2 * height)
        cv2.rectangle(image_with_boxes, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Draw score
        score_text = f"Score: {score:.2f}"
        cv2.putText(image_with_boxes, score_text, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        # Draw landmarks as (x, y) pairs
        landmarks_points = landmarks[i]
        for j in range(0, len(landmarks_points), 2):
            x = int(landmarks_points[j] * width)
            y = int(landmarks_points[j + 1] * height)
            cv2.circle(image_with_boxes, (x, y), 2, (0, 0, 255), -1)

    cv2.imwrite(str(outpath), image_with_boxes)
    return image_with_boxes

def main():
    print("Starting face detection...")
    hef_path = "models/scrfd_2.5g-hailo8.hef"
    image_paths = [Path(img_path) for img_path in Path("sample_images/face").glob("*.jpg")]

    image_dims = (640, 640)
    face_detector = FaceDetector(hef_path=hef_path, image_dims=image_dims)

    try:
        for img_path in image_paths:
            image = Image.open(img_path)
        
            print("Image read successfully")
            print(f"Image shape: {image.size}")

            results = face_detector.detect(image)
            outpath = Path("outputs") / f"face_detection/{img_path.name}"
            outpath.parent.mkdir(parents=True, exist_ok=True)

            # Plot the results on the image
            preprocessed_image = face_detector.preprocess_image(image)
            opencv_image = cv2.cvtColor(preprocessed_image, cv2.COLOR_RGB2BGR)
            plot_boxes_on_image(opencv_image, results, outpath=outpath)


    finally:
        face_detector.stop()



if __name__ == "__main__":
    main()

And here is the post-processing code (face_postprocessing.py, which the script above imports):

import numpy as np

# Post processing code ported and modified from
# https://github.com/hailo-ai/hailo_model_zoo/blob/master/hailo_model_zoo/core/postprocessing/face_detection/scrfd.py

# Non maximum suppression code mostly taken from
# https://blog.roboflow.com/how-to-code-non-maximum-suppression-nms-in-plain-numpy/



def box_iou_batch(boxes_a: np.ndarray, boxes_b: np.ndarray) -> np.ndarray:

    def box_area(box):
        return (box[2] - box[0]) * (box[3] - box[1])

    area_a = box_area(boxes_a.T)
    area_b = box_area(boxes_b.T)

    top_left = np.maximum(boxes_a[:, None, :2], boxes_b[:, :2])
    bottom_right = np.minimum(boxes_a[:, None, 2:], boxes_b[:, 2:])

    area_inter = np.prod(np.clip(bottom_right - top_left, a_min=0, a_max=None), axis=2)

    return area_inter / (area_a[:, None] + area_b - area_inter)


def non_max_suppression(prediction_boxes: np.ndarray, prediction_scores: np.ndarray, iou_threshold: float) -> np.ndarray:
    classes = np.ones_like(prediction_scores) # Note: Only supports one class
    
    # Reshape our values to expected shape [x1, y1, x2, y2, score, class]
    predictions = np.concatenate([prediction_boxes, prediction_scores[:, np.newaxis], classes[:, np.newaxis]], axis=1)
    rows, _ = predictions.shape

    sort_index = np.flip(predictions[:, 4].argsort())
    predictions = predictions[sort_index]

    boxes = predictions[:, :4]
    categories = predictions[:, 5]
    ious = box_iou_batch(boxes, boxes)
    ious = ious - np.eye(rows)

    keep = np.ones(rows, dtype=bool)

    for index, (iou, category) in enumerate(zip(ious, categories)):
        if not keep[index]:
            continue

        condition = (iou > iou_threshold) & (categories == category)
        keep = keep & ~condition

    return keep[sort_index.argsort()]
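
# Quick sanity check for the NMS above, on toy data (not model output): two
# heavily overlapping boxes plus one far away; the lower-scoring box of the
# overlapping pair should be suppressed.
def _nms_sanity_check():
    boxes = np.array([[0, 0, 10, 10], [1, 1, 11, 11], [20, 20, 30, 30]], dtype=np.float32)
    scores = np.array([0.9, 0.8, 0.7], dtype=np.float32)
    keep = non_max_suppression(boxes, scores, iou_threshold=0.5)
    # IoU(box0, box1) = 81 / 119 ≈ 0.68 > 0.5, so the 0.8-score box is dropped
    assert keep.tolist() == [True, False, True]
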

class SCRFDPostProc(object):
    NUM_CLASSES = 1
    NUM_LANDMARKS = 10
    LABEL_OFFSET = 1

    def __init__(self, image_dims, nms_iou_thresh, score_threshold, anchors):
        if anchors is None:
            raise ValueError("Missing detection anchors metadata")
        self._image_dims = image_dims
        self._nms_iou_thresh = nms_iou_thresh
        self._score_threshold = score_threshold
        self._num_branches = len(anchors["steps"])
        self.anchors = anchors
        self._anchors = self.extract_anchors(anchors["min_sizes"], anchors["steps"])

    def collect_box_class_predictions(self, output_branches):
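        # Assumes endnodes arrive grouped per branch and ordered
        # [box, class, (landmarks)] within each branch, i.e.
        # [b0_box, b0_cls, b0_lm, b1_box, b1_cls, b1_lm, ...]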
        box_predictors_list = []
        class_predictors_list = []
        landmarks_predictors_list = []
        num_branches = self._num_branches
        assert len(output_branches) % num_branches == 0, "Number of output nodes must be divisible by the number of branches"
        num_output_nodes_per_branch = len(output_branches) // num_branches
        
        for branch_index in range(0, len(output_branches), num_output_nodes_per_branch):
            num_of_batches = output_branches[branch_index].shape[0]
            box_predictors_list.append(output_branches[branch_index].reshape(num_of_batches, -1, 4))
            class_predictors_list.append(
                output_branches[branch_index + 1].reshape(num_of_batches, -1, self.NUM_CLASSES)
            )

            if num_output_nodes_per_branch > 2:
                landmarks_predictors_list.append(
                    output_branches[branch_index + 2].reshape(num_of_batches, -1, self.NUM_LANDMARKS)
                )

        box_predictors = np.concatenate(box_predictors_list, axis=1)
        class_predictors = np.concatenate(class_predictors_list, axis=1)
        landmarks_predictors = np.concatenate(landmarks_predictors_list, axis=1) if landmarks_predictors_list else None
        return box_predictors, class_predictors, landmarks_predictors

    def extract_anchors(self, min_sizes, steps):
        anchors = []
        for stride, min_size in zip(steps, min_sizes):
            height = self._image_dims[0] // stride
            width = self._image_dims[1] // stride
            num_anchors = len(min_size)

            anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)
            anchor_centers = (anchor_centers * stride).reshape((-1, 2))
            anchor_centers[:, 0] /= self._image_dims[0]
            anchor_centers[:, 1] /= self._image_dims[1]
            if num_anchors > 1:
                anchor_centers = np.stack([anchor_centers] * num_anchors, axis=1).reshape((-1, 2))
            anchor_scales = np.ones_like(anchor_centers, dtype=np.float32) * stride
            anchor_scales[:, 0] /= self._image_dims[0]
            anchor_scales[:, 1] /= self._image_dims[1]
            anchor = np.concatenate([anchor_centers, anchor_scales], axis=1)
            anchors.append(anchor)
        return np.concatenate(anchors, axis=0)
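
    # For a 640x640 input the three branches contribute
    # (80*80 + 40*40 + 20*20) * 2 = 16800 anchors in total, which must match
    # the number of proposals in each reshaped output head.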

    def _decode_landmarks(self, landmarks_detections, anchors):
        preds = []
        for i in range(0, self.NUM_LANDMARKS, 2):
            px = anchors[:, 0] + landmarks_detections[:, i] * anchors[:, 2]
            py = anchors[:, 1] + landmarks_detections[:, i + 1] * anchors[:, 3]
            preds.append(px)
            preds.append(py)
        return np.stack(preds, axis=-1)

    def _decode_boxes(self, box_detections, anchors):
        x1 = anchors[:, 0] - box_detections[:, 0] * anchors[:, 2]
        y1 = anchors[:, 1] - box_detections[:, 1] * anchors[:, 3]
        x2 = anchors[:, 0] + box_detections[:, 2] * anchors[:, 2]
        y2 = anchors[:, 1] + box_detections[:, 3] * anchors[:, 3]
        return np.stack([x1, y1, x2, y2], axis=-1)
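
    # Worked example for the decoding above: an anchor centred at (0.5, 0.5)
    # with scale 32/640 = 0.05 and predicted distances [1, 1, 1, 1] decodes to
    # [0.45, 0.45, 0.55, 0.55] in normalized coordinates, i.e. a 64x64 px box
    # centred in a 640x640 image.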

    def main(self, endnodes):
        box_predictions, classes_predictions, landmarks_predictors = self.collect_box_class_predictions(endnodes)
        additional_fields = {}

        detection_scores = classes_predictions
        batch_size, num_proposals = box_predictions.shape[:2]

        tiled_anchor_boxes = np.tile(self._anchors[np.newaxis, :, :], [batch_size, 1, 1])
        tiled_anchors_boxlist = tiled_anchor_boxes.reshape(-1, 4)

        decoded_boxes = self._decode_boxes(box_predictions.reshape(-1, 4), tiled_anchors_boxlist)
        detection_boxes = decoded_boxes.reshape(batch_size, num_proposals, 4)

        decoded_landmarks = self._decode_landmarks(
            landmarks_predictors.reshape(-1, 10), tiled_anchors_boxlist
        )
        decoded_landmarks = decoded_landmarks.reshape(batch_size, num_proposals, 10)
        detection_boxes = np.expand_dims(detection_boxes, axis=2)

        nmsed_boxes, nmsed_scores, nmsed_landmarks = (
            self._batch_multiclass_nms(
                boxes=detection_boxes,
                scores=detection_scores,
                landmarks=decoded_landmarks
            )
        )

        num_detections = nmsed_scores.size
        results = {
            "detection_boxes": nmsed_boxes,
            "detection_scores": nmsed_scores,
            "num_detections": num_detections,
            "face_landmarks": nmsed_landmarks
        }

        return results

    def _batch_multiclass_nms(self, boxes, scores, landmarks):
        assert boxes.shape[0] == 1, "Batch size must be 1"

        batch_boxes = boxes[0, :, 0, :]  # Shape: [num_boxes, 4]
        batch_scores = scores[0, :, 0]  # Shape: [num_boxes]
        batch_landmarks = landmarks[0]  # Shape: [num_boxes, 10]

        # Filter based on score threshold
        score_mask = batch_scores >= self._score_threshold
        filtered_boxes = batch_boxes[score_mask]
        filtered_scores = batch_scores[score_mask]
        filtered_landmarks = batch_landmarks[score_mask]


        # Apply non-max suppression
        keep_indices = non_max_suppression(
            filtered_boxes, 
            filtered_scores, 
            iou_threshold=self._nms_iou_thresh
        )

        # Apply the keep indices to all our tensors
        nmsed_boxes = filtered_boxes[keep_indices]
        nmsed_scores = filtered_scores[keep_indices]
        nmsed_landmarks = filtered_landmarks[keep_indices]

        # Add batch dimension back
        nmsed_boxes = np.expand_dims(nmsed_boxes, axis=0)
        nmsed_scores = np.expand_dims(nmsed_scores, axis=0)
        nmsed_landmarks = np.expand_dims(nmsed_landmarks, axis=0)
        
        return nmsed_boxes, nmsed_scores, nmsed_landmarks


if __name__ == "__main__":
    config = {
        "nms_iou_thresh": 0.4,
        "score_threshold": 0.02,
        "anchors": {
            "steps": [8, 16, 32],
            "min_sizes": [
                [16, 32],
                [64, 128],
                [256, 512]
            ]
        }
    }

    postprocessor = SCRFDPostProc(
        image_dims=(640, 640),
        nms_iou_thresh=config["nms_iou_thresh"],
        score_threshold=config["score_threshold"],
        anchors=config["anchors"]
    )
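
    # Shape smoke test with dummy endnodes (my guesses for the layout): nine
    # nodes ordered [box, class, landmarks] per stride (8/16/32), two anchors
    # per grid cell. All-zero scores fall below score_threshold, so this only
    # exercises the reshape/decoding/NMS plumbing end to end.
    endnodes = []
    for stride in [8, 16, 32]:
        num_anchors = (640 // stride) ** 2 * 2
        endnodes.append(np.zeros((1, num_anchors, 4), dtype=np.float32))   # box distances
        endnodes.append(np.zeros((1, num_anchors, 1), dtype=np.float32))   # class scores
        endnodes.append(np.zeros((1, num_anchors, 10), dtype=np.float32))  # landmarks
    results = postprocessor.main(endnodes)
    assert results["num_detections"] == 0
    print("post-processing smoke test passed")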