In a previous thread (here) I asked for help learning to write Python code to run inference on an image. Omria pointed me towards the Hailo-Application-Code-Examples repo on GitHub, and it had exactly what I was looking for. I was able to run that code and use it as a template for my own. I've pulled some of the code from the pose estimation example out of the classes it was written in and included it in my sample code as standalone functions, making adjustments where necessary. The preprocessing and inference seem to work, but during post-processing I'm getting an error from the decoder() function.
On line 271 of my code (full code included below) this error gets reported:
ValueError: all the input array dimensions except for the concatenation axis must match exactly, but along dimension 1, the array at index 0 has size 400 and the array at index 1 has size 8400
I am loading the same sample image (zidane.jpg) from the Hailo-Application-Code-Examples repo, so I would expect the output of the inference to be the same, but my code throws the above error while the example code from GitHub works fine. I've tried adding debugging to see where things diverge, but up until the error everything looks reasonably similar. The only difference I'm aware of is that I'm loading the image via OpenCV, not PIL. Can anyone help me identify what might be going wrong in my code? Thanks in advance!
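For reference, this is roughly the kind of shape check I've been adding inside post_process() while debugging (illustrative only; it just dumps whatever layer names and shapes come back from inference):

for name, arr in raw_detections.items():
    print(f"{name}: shape={arr.shape} dtype={arr.dtype}")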
My Code Here:
from picamera2 import Picamera2
import cv2
import numpy as np
import multiprocessing as mp
from multiprocessing import Process
from hailo_platform import HEF
from hailo_utils import HailoAsyncInference, load_input_images, validate_images, divide_list_to_batches
MODEL_NAME = "yolov8s_pose.hef"
BATCH_SIZE = 1
CLASS_NUM = 1
REGRESSION_LENGTH = 15
STRIDES = [8, 16, 32]
def output_data_type2dict(hef: HEF, data_type: str) -> dict:
"""
    Builds a dictionary whose keys are the output layer names and
    whose values are all set to the same requested data type.
    Args:
        hef (HEF): the HEF model file.
        data_type (str): the requested data type (e.g. 'FLOAT32', 'UINT8', or 'UINT16').
Returns:
Dict: layer name to data type
"""
data_type_dict = {info.name: data_type for info in hef.get_output_vstream_infos()}
return data_type_dict
def preprocess(image, model_w, model_h):
"""
Resize image with unchanged aspect ratio using padding.
Args:
        image (np.ndarray): Input image as loaded by OpenCV (H x W x C).
model_w (int): Model input width.
model_h (int): Model input height.
Returns:
        np.ndarray: Preprocessed and padded image.
"""
img_h, img_w, channels = image.shape
scale = min(model_w / img_w, model_h / img_h)
new_img_w, new_img_h = int(img_w * scale), int(img_h * scale)
print(f"{img_w}x{img_h} {scale} {new_img_w}x{new_img_h}")
# Step 1: Resize to width = 640, keep aspect ratio
target_width = 640
height, width = image.shape[:2]
scale = target_width / width
new_height = int(height * scale)
resized_img = cv2.resize(image, (target_width, new_height), interpolation=cv2.INTER_AREA)
# Step 2: Pad vertically to reach 640 height
pad_vert = 640 - new_height
if pad_vert < 0:
raise ValueError("New height exceeds 640 ? something went wrong.")
pad_top = pad_vert // 2
pad_bottom = pad_vert - pad_top
# Step 3: Add padding
padded_img = cv2.copyMakeBorder(resized_img,
top=pad_top,
bottom=pad_bottom,
left=0,
right=0,
borderType=cv2.BORDER_CONSTANT,
value=(0, 0, 0)) # black padding
#For now just force the image to 640x640
# return cv2.resize(image,(model_w,model_h))
return padded_img
# image = image.resize((new_img_w, new_img_h), Image.Resampling.BICUBIC)
# padding_color = (114, 114, 114)
# padded_image = Image.new('RGB', (model_w, model_h), padding_color)
# padded_image.paste(image, ((model_w - new_img_w) // 2, (model_h - new_img_h) // 2))
# return padded_image
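    # (For comparison, a sketch of how I'd mirror the example's letterbox in OpenCV:
    #  scale to fit both dimensions, then pad symmetrically with its (114, 114, 114)
    #  padding colour. Not wired in yet.)
    # def letterbox_cv(image, model_w, model_h):
    #     img_h, img_w = image.shape[:2]
    #     scale = min(model_w / img_w, model_h / img_h)
    #     new_w, new_h = int(img_w * scale), int(img_h * scale)
    #     resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_AREA)
    #     pad_w, pad_h = model_w - new_w, model_h - new_h
    #     return cv2.copyMakeBorder(resized,
    #                               pad_h // 2, pad_h - pad_h // 2,
    #                               pad_w // 2, pad_w - pad_w // 2,
    #                               cv2.BORDER_CONSTANT, value=(114, 114, 114))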
def preprocess_input(
image,
input_queue,
width,
height ):
processed_batch=[]
processed_image=preprocess(image, width, height)
processed_batch.append(processed_image)
input_queue.put(processed_batch)
input_queue.put(None)
def _softmax(x):
return np.exp(x) / np.expand_dims(np.sum(np.exp(x), axis=-1), axis=-1)
def xywh2xyxy(x):
y = np.copy(x)
y[:, 0] = x[:, 0] - x[:, 2] / 2
y[:, 1] = x[:, 1] - x[:, 3] / 2
y[:, 2] = x[:, 0] + x[:, 2] / 2
y[:, 3] = x[:, 1] + x[:, 3] / 2
return y
def max_value(a, b):
return a if a >= b else b
def min_value(a,b):
return a if a <= b else b
def nms(dets, thresh):
x1, y1, x2, y2 = dets[:, 0], dets[:, 1], dets[:, 2], dets[:, 3]
scores = dets[:, 4]
areas = (x2 - x1 + 1) * (y2 - y1 + 1)
order = np.argsort(scores)[::-1]
suppressed = np.zeros(dets.shape[0], dtype=int)
for i in range(len(order)):
idx_i = order[i]
if suppressed[idx_i] == 1:
continue
for j in range(i + 1, len(order)):
idx_j = order[j]
if suppressed[idx_j] == 1:
continue
xx1 = max_value(x1[idx_i], x1[idx_j])
yy1 = max_value(y1[idx_i], y1[idx_j])
xx2 = min_value(x2[idx_i], x2[idx_j])
yy2 = min_value(y2[idx_i], y2[idx_j])
w = max_value(0.0, xx2 - xx1 + 1)
h = max_value(0.0, yy2 - yy1 + 1)
inter = w * h
ovr = inter / (areas[idx_i] + areas[idx_j] - inter)
if ovr >= thresh:
suppressed[idx_j] = 1
return np.where(suppressed == 0)[0]
def decoder(raw_boxes, raw_kpts, strides, image_dims, reg_max):
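    # Expected inputs (based on how post_process() assembles endnodes, if I understand it right):
    #   raw_boxes: three arrays shaped (1, H, W, (reg_max + 1) * 4) for the 20x20, 40x40 and 80x80 grids
    #   raw_kpts:  three arrays reshaped to (1, H * W, 17, 3)
    #   strides:   [32, 16, 8], paired with those grids in the same order
    # Each grid contributes H * W rows (400, 1600, 6400), so the concatenated outputs
    # should end up with 400 + 1600 + 6400 = 8400 rows along axis 1.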
boxes = None
decoded_kpts = None
print(raw_boxes)
for box_distribute, kpts, stride, _ in zip(raw_boxes, raw_kpts, strides, np.arange(3)):
shape = [int(x / stride) for x in image_dims]
grid_x = np.arange(shape[1]) + 0.5
grid_y = np.arange(shape[0]) + 0.5
grid_x, grid_y = np.meshgrid(grid_x, grid_y)
ct_row = grid_y.flatten() * stride
ct_col = grid_x.flatten() * stride
center = np.stack((ct_col, ct_row, ct_col, ct_row), axis=1)
print(f"{shape}") # {grid_x} {grid_y} {ct_row} {ct_col} {center}")
reg_range = np.arange(reg_max + 1)
box_distribute = np.reshape(box_distribute,
(-1,
box_distribute.shape[1] * box_distribute.shape[2],
4,
reg_max + 1))
box_distance = _softmax(box_distribute) * np.reshape(reg_range, (1, 1, 1, -1))
box_distance = np.sum(box_distance, axis=-1) * stride
box_distance = np.concatenate([box_distance[:, :, :2] * (-1), box_distance[:, :, 2:]],
axis=-1)
decode_box = np.expand_dims(center, axis=0) + box_distance
xmin, ymin, xmax, ymax = decode_box[:, :, 0], decode_box[:, :, 1], decode_box[:, :, 2], decode_box[:, :, 3]
decode_box = np.transpose([xmin, ymin, xmax, ymax], [1, 2, 0])
xywh_box = np.transpose([(xmin + xmax) / 2,
(ymin + ymax) / 2, xmax - xmin, ymax - ymin], [1, 2, 0])
boxes = xywh_box if boxes is None else np.concatenate([boxes, xywh_box], axis=1)
kpts[..., :2] *= 2
kpts[..., :2] = stride * (kpts[..., :2] - 0.5) + np.expand_dims(center[..., :2], axis=1)
decoded_kpts = kpts if decoded_kpts is None else np.concatenate([decoded_kpts, kpts],
axis=1)
return boxes, decoded_kpts
def non_max_suppression(prediction, conf_thres=0.1, iou_thres=0.45, max_det=100, n_kpts=17):
assert 0 <= conf_thres <= 1, f'Invalid confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
assert 0 <= iou_thres <= 1, f'Invalid IoU threshold {iou_thres}, valid values are between 0.0 and 1.0'
nc = prediction.shape[2] - n_kpts * 3 - 4
xc = prediction[..., 4] > conf_thres
ki = 4 + nc
output = []
for xi, x in enumerate(prediction):
x = x[xc[xi]]
if not x.shape[0]:
output.append({
'bboxes': np.zeros((0, 4)),
'keypoints': np.zeros((0, n_kpts, 3)),
'scores': np.zeros((0)),
'num_detections': 0
})
continue
boxes = xywh2xyxy(x[:, :4])
kpts = x[:, ki:]
conf = np.expand_dims(x[:, 4:ki].max(1), 1)
j = np.expand_dims(x[:, 4:ki].argmax(1), 1).astype(np.float32)
keep = np.squeeze(conf, 1) > conf_thres
x = np.concatenate((boxes, conf, j, kpts), 1)[keep]
x = x[x[:, 4].argsort()[::-1][:max_det]]
if not x.shape[0]:
output.append({
'bboxes': np.zeros((0, 4)),
'keypoints': np.zeros((0, n_kpts, 3)),
'scores': np.zeros((0)),
'num_detections': 0
})
continue
boxes = x[:, :4]
scores = x[:, 4]
kpts = x[:, 6:].reshape(-1, n_kpts, 3)
i = nms(np.concatenate((boxes, np.expand_dims(scores, 1)), axis=1), iou_thres)
output.append({
'bboxes': boxes[i],
'keypoints': kpts[i],
'scores': scores[i],
'num_detections': len(i)
})
return output
def _sigmoid(x):
return 1 / (1 + np.exp(-x))
def extract_pose_estimation_results(endnodes, height, width):
batch_size = endnodes[0].shape[0]
    strides = STRIDES[::-1]  # [32, 16, 8]: largest stride first, to match the 20x20 output listed first in endnodes
image_dims = (height, width)
raw_boxes = endnodes[:7:3]
scores = [
np.reshape(s, (-1, s.shape[1] * s.shape[2], CLASS_NUM)) for s in endnodes[1:8:3]
]
scores = np.concatenate(scores, axis=1)
kpts = [
np.reshape(c, (-1, c.shape[1] * c.shape[2], 17, 3)) for c in endnodes[2:9:3]
]
decoded_boxes, decoded_kpts = decoder(raw_boxes,kpts, strides,
image_dims, REGRESSION_LENGTH)
decoded_kpts = np.reshape(decoded_kpts, (batch_size, -1, 51))
print(f"{decoded_boxes.shape} {decoded_kpts.shape}")
predictions = np.concatenate([decoded_boxes, scores, decoded_kpts], axis=2)
nms_res = non_max_suppression(
predictions, conf_thres=0.001,
iou_thres=0.7, max_det=100
)
output = {
'bboxes': np.zeros((batch_size, 100, 4)),
'keypoints': np.zeros((batch_size, 100, 17, 2)),
'joint_scores': np.zeros((batch_size, 100, 17, 1)),
'scores': np.zeros((batch_size, 100, 1))
}
for b in range(batch_size):
output['bboxes'][b, :nms_res[b]['num_detections']] = nms_res[b]['bboxes']
output['keypoints'][b, :nms_res[b]['num_detections']] = nms_res[b]['keypoints'][..., :2]
output['joint_scores'][b, :nms_res[b]['num_detections'],
..., 0] = _sigmoid(nms_res[b]['keypoints'][..., 2])
output['scores'][b, :nms_res[b]['num_detections'], ..., 0] = nms_res[b]['scores']
return output
def post_process(raw_detections, height, width):
class_num=CLASS_NUM
raw_detections_keys = list(raw_detections.keys())
layer_from_shape = {raw_detections[key].shape: key for key in raw_detections_keys}
detection_output_channels = (REGRESSION_LENGTH + 1) * 4 # (regression length + 1) * num_coordinates
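    # With REGRESSION_LENGTH = 15 this evaluates to 64, which should match the box output layers below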
keypoints = 51
endnodes = [
raw_detections[layer_from_shape[1, 20, 20, detection_output_channels]],
raw_detections[layer_from_shape[1, 20, 20, class_num]],
raw_detections[layer_from_shape[1, 20, 20, keypoints]],
raw_detections[layer_from_shape[1, 40, 40, detection_output_channels]],
raw_detections[layer_from_shape[1, 40, 40, class_num]],
raw_detections[layer_from_shape[1, 40, 40, keypoints]],
raw_detections[layer_from_shape[1, 80, 80, detection_output_channels]],
raw_detections[layer_from_shape[1, 80, 80, class_num]],
raw_detections[layer_from_shape[1, 80, 80, keypoints]]
]
predictions_dict = extract_pose_estimation_results(endnodes, height, width)
print(predictions_dict)
def postprocess_output(
output_queue,
width,
height):
result=output_queue.get()
if result is None:
print("postprocess_output: No more results")
        return  # Exit if the sentinel value is received
processed_image, raw_detections = result
#TODO: Do postprocessing and visualization here
post_process(raw_detections, height, width)
def infer(
image,
hef_file,
layer_types ):
#Create input queues
input_queue = mp.Queue()
output_queue = mp.Queue()
#Create hailo inference object
hailo_inference = HailoAsyncInference(
hef_file, input_queue, output_queue, BATCH_SIZE, output_type=layer_types
)
height, width, _ = hailo_inference.get_input_shape()
#Create preprocess task
preprocess = Process(
target=preprocess_input,
name="image_enqueuer",
args=(image, input_queue, width, height)
)
#Create postprocess task
postprocess = Process(
target=postprocess_output,
name="image_processor",
args=(output_queue, width, height)
)
preprocess.start()
postprocess.start()
try:
hailo_inference.run()
preprocess.join()
# To signal processing process to exit
output_queue.put(None)
postprocess.join()
#check_process_errors(preprocess, postprocess)
print(f'Inference was successful!')
except Exception as e:
print(f"Inference error: {e}")
# Ensure cleanup if there's an error
input_queue.close()
output_queue.close()
preprocess.terminate()
postprocess.terminate()
exit(1) # Force exit on error
if __name__ == "__main__":
#Start camera
cap = Picamera2()
cap.configure(cap.create_preview_configuration(main={"format": "RGB888", "size": (1920,1080)}))
cap.start()
try:
while True:
            # Capture a frame from the camera
            frame = cap.capture_array()
            # For now, override it with the sample image from the repo
            frame = cv2.imread("zidane.jpg")
            # Convert to the correct color space (BGR -> RGB)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#Get output layer types for model
output_type_dict = output_data_type2dict(HEF(MODEL_NAME), 'FLOAT32')
infer(frame, MODEL_NAME, output_type_dict)
#Show frame (resized to fit on the VNC screen)
frame=cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
cv2.imshow("Pose Estimation Test", cv2.resize(frame,(640,360)))
if cv2.waitKey(1) & 0xFF==ord('q'):
break
finally:
cv2.destroyAllWindows()
#Stop Camera
cap.stop()