I’ve solved my issue by rescaling the landmark outputs from the network from uint8 → float32 and applying a denormalization.
I’m getting pretty good results, but they aren’t as good as those from the official insightface implementation using the ONNX file. I was wondering whether this is because the model is quantized and therefore loses some accuracy, or whether it is for some other reason.
@omria Do you have any ideas?
You can see here that the results are a few pixels off.
Here is my code
import os
import queue
import sys
import threading
import cv2
import numpy as np
from PIL import Image
# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, project_root)
from nvr.ai.inference.hailo_inference import HailoAsyncInference
from nvr.util.image import SharedMemoryFrameManager
from nvr.config import PixelFormatEnum
from nvr.ai.face_detection.face_postprocessing import SCRFDPostProc
from pathlib import Path
class FaceDetector:
    """Detect faces with an SCRFD network executed by a Hailo inference worker.

    Frames are handed to a ``HailoAsyncInference`` worker thread through
    shared-memory buffers and an input queue; raw network outputs come back
    on an output queue, are converted to float32 and are decoded by
    ``SCRFDPostProc``.
    """

    # Output layer names of the compiled network, one entry per detection
    # branch.  NOTE(review): the three tuples are assumed to be aligned with
    # each other and listed in anchor-stride order (8, 16, 32) -- confirm
    # against the HEF.
    BOX_LAYER_NAMES = ("scrfd_2_5g/conv43", "scrfd_2_5g/conv50", "scrfd_2_5g/conv56")
    CLASS_LAYER_NAMES = ("scrfd_2_5g/conv42", "scrfd_2_5g/conv49", "scrfd_2_5g/conv55")
    LANDMARK_LAYER_NAMES = ("scrfd_2_5g/conv44", "scrfd_2_5g/conv51", "scrfd_2_5g/conv57")

    # Approximate dequantization constants (hand-tuned by the original author).
    # NOTE(review): the exact per-layer values are fixed when the ONNX model is
    # compiled to a HEF; using approximations here is a plausible cause of
    # decoded boxes/landmarks landing a few pixels off.  TODO: read the real
    # quantization parameters from the runtime instead of hard-coding them.
    BOX_DOWNSCALE_FACTOR = 32  # magic number, probably the maximum stride
    CLASS_SCORE_MAX = 255  # maps uint8 [0, 255] onto [0, 1]
    LANDMARK_ZERO_POINT = 113
    LANDMARK_SCALE = 29

    def __init__(
        self,
        hef_path,
        image_dims=(640, 640),
        nms_iou_thresh=0.25,
        score_threshold=0.5,
    ):
        """Create the post-processor and start the inference worker thread.

        Args:
            hef_path: Path of the compiled ``.hef`` model passed to the worker.
            image_dims: Network input size; also passed to ``PIL`` ``resize``.
            nms_iou_thresh: IoU threshold forwarded to the post-processor.
            score_threshold: Minimum score forwarded to the post-processor.
        """
        self.image_dims = image_dims
        self.frame_manager = SharedMemoryFrameManager()
        self.input_queue = queue.Queue()
        self.output_queue = queue.Queue()

        config = {
            "nms_iou_thresh": nms_iou_thresh,
            "score_threshold": score_threshold,
            "anchors": {
                "steps": [8, 16, 32],
                "min_sizes": [
                    [16, 32],
                    [64, 128],
                    [256, 512],
                ],
            },
        }
        # Build the post-processor before starting the worker so that a bad
        # configuration cannot leave a running, never-joined thread behind.
        self.postprocessor = SCRFDPostProc(
            image_dims=image_dims,
            nms_iou_thresh=config["nms_iou_thresh"],
            score_threshold=config["score_threshold"],
            anchors=config["anchors"],
        )

        self.inference = HailoAsyncInference(
            hef_path=hef_path,
            input_queue=self.input_queue,
            output_queue=self.output_queue,
            batch_size=1,
        )
        self.inference_thread = threading.Thread(target=self.inference.run)
        self.inference_thread.start()
        print("Face detection thread started")

    def run_network_inference(self, input_batch):
        """Send a batch of arrays to the worker and return one raw result each.

        Every array is copied into its own shared-memory buffer; the buffers
        are always released, even when queuing or result collection fails.
        """
        shm_names = []
        try:
            # Create shared memory buffers for each input
            for batch_item in input_batch:
                shm_name = f"face_detection_{id(batch_item)}"
                buffer = self.frame_manager.create(shm_name, batch_item.nbytes)
                # Register before copying so the cleanup below also covers a
                # failed copy.
                shm_names.append((shm_name, batch_item.shape))
                buffer[:] = batch_item.tobytes()

            # Send to inference queue
            self.input_queue.put(shm_names)

            # Collect results (one queue entry per input item)
            results = []
            for _ in input_batch:
                _, result = self.output_queue.get()
                results.append(result)
        finally:
            # Clean up shared memory
            for shm_name, _ in shm_names:
                self.frame_manager.delete(shm_name)
        return results

    def preprocess_image(self, image: Image.Image) -> np.ndarray:
        """Convert a PIL image to an RGB array resized to ``image_dims``."""
        image = image.convert("RGB")
        # NOTE(review): PIL's resize takes (width, height); irrelevant for the
        # square default but worth confirming for non-square inputs.
        image = image.resize(self.image_dims)
        return np.array(image)

    def _dequantize(self, kind, output):
        """Convert one raw output tensor to float32 according to its kind."""
        # Convert to float32 to avoid overflow
        output = output.astype(np.float32)
        if kind == "box":
            return output / self.BOX_DOWNSCALE_FACTOR
        if kind == "class":
            # From range UINT8 [0, 255] to FLOAT32 [0, 1]
            return output / self.CLASS_SCORE_MAX
        # Landmarks: from quantized UINT8 to FLOAT32 (approximate values)
        return (output - self.LANDMARK_ZERO_POINT) / self.LANDMARK_SCALE

    def rescale_network_outputs(self, outputs):
        """Dequantize the first result's outputs into the post-processor layout.

        The returned list is ordered branch by branch as (box, class,
        landmarks) regardless of the iteration order of the result mapping,
        because the post-processor consumes the outputs positionally.  Layers
        absent from the mapping are skipped.

        Raises:
            ValueError: If the mapping contains an unrecognised layer name.
        """
        raw_outputs = outputs[0]
        known_names = (
            set(self.BOX_LAYER_NAMES)
            | set(self.CLASS_LAYER_NAMES)
            | set(self.LANDMARK_LAYER_NAMES)
        )
        for output_name in raw_outputs:
            if output_name not in known_names:
                raise ValueError(f"Unknown output name: {output_name}")

        kinds = ("box", "class", "landmark")
        branches = zip(
            self.BOX_LAYER_NAMES, self.CLASS_LAYER_NAMES, self.LANDMARK_LAYER_NAMES
        )
        rescaled_outputs = []
        for branch_layer_names in branches:
            for kind, output_name in zip(kinds, branch_layer_names):
                if output_name not in raw_outputs:
                    continue
                output = self._dequantize(kind, raw_outputs[output_name])
                # Flatten the spatial dimensions, keep the channel dimension.
                rescaled_outputs.append(output.reshape(1, -1, output.shape[-1]))
        return rescaled_outputs

    def detect(self, image):
        """Run the full pipeline on one PIL image and return the results dict."""
        preprocessed_image = self.preprocess_image(image)
        net_outs = self.run_network_inference([preprocessed_image])
        rescaled_outputs = self.rescale_network_outputs(net_outs)
        return self.postprocessor.main(rescaled_outputs)

    def stop(self):
        """Signal the worker to stop (``None`` sentinel) and wait for it."""
        print("Stopping face detection")
        self.input_queue.put(None)
        self.inference_thread.join()
        print("Face detection thread stopped")
def plot_boxes_on_image(image, results, outpath):
    """Draw detections on a copy of ``image``, write it to ``outpath``, return it.

    Only the first batch entry of ``results`` is drawn.  Box and landmark
    coordinates are multiplied by the image width/height before drawing, so
    they are expected to be relative to the image size.
    """
    box_colour = (0, 255, 0)
    landmark_colour = (0, 0, 255)

    canvas = image.copy()
    img_h, img_w = image.shape[:2]

    # First item in the batch
    batch_boxes = results['detection_boxes'][0]
    batch_scores = results['detection_scores'][0]
    batch_landmarks = results['face_landmarks'][0]

    for det_index, (box, score) in enumerate(zip(batch_boxes, batch_scores)):
        # Bounding box, scaled to pixel coordinates
        left, top, right, bottom = box
        left, right = int(left * img_w), int(right * img_w)
        top, bottom = int(top * img_h), int(bottom * img_h)
        cv2.rectangle(canvas, (left, top), (right, bottom), box_colour, 2)

        # Score label just above the box
        cv2.putText(
            canvas,
            f"Score: {score:.2f}",
            (left, top - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            box_colour,
            2,
        )

        # Landmarks are stored as a flat x0, y0, x1, y1, ... sequence
        points = batch_landmarks[det_index]
        for offset in range(0, len(points), 2):
            centre = (
                int(points[offset] * img_w),
                int(points[offset + 1] * img_h),
            )
            cv2.circle(canvas, centre, 2, landmark_colour, -1)

    cv2.imwrite(str(outpath), canvas)
    return canvas
def main():
    """Run face detection on every sample JPEG and save annotated copies.

    Images are read from ``sample_images/face`` and the annotated results are
    written below ``outputs/face_detection``.  The detector's worker thread is
    always stopped, even if processing an image fails.
    """
    print("Starting face detection...")
    hef_path = "models/scrfd_2.5g-hailo8.hef"
    image_paths = list(Path("sample_images/face").glob("*.jpg"))
    image_dims = (640, 640)
    face_detector = FaceDetector(hef_path=hef_path, image_dims=image_dims)
    try:
        for img_path in image_paths:
            # The context manager releases the underlying file handle as soon
            # as the image has been processed.
            with Image.open(img_path) as image:
                print("Image read successfully")
                print(f"Image shape: {image.size}")
                results = face_detector.detect(image)
                # The detections are drawn on the resized network input, not
                # on the original image.
                preprocessed_image = face_detector.preprocess_image(image)

            outpath = Path("outputs") / f"face_detection/{img_path.name}"
            outpath.parent.mkdir(parents=True, exist_ok=True)

            # Plot the results on the image
            opencv_image = cv2.cvtColor(preprocessed_image, cv2.COLOR_RGB2BGR)
            plot_boxes_on_image(opencv_image, results, outpath=outpath)
    finally:
        face_detector.stop()


if __name__ == "__main__":
    main()
import numpy as np
# Post processing code ported and modified from
# https://github.com/hailo-ai/hailo_model_zoo/blob/master/hailo_model_zoo/core/postprocessing/face_detection/scrfd.py
# Non maximum suppression code mostly taken from
# https://blog.roboflow.com/how-to-code-non-maximum-suppression-nms-in-plain-numpy/
def box_iou_batch(boxes_a: np.ndarray, boxes_b: np.ndarray) -> np.ndarray:
    """Return the pairwise intersection-over-union of two sets of boxes.

    Boxes are rows of ``[x1, y1, x2, y2]``.  The result has one row per box
    in ``boxes_a`` and one column per box in ``boxes_b``.
    """
    areas_a = (boxes_a.T[2] - boxes_a.T[0]) * (boxes_a.T[3] - boxes_a.T[1])
    areas_b = (boxes_b.T[2] - boxes_b.T[0]) * (boxes_b.T[3] - boxes_b.T[1])

    # Corners of the overlap rectangle for every (a, b) pair.
    overlap_min = np.maximum(boxes_a[:, None, :2], boxes_b[:, :2])
    overlap_max = np.minimum(boxes_a[:, None, 2:], boxes_b[:, 2:])

    # Negative extents mean the boxes do not overlap along that axis.
    extents = np.clip(overlap_max - overlap_min, a_min=0, a_max=None)
    intersection = np.prod(extents, 2)

    union = areas_a[:, None] + areas_b - intersection
    return intersection / union
def non_max_suppression(prediction_boxes: np.ndarray, prediction_scores: np.ndarray, iou_threshold: float) -> np.ndarray:
    """Return a boolean keep-mask, aligned with the inputs, after greedy NMS.

    Boxes are visited from highest to lowest score; every box that is still
    kept suppresses all boxes whose IoU with it exceeds ``iou_threshold``.
    """
    # Note: Only supports one class, so every prediction gets class id 1.
    class_ids = np.ones_like(prediction_scores)

    # Columns: [x1, y1, x2, y2, score, class]
    table = np.concatenate(
        [
            prediction_boxes,
            prediction_scores[:, np.newaxis],
            class_ids[:, np.newaxis],
        ],
        axis=1,
    )
    count, _ = table.shape

    # Highest score first.
    order = np.flip(table[:, 4].argsort())
    table = table[order]
    ranked_boxes = table[:, :4]
    ranked_classes = table[:, 5]

    # Subtract the identity so that a box never suppresses itself.
    overlaps = box_iou_batch(ranked_boxes, ranked_boxes) - np.eye(count)

    survivors = np.ones(count, dtype=bool)
    for rank, (row_overlaps, row_class) in enumerate(zip(overlaps, ranked_classes)):
        if survivors[rank]:
            suppressed = (row_overlaps > iou_threshold) & (ranked_classes == row_class)
            survivors = survivors & ~suppressed

    # Undo the sort so the mask lines up with the caller's ordering.
    return survivors[order.argsort()]
class SCRFDPostProc(object):
    """Decode SCRFD network outputs into boxes, scores and facial landmarks.

    All coordinates produced by this class are relative to the image size.
    ``image_dims`` is interpreted as ``(height, width)``.
    """

    NUM_CLASSES = 1
    NUM_LANDMARKS = 10
    LABEL_OFFSET = 1

    def __init__(self, image_dims, nms_iou_thresh, score_threshold, anchors):
        """Store the thresholds and pre-compute the anchor table.

        Args:
            image_dims: Network input size as ``(height, width)``.
            nms_iou_thresh: IoU above which overlapping detections are dropped.
            score_threshold: Minimum score for a detection to be considered.
            anchors: Mapping with ``"steps"`` (strides) and ``"min_sizes"``.

        Raises:
            ValueError: If ``anchors`` is ``None``.
        """
        # Validate before the first use of ``anchors`` so the intended error
        # is raised instead of a TypeError from indexing None.
        if anchors is None:
            raise ValueError("Missing detection anchors metadata")
        self._image_dims = image_dims
        self._nms_iou_thresh = nms_iou_thresh
        self._score_threshold = score_threshold
        self._num_branches = len(anchors["steps"])
        self.anchors = anchors
        self._anchors = self.extract_anchors(anchors["min_sizes"], anchors["steps"])

    def collect_box_class_predictions(self, output_branches):
        """Group the flat list of output tensors into box/class/landmark arrays.

        ``output_branches`` must hold, for every branch in turn, the box
        tensor, the class tensor and optionally the landmark tensor.

        Returns:
            ``(boxes, classes, landmarks)`` concatenated over all branches
            along axis 1; ``landmarks`` is ``None`` when the branches carry
            only two tensors each.
        """
        box_predictors_list = []
        class_predictors_list = []
        landmarks_predictors_list = []
        num_branches = self._num_branches
        assert len(output_branches) % num_branches == 0, "All branches must have the same number of output nodes"
        num_output_nodes_per_branch = len(output_branches) // num_branches
        for branch_index in range(0, len(output_branches), num_output_nodes_per_branch):
            num_of_batches = output_branches[branch_index].shape[0]
            box_predictors_list.append(output_branches[branch_index].reshape(num_of_batches, -1, 4))
            class_predictors_list.append(
                output_branches[branch_index + 1].reshape(num_of_batches, -1, self.NUM_CLASSES)
            )
            if num_output_nodes_per_branch > 2:
                landmarks_predictors_list.append(
                    output_branches[branch_index + 2].reshape(num_of_batches, -1, self.NUM_LANDMARKS)
                )
        box_predictors = np.concatenate(box_predictors_list, axis=1)
        class_predictors = np.concatenate(class_predictors_list, axis=1)
        landmarks_predictors = np.concatenate(landmarks_predictors_list, axis=1) if landmarks_predictors_list else None
        return box_predictors, class_predictors, landmarks_predictors

    def extract_anchors(self, min_sizes, steps):
        """Build the anchor table ``[cx, cy, scale_x, scale_y]`` for all strides.

        Centres and scales are normalised by the image size: x values by the
        width and y values by the height.
        """
        image_height, image_width = self._image_dims[0], self._image_dims[1]
        anchors = []
        for stride, min_size in zip(steps, min_sizes):
            height = image_height // stride
            width = image_width // stride
            num_anchors = len(min_size)
            # mgrid yields (row, col) grids; reversing gives (x, y) per cell.
            anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)
            anchor_centers = (anchor_centers * stride).reshape((-1, 2))
            # Column 0 is x and column 1 is y, so divide by width and height
            # respectively (identical to the old behaviour for square inputs).
            anchor_centers[:, 0] /= image_width
            anchor_centers[:, 1] /= image_height
            if num_anchors > 1:
                # Repeat every centre once per anchor defined at that cell.
                anchor_centers = np.stack([anchor_centers] * num_anchors, axis=1).reshape((-1, 2))
            anchor_scales = np.ones_like(anchor_centers, dtype=np.float32) * stride
            anchor_scales[:, 0] /= image_width
            anchor_scales[:, 1] /= image_height
            anchor = np.concatenate([anchor_centers, anchor_scales], axis=1)
            anchors.append(anchor)
        return np.concatenate(anchors, axis=0)

    def _decode_landmarks(self, landmarks_detections, anchors):
        """Turn per-anchor landmark offsets into x0, y0, x1, y1, ... coordinates."""
        num_points = self.NUM_LANDMARKS // 2
        offsets = landmarks_detections[:, : self.NUM_LANDMARKS].reshape(-1, num_points, 2)
        # centre + offset * scale, broadcast over the landmark points
        decoded = anchors[:, np.newaxis, :2] + offsets * anchors[:, np.newaxis, 2:4]
        return decoded.reshape(-1, self.NUM_LANDMARKS)

    def _decode_boxes(self, box_detections, anchors):
        """Turn per-anchor distances (left, top, right, bottom) into corners."""
        x1 = anchors[:, 0] - box_detections[:, 0] * anchors[:, 2]
        y1 = anchors[:, 1] - box_detections[:, 1] * anchors[:, 3]
        x2 = anchors[:, 0] + box_detections[:, 2] * anchors[:, 2]
        y2 = anchors[:, 1] + box_detections[:, 3] * anchors[:, 3]
        return np.stack([x1, y1, x2, y2], axis=-1)

    def main(self, endnodes):
        """Decode the network outputs and apply score filtering and NMS.

        Returns:
            Dict with ``detection_boxes``, ``detection_scores``,
            ``num_detections`` and ``face_landmarks``.  ``face_landmarks`` is
            ``None`` when the network outputs contain no landmark tensors.
        """
        box_predictions, classes_predictions, landmarks_predictors = self.collect_box_class_predictions(endnodes)
        batch_size, num_proposals = box_predictions.shape[:2]

        tiled_anchor_boxes = np.tile(self._anchors[np.newaxis, :, :], [batch_size, 1, 1])
        tiled_anchors_boxlist = tiled_anchor_boxes.reshape(-1, 4)

        decoded_boxes = self._decode_boxes(box_predictions.reshape(-1, 4), tiled_anchors_boxlist)
        detection_boxes = decoded_boxes.reshape(batch_size, num_proposals, 4)

        # Landmarks are optional: collect_box_class_predictions yields None
        # for networks without landmark outputs.
        decoded_landmarks = None
        if landmarks_predictors is not None:
            decoded_landmarks = self._decode_landmarks(
                landmarks_predictors.reshape(-1, self.NUM_LANDMARKS), tiled_anchors_boxlist
            )
            decoded_landmarks = decoded_landmarks.reshape(batch_size, num_proposals, self.NUM_LANDMARKS)

        detection_boxes = np.expand_dims(detection_boxes, axis=2)
        nmsed_boxes, nmsed_scores, nmsed_landmarks = self._batch_multiclass_nms(
            boxes=detection_boxes,
            scores=classes_predictions,
            landmarks=decoded_landmarks,
        )
        return {
            "detection_boxes": nmsed_boxes,
            "detection_scores": nmsed_scores,
            "num_detections": nmsed_scores.size,
            "face_landmarks": nmsed_landmarks,
        }

    def _batch_multiclass_nms(self, boxes, scores, landmarks):
        """Filter by score, run NMS, and return results with a batch axis.

        Only a batch size of 1 is supported.  ``landmarks`` may be ``None``,
        in which case ``None`` is returned in its place.
        """
        assert boxes.shape[0] == 1, "Batch size must be 1"
        batch_boxes = boxes[0, :, 0, :]  # Shape: [num_boxes, 4]
        batch_scores = scores[0, :, 0]  # Shape: [num_boxes]
        batch_landmarks = None if landmarks is None else landmarks[0]  # Shape: [num_boxes, 10]

        # Filter based on score threshold
        score_mask = batch_scores >= self._score_threshold
        nmsed_boxes = batch_boxes[score_mask]
        nmsed_scores = batch_scores[score_mask]
        nmsed_landmarks = None if batch_landmarks is None else batch_landmarks[score_mask]

        # Apply non-max suppression (nothing to suppress when no box passed)
        if nmsed_scores.size:
            keep_indices = non_max_suppression(
                nmsed_boxes,
                nmsed_scores,
                iou_threshold=self._nms_iou_thresh,
            )
            nmsed_boxes = nmsed_boxes[keep_indices]
            nmsed_scores = nmsed_scores[keep_indices]
            if nmsed_landmarks is not None:
                nmsed_landmarks = nmsed_landmarks[keep_indices]

        # Add batch dimension back
        nmsed_boxes = np.expand_dims(nmsed_boxes, axis=0)
        nmsed_scores = np.expand_dims(nmsed_scores, axis=0)
        if nmsed_landmarks is not None:
            nmsed_landmarks = np.expand_dims(nmsed_landmarks, axis=0)
        return nmsed_boxes, nmsed_scores, nmsed_landmarks
if __name__ == "__main__":
    # Smoke check: build a post-processor with a sample configuration for a
    # 640x640 input.  Nothing is run on it.
    config = dict(
        nms_iou_thresh=0.4,
        score_threshold=0.02,
        anchors=dict(
            steps=[8, 16, 32],
            min_sizes=[[16, 32], [64, 128], [256, 512]],
        ),
    )
    # The config keys match the constructor's keyword names one-to-one.
    postprocessor = SCRFDPostProc(image_dims=(640, 640), **config)