I trained a yolov8m model to detect a single class object. .pt and .onnx models working great. However when I convert my model to .hef format, accuracy decreasing to much, even I set threshold 0.001, model detection is very less than my expect. I was expecting too much false positive but detection number is too less.
Referance Topic : Guide to using the DFC to convert a modified YoloV11 on Google Colab
Here my steps:
1- Train a .pt file with yolov8m base model.
2- Onnx conversion (Its work great):
# convert_to_onnx.py
from ultralytics import YOLO
import torch
import time
import cv2
import numpy as np
def convert_yolo_to_onnx(model_path, output_path=None, imgsz=640):
"""
YOLO modelini ONNX formatına çevirir
"""
if output_path is None:
output_path = model_path.replace('.pt', '.onnx')
print(f"YOLO model yükleniyor: {model_path}")
model = YOLO(model_path)
print("ONNX'e çeviriliyor...")
try:
# ONNX'e çevir
model.export(
format='onnx',
imgsz=imgsz,
opset=12, # ONNX opset versiyonu
simplify=True, # Model basitleştirmesi
dynamic=False, # Dinamik shape'ler (False daha hızlı)
half=False # FP16 precision (GPU için True yapabilirsiniz)
)
print(f"✅ Model başarıyla ONNX formatına çevrildi: {output_path}")
return output_path
except Exception as e:
print(f"❌ ONNX çevirme hatası: {e}")
return None
def test_model_speed(pt_path, onnx_path, test_frames=100):
"""
PT ve ONNX modellerinin hızını karşılaştırır
"""
# Test image oluştur
test_img = np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
results = {}
# PT Model Test
print(f"\n🔄 PT Model test ediliyor ({test_frames} frame)...")
model_pt = YOLO(pt_path)
start_time = time.time()
for i in range(test_frames):
_ = model_pt.predict(test_img, verbose=False, conf=0.25)
if (i + 1) % 20 == 0:
print(f" PT Model: {i+1}/{test_frames} frame tamamlandı")
pt_time = time.time() - start_time
pt_fps = test_frames / pt_time
results['PT'] = {'time': pt_time, 'fps': pt_fps}
print(f"✅ PT Model - Toplam süre: {pt_time:.2f}s, FPS: {pt_fps:.1f}")
# ONNX Model Test
print(f"\n🔄 ONNX Model test ediliyor ({test_frames} frame)...")
try:
model_onnx = YOLO(onnx_path)
start_time = time.time()
for i in range(test_frames):
_ = model_onnx.predict(test_img, verbose=False, conf=0.25)
if (i + 1) % 20 == 0:
print(f" ONNX Model: {i+1}/{test_frames} frame tamamlandı")
onnx_time = time.time() - start_time
onnx_fps = test_frames / onnx_time
results['ONNX'] = {'time': onnx_time, 'fps': onnx_fps}
print(f"✅ ONNX Model - Toplam süre: {onnx_time:.2f}s, FPS: {onnx_fps:.1f}")
# Karşılaştırma
speed_improvement = (pt_time - onnx_time) / pt_time * 100
fps_improvement = (onnx_fps - pt_fps) / pt_fps * 100
print(f"\n📊 PERFORMANS KARŞILAŞTIRMASI:")
print(f" Hız iyileştirmesi: {speed_improvement:+.1f}%")
print(f" FPS iyileştirmesi: {fps_improvement:+.1f}%")
print(f" ONNX {'daha hızlı' if speed_improvement > 0 else 'daha yavaş'}")
except Exception as e:
print(f"❌ ONNX model test hatası: {e}")
results['ONNX'] = None
return results
def compare_accuracy(pt_path, onnx_path, test_image_path=None):
"""
PT ve ONNX modellerinin doğruluğunu karşılaştırır
"""
if test_image_path is None:
# Test image oluştur
test_img = np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
print("🖼️ Rastgele test image kullanılıyor")
else:
test_img = cv2.imread(test_image_path)
print(f"🖼️ Test image: {test_image_path}")
print("\n🔍 Doğruluk karşılaştırması yapılıyor...")
# PT Model
model_pt = YOLO(pt_path)
results_pt = model_pt.predict(test_img, verbose=False, conf=0.25)
# ONNX Model
model_onnx = YOLO(onnx_path)
results_onnx = model_onnx.predict(test_img, verbose=False, conf=0.25)
print(f"PT Model detections: {len(results_pt[0].boxes) if results_pt[0].boxes is not None else 0}")
print(f"ONNX Model detections: {len(results_onnx[0].boxes) if results_onnx[0].boxes is not None else 0}")
# Detayları yazdır
def print_detections(results, model_name):
print(f"\n{model_name} Detections:")
if results[0].boxes is not None:
for i, box in enumerate(results[0].boxes):
conf = float(box.conf[0])
cls_id = int(box.cls[0])
class_name = model_pt.names[cls_id] # Class names aynı
print(f" {i+1}. {class_name}: {conf:.4f}")
else:
print(" Hiç detection yok")
print_detections(results_pt, "PT")
print_detections(results_onnx, "ONNX")
if __name__ == "__main__":
# Model yolu
pt_model_path = "best.pt"
print("🚀 YOLO to ONNX Converter")
print("=" * 50)
# 1. ONNX'e çevir
onnx_path = convert_yolo_to_onnx(pt_model_path)
if onnx_path:
# 2. Hız testi
speed_results = test_model_speed(pt_model_path, onnx_path, test_frames=50)
# 3. Doğruluk testi
compare_accuracy(pt_model_path, onnx_path)
print(f"\n✅ Çevirme işlemi tamamlandı!")
print(f"📁 ONNX Model: {onnx_path}")
print(f"🎯 Artık ONNX modelini detection kodunuzda kullanabilirsiniz")
else:
print("❌ ONNX çevirme işlemi başarısız oldu")
3 - Parsing Onnx:
from hailo_sdk_client import ClientRunner
# Define the ONNX model path and configuration
onnx_path = "/home/yusuf/runs/detect/train2/weights/epoch_20/best.onnx" # *** REPLACE WITH YOUR ONNX FILE PATH ***
onnx_model_name = "yolov8m_hailo" # *** REPLACE WITH YOUR DESIRED MODEL NAME ***
chosen_hw_arch = "hailo8" # Specify the target hardware architecture
# Initialize the ClientRunner
runner = ClientRunner(hw_arch=chosen_hw_arch)
# Use the recommended end node names for translation based on your model's structure
end_node_names = [
"/model.22/cv2.0/cv2.0.2/Conv",
"/model.22/cv3.0/cv3.0.2/Conv",
"/model.22/cv2.1/cv2.1.2/Conv",
"/model.22/cv3.1/cv3.1.2/Conv",
"/model.22/cv2.2/cv2.2.2/Conv",
"/model.22/cv3.2/cv3.2.2/Conv",
]
try:
# Translate the ONNX model to Hailo's format
# MODIFICATION: Changed the key from "input" to "images"
hn, npz = runner.translate_onnx_model(
onnx_path,
onnx_model_name,
end_node_names=end_node_names,
net_input_shapes={"images": [1, 3, 640, 640]}, # Adjust input shapes if needed
)
print("Model translation successful.")
except Exception as e:
print(f"Error during model translation: {e}")
raise
# Save the Hailo model HAR file
hailo_model_har_name = f"{onnx_model_name}.har"
try:
runner.save_har(hailo_model_har_name)
print(f"HAR file saved as: {hailo_model_har_name}")
except Exception as e:
print(f"Error saving HAR file: {e}")
4 - Model optimization:
from hailo_sdk_client import ClientRunner
# Load the HAR file
har_path = "/home/yusuf/Downloads/yolov8m_hailo.har"
runner = ClientRunner(har=har_path)
from pprint import pprint
try:
# Access the HailoNet as an OrderedDict
hn_dict = runner.get_hn() # Or use runner._hn if get_hn() is unavailable
print("Inspecting layers from HailoNet (OrderedDict):")
# Pretty-print each layer
for key, value in hn_dict.items():
print(f"Key: {key}")
pprint(value)
print("\n" + "="*80 + "\n") # Add a separator between layers for clarity
except Exception as e:
print(f"Error while inspecting hn_dict: {e}")
5 - Nms Config for yolov8m:
import json
import os
# --- KULLANICI AYARLARI: Bu bölümü kendi projenize göre güncelleyin ---
# Modeliniz kaç sınıf için eğitildi? Bu çok önemli!
# Örnek olarak COCO veri seti için 80 kullanılmıştır.
NUMBER_OF_CLASSES = 1
# JSON dosyasını kaydetmek istediğiniz dizin
# Örneğin, Downloads klasörünüz olabilir.
OUTPUT_DIR = "/home/yusuf/Downloads/"
# --- NMS Yapılandırması: .har dosyanıza göre ayarlandı ---
nms_config = {
# Tespit için minimum güven skoru eşiği
"nms_scores_th": 0.05,
# Çakışan kutuları birleştirmek için IoU (Intersection over Union) eşiği
"nms_iou_th": 0.7,
# Modelin girdi görüntü boyutları [yükseklik, genişlik]
"image_dims": [640, 640],
# Sınıf başına izin verilen maksimum tespit sayısı
"max_proposals_per_class": 100,
# *** BURAYI MUTLAKA GÜNCELLEYİN ***
"classes": NUMBER_OF_CLASSES,
# YOLOv8 DFL başlığı için regresyon uzunluğu (64 kanal / 4)
"regression_length": 16,
"background_removal": False,
"background_removal_index": 0,
# Tespit başlıklarının (decoder) tanımı
"bbox_decoders": [
{
"name": "bbox_decoder_p3", # P3 seviyesi (küçük nesneler)
"stride": 8,
"reg_layer": "conv57", # .har dosyanızdaki regresyon katmanı
"cls_layer": "conv58" # .har dosyanızdaki sınıflandırma katmanı
},
{
"name": "bbox_decoder_p4", # P4 seviyesi (orta nesneler)
"stride": 16,
"reg_layer": "conv70",
"cls_layer": "conv71"
},
{
"name": "bbox_decoder_p5", # P5 seviyesi (büyük nesneler)
"stride": 32,
"reg_layer": "conv82",
"cls_layer": "conv83"
}
]
}
# --- Dosyayı Kaydetme ---
# Çıktı dizininin var olup olmadığını kontrol et, yoksa oluştur
os.makedirs(OUTPUT_DIR, exist_ok=True)
output_path = os.path.join(OUTPUT_DIR, "nms_config.json")
# Yapılandırmayı JSON dosyası olarak kaydet
with open(output_path, "w") as json_file:
json.dump(nms_config, json_file, indent=4)
print(f"NMS yapılandırma dosyası başarıyla şuraya kaydedildi: {output_path}")
6- Calibration dataset from test data ( 1024 images):
import numpy as np
from PIL import Image
import os
# --- KULLANICI AYARLARI ---
# Kalibrasyon görsellerinizin bulunduğu dizin
IMAGE_DIR = "/home/yusuf/calibration_dataset"
# Çıktı .npy dosyasının kaydedileceği dizin ve dosya adı
OUTPUT_PATH = "/home/yusuf/Downloads/yolov8m_calibration_data.npy"
# Modelinizin beklediği girdi boyutları
IMAGE_WIDTH = 640
IMAGE_HEIGHT = 640
# --- VERİ İŞLEME ---
# İşlenmiş görüntü dizilerini depolamak için boş bir liste
processed_images = []
print(f"'{IMAGE_DIR}' dizinindeki görseller işleniyor...")
# Dizindeki tüm dosyaları listele
image_files = sorted(os.listdir(IMAGE_DIR)) # Sıralı okumak iyi bir alışkanlıktır
# Her bir görüntü dosyasını işle
for i, img_name in enumerate(image_files):
img_path = os.path.join(IMAGE_DIR, img_name)
# Sadece geçerli görüntü dosyalarını işle (.jpg, .png, vb.)
if img_name.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp')):
try:
# Görüntüyü aç ve RGB formatına dönüştür (siyah-beyaz olma ihtimaline karşı)
img = Image.open(img_path).convert("RGB")
# Görüntüyü yeniden boyutlandır
img_resized = img.resize((IMAGE_WIDTH, IMAGE_HEIGHT))
# Görüntüyü NumPy dizisine dönüştür (değerler [0, 255] aralığında kalır)
img_array = np.array(img_resized, dtype=np.uint8)
# İşlenmiş diziyi listeye ekle
processed_images.append(img_array)
# İlerlemeyi göstermek için her 100 görselde bir mesaj yazdır
if (i + 1) % 100 == 0:
print(f"{i + 1} / {len(image_files)} görsel işlendi.")
except Exception as e:
print(f"'{img_name}' dosyası işlenirken bir hata oluştu: {e}")
# Python listesini tek bir büyük NumPy dizisine dönüştür
# Sonuçta ortaya çıkan şekil: (görüntü_sayısı, yükseklik, genişlik, kanallar)
# Örnek: (1024, 640, 640, 3)
calibration_data = np.array(processed_images)
# Oluşturulan NumPy dizisini .npy dosyası olarak kaydet
np.save(OUTPUT_PATH, calibration_data)
print("\nİşlem tamamlandı!")
print(f"Kalibrasyon veri seti '{OUTPUT_PATH}' konumuna kaydedildi.")
print(f"Kaydedilen verinin şekli (shape): {calibration_data.shape}")
print(f"Veri tipi (dtype): {calibration_data.dtype}")
7 - Final .har file creation step:
import os
from hailo_sdk_client import ClientRunner
# --- 1. Adım: Dosya Yollarını ve Model Adını Tanımlama ---
# İlk derlemeden elde edilen, nicemlenmemiş HAR dosyasının yolu
unquantized_har_path = "/home/yusuf/Downloads/yolov8m_hailo.har"
# Bir önceki adımda oluşturduğunuz kalibrasyon verisinin yolu
calibration_data_path = "/home/yusuf/Downloads/yolov8m_calibration_data.npy"
# NMS yapılandırma dosyasının yolu
nms_config_path = "/home/yusuf/Downloads/nms_config.json"
# Oluşturulacak nihai, optimize edilmiş HAR dosyasının yolu
quantized_har_path = "/home/yusuf/Downloads/yolov8m_quantized.har"
# --- Dosyaların var olup olmadığını kontrol edelim ---
assert os.path.isfile(unquantized_har_path), f"HAR dosyası bulunamadı: {unquantized_har_path}"
assert os.path.isfile(calibration_data_path), f"Kalibrasyon dosyası bulunamadı: {calibration_data_path}"
assert os.path.isfile(nms_config_path), f"NMS yapılandırma dosyası bulunamadı: {nms_config_path}"
# --- 2. Adım: Model Optimizasyon Betiğini (.alls) Oluşturma ---
# Bu betik, derleyiciye ne yapacağını söyleyen komutları içerir.
alls_script = f"""
# Girdi verisini [0, 255] aralığından [0, 1] aralığına normalleştir
normalization1 = normalization([0.0, 0.0, 0.0], [255.0, 255.0, 255.0])
# Kalibrasyon ayarlarını yap
model_optimization_config(calibration, batch_size=2)
# Sınıflandırma katmanlarının çıkış aktivasyonunu sigmoid olarak değiştir
change_output_activation(conv58, sigmoid)
change_output_activation(conv71, sigmoid)
change_output_activation(conv83, sigmoid)
# Nicemleme sonrası doğruluğu artırmak için ince ayar yap
post_quantization_optimization(finetune, policy=enabled, learning_rate=0.000025)
# NMS işlemini modelin sonuna ekle
nms_postprocess("{nms_config_path}", meta_arch=yolov8, engine=cpu)
"""
print("--- Optimizasyon Betiği ---")
print(alls_script)
print("--------------------------\n")
# --- 3. Adım: Modeli Optimize Etme ve Kaydetme ---
try:
# ClientRunner'ı nicemlenmemiş HAR dosyası ile başlat
runner = ClientRunner(har=unquantized_har_path)
# Optimizasyon betiğini yükle
runner.load_model_script(alls_script)
print("Model optimizasyonu ve kalibrasyonu başlıyor...")
print(f"Kalibrasyon verisi kullanılıyor: {calibration_data_path}")
# DÜZELTME: runner.build() yerine tutorial'da belirtilen runner.optimize() kullanılıyor.
# Bu fonksiyon kalibrasyon, niceleme ve betikteki diğer tüm adımları çalıştırır.
runner.optimize(calibration_data_path)
# Nihai, optimize edilmiş ve nicemlenmiş modeli kaydet
runner.save_har(quantized_har_path)
print("\nİşlem başarıyla tamamlandı!")
print(f"Optimize edilmiş HAR dosyası şuraya kaydedildi: {quantized_har_path}")
except Exception as e:
print(f"\nOptimizasyon sırasında bir hata oluştu: {e}")
raise
8- Final .hef file conversion:
import os
from hailo_sdk_client import ClientRunner
# --- 1. Adım: Dosya Yollarını Tanımlama ---
# Bir önceki adımda oluşturulan, nicemlenmiş ve optimize edilmiş HAR dosyasının yolu
quantized_har_path = "/home/yusuf/Downloads/yolov8m_quantized.har"
# Oluşturulacak nihai HEF dosyasının yolu
output_hef_path = "/home/yusuf/Downloads/yolov8m.hef"
# Girdi dosyasının var olup olmadığını kontrol et
assert os.path.isfile(quantized_har_path), f"Girdi dosyası bulunamadı: {quantized_har_path}"
# --- 2. Adım: Modeli HEF Formatına Derleme ---
print(f"'{quantized_har_path}' dosyası HEF formatına derleniyor...")
try:
# ClientRunner'ı nicemlenmiş HAR dosyası ile başlat
runner = ClientRunner(har=quantized_har_path)
print("[BILGI] ClientRunner başarıyla başlatıldı.")
# Modeli derle. Bu işlem, HAR dosyasını donanımda çalışacak
# son bir HEF dosyasına dönüştürür.
hef_binary = runner.compile()
print("[BILGI] Model başarıyla derlendi.")
# Derlenmiş HEF'i ikili (binary) olarak dosyaya yaz
with open(output_hef_path, "wb") as f:
f.write(hef_binary)
print(f"\nİşlem tamamlandı! HEF dosyası şuraya kaydedildi: {output_hef_path}")
except Exception as e:
print(f"\n[HATA] Model derlenirken bir hata oluştu: {e}")
raise
9- Using picamera2 example script:
from concurrent.futures import Future
from functools import partial
import numpy as np
from hailo_platform import HEF, FormatType, HailoSchedulingAlgorithm, VDevice
class Hailo:
TARGET = None
TARGET_REF_COUNT = 0
def __init__(self, hef_path, batch_size=None, output_type='FLOAT32'):
"""
Initialize the HailoAsyncInference class with the provided HEF model file path.
Args:
hef_path (str): Path to the HEF model file.
batch_size (int): Batch size for inference.
output_type (str): Format type of the output stream.
"""
params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
self.batch_size = batch_size
self.hef = HEF(hef_path)
if Hailo.TARGET is None:
Hailo.TARGET = VDevice(params)
Hailo.TARGET_REF_COUNT += 1
self.target = Hailo.TARGET
self.infer_model = self.target.create_infer_model(hef_path)
self.infer_model.set_batch_size(1 if batch_size is None else batch_size)
self._set_input_output(output_type)
self.input_vstream_info, self.output_vstream_info = self._get_vstream_info()
self.configured_infer_model = self.infer_model.configure()
def __enter__(self):
"""Used for allowing use with context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_traceback):
"""Used for allowing use with context manager."""
self.close()
def _set_input_output(self, output_type):
"""
Set the input and output layer information for the HEF model.
Args:
output_type (str): Format type of the output stream.
"""
input_format_type = self.hef.get_input_vstream_infos()[0].format.type
self.infer_model.input().set_format_type(input_format_type)
output_format_type = getattr(FormatType, output_type)
for output in self.infer_model.outputs:
output.set_format_type(output_format_type)
self.num_outputs = len(self.infer_model.outputs)
def callback(self, completion_info, bindings, future, last):
"""
Callback function for handling inference results.
Args:
completion_info: Information about the completion of the inference task.
bindings: Bindings object containing input and output buffers.
"""
if future._has_had_error:
# Don't really know if this can happen.
return
elif completion_info.exception:
future._has_had_error = True
future.set_exception(completion_info.exception)
else:
if self.num_outputs <= 1:
# Only one output. Return the output directly.
if self.batch_size is None:
# No batching. Return this single output on its own.
future._intermediate_result = bindings.output().get_buffer()
else:
# Return a list containing an output for each item in the batch.
future._intermediate_result.append(bindings.output().get_buffer())
else:
# Multiple outputs. Return a dictionary of outputs keyed on the layer name.
if self.batch_size is None:
# No batching. Use a single output as the value for each key.
for name in bindings._output_names:
future._intermediate_result[name] = bindings.output(name).get_buffer()
else:
# Each key contains a list of outputs, one per item in the batch.
for name in bindings._output_names:
future._intermediate_result[name].append(bindings.output(name).get_buffer())
if last:
future.set_result(future._intermediate_result)
def _get_vstream_info(self):
"""
Get information about input and output stream layers.
Returns:
tuple: List of input stream layer information, List of output stream layer information.
"""
input_vstream_info = self.hef.get_input_vstream_infos()
output_vstream_info = self.hef.get_output_vstream_infos()
return input_vstream_info, output_vstream_info
def get_input_shape(self):
"""
Get the shape of the model's input layer.
Returns:
tuple: Shape of the model's input layer.
"""
return self.input_vstream_info[0].shape # Assumes that the model has one input
def describe(self):
"""
Return information that describes what's in the model.
Returns:
A pair of lists containing, respectively, information about the input and output layers.
"""
inputs = [(layer.name, layer.shape, layer.format.type) for layer in self.hef.get_input_vstream_infos()]
outputs = [(layer.name, layer.shape, layer.format.type) for layer in self.hef.get_output_vstream_infos()]
return inputs, outputs
def run_async(self, input_data):
"""
Run asynchronous inference on the Hailo-8 device.
Args:
input_data (np.ndarray): Input data for inference.
Returns:
future: Future to wait on for the inference results.
"""
if self.batch_size is None:
input_data = np.expand_dims(input_data, axis=0)
future = Future()
future._has_had_error = False
if self.num_outputs <= 1:
future._intermediate_result = []
else:
future._intermediate_result = {output.name: [] for output in self.infer_model.outputs}
for i, frame in enumerate(input_data):
last = i == len(input_data) - 1
bindings = self._create_bindings()
bindings.input().set_buffer(frame)
self.configured_infer_model.wait_for_async_ready(timeout_ms=10000)
self.configured_infer_model.run_async([bindings],
partial(self.callback, bindings=bindings, future=future, last=last))
return future
def run(self, input_data):
"""
Run asynchronous inference on the Hailo-8 device.
Args:
input_data (np.ndarray): Input data for inference.
Returns:
inference output or list: Inference output or List of inference outputs if batch_size is not None.
"""
future = self.run_async(input_data)
return future.result()
def _create_bindings(self):
"""
Create bindings for input and output buffers.
Returns:
bindings: Bindings object with input and output buffers.
"""
output_buffers = {name: np.empty(self.infer_model.output(name).shape, dtype=np.float32)
for name in self.infer_model.output_names}
return self.configured_infer_model.create_bindings(output_buffers=output_buffers)
def close(self):
"""Release the Hailo device."""
del self.configured_infer_model
Hailo.TARGET_REF_COUNT -= 1
if Hailo.TARGET_REF_COUNT == 0:
self.target.release()
HailoRT-CLI version 4.22.0
Python 3.12.3
Raspberry Pi 5 - Ubuntu 24.04
Hailo 8 26 TOPS
Model converted on my personal laptop and moved to raspberry pi. I also tried dfc tool conversion. It give same result.
Can some one explain why accuracy deacreasing. Actually the problem is not accuracy, its not make any detection. Normally we was expect to see too much false positive if we set threshold 0.01. But even this state, its making just a few detection (mostly true detection).