Here is my code. When I run it, I get an error on the line:
with self.network_group.activate(self.network_group_params):
Does anyone know what the issue might be?
import cv2
import numpy as np
from hailo_platform import (
HEF, Device, VDevice, HailoStreamInterface, InferVStreams, ConfigureParams,
InputVStreamParams, OutputVStreamParams, InputVStreams, OutputVStreams, FormatType
)
class YOLOv8_Inference_HEF:
    """Run a YOLOv8 HEF model on a Hailo device over a video and draw the detections.

    Usage: construct, call ``setup_device()`` once, then ``process_video()`` (or
    ``run_Inference()`` / ``annotate_frame()`` per frame). Call ``close()`` when
    finished to release the inference pipeline and the device.
    """

    # Class names whose most recent box is remembered while annotating a frame.
    _TRACKED_CLASSES = ('c', '1', '2')

    def __init__(self, hef_path, video_path, labels, input_resolution=(640, 640), confidence_threshold=0.5):
        """Store the configuration; no device or video is opened here.

        Args:
            hef_path: Path of the HEF model file passed to ``HEF``.
            video_path: Source passed to ``cv2.VideoCapture``.
            labels: Class names; position ``i`` names the ``i``-th entry of the model output.
            input_resolution: ``(height, width)`` frames are resized to before inference.
            confidence_threshold: Only detections scoring strictly above this are drawn.
        """
        self.hef_path = hef_path
        self.video_path = video_path
        self.labels = labels
        self.input_res_h, self.input_res_w = input_resolution
        self.confidence_threshold = confidence_threshold
        self.network_group = None
        self.network_group_params = None
        self.infer_pipeline = None
        self.input_vstream_info = None
        # Owns the VDevice and InferVStreams contexts opened by setup_device().
        self._exit_stack = None

    def setup_device(self):
        """Open the Hailo device, configure the network group and open the inference pipeline.

        The VDevice and the InferVStreams pipeline are entered as context managers
        and kept open on an ExitStack owned by this instance, so they stay alive
        after this method returns; ``close()`` exits them.

        Raises:
            RuntimeError: If ``Device.scan()`` reports no devices.
        """
        from contextlib import ExitStack

        hef = HEF(self.hef_path)
        devices = Device.scan()
        if not devices:
            raise RuntimeError('No Devices Found')

        self.close()  # drop anything left over from an earlier setup_device() call
        self._exit_stack = ExitStack()
        try:
            # NOTE(review): if the installed HailoRT enables the model scheduler by default
            # for a VDevice, manual network_group.activate() may be rejected - confirm
            # against the HailoRT version in use.
            target = self._exit_stack.enter_context(VDevice(device_ids=devices))
            configure_params = ConfigureParams.create_from_hef(hef, interface=HailoStreamInterface.PCIe)
            self.network_group = target.configure(hef, configure_params)[0]
            self.network_group_params = self.network_group.create_params()
            input_vstreams_params = InputVStreamParams.make_from_network_group(
                self.network_group, quantized=False, format_type=FormatType.FLOAT32)
            output_vstreams_params = OutputVStreamParams.make_from_network_group(
                self.network_group, quantized=False, format_type=FormatType.FLOAT32)
            # The pipeline must be entered before infer() can be used on it.
            self.infer_pipeline = self._exit_stack.enter_context(
                InferVStreams(self.network_group, input_vstreams_params, output_vstreams_params))
            self.input_vstream_info = hef.get_input_vstream_infos()[0]
        except Exception:
            self.close()
            raise

    def close(self):
        """Exit the contexts opened by ``setup_device()`` and reset the device state.

        Safe to call repeatedly and before ``setup_device()``.
        """
        stack, self._exit_stack = self._exit_stack, None
        self.network_group = None
        self.network_group_params = None
        self.infer_pipeline = None
        self.input_vstream_info = None
        if stack is not None:
            stack.close()

    def run_Inference(self, frame):
        """Resize ``frame`` to the model input size and run one inference on it.

        Args:
            frame: Image array as returned by ``cv2.VideoCapture.read``.

        Returns:
            Whatever ``infer_pipeline.infer`` returns for the single-frame batch.

        Raises:
            RuntimeError: If ``setup_device()`` has not been called successfully.
        """
        if self.infer_pipeline is None:
            raise RuntimeError('setup_device() must be called before run_Inference()')
        # cv2.resize expects dsize as (width, height).
        # NOTE(review): OpenCV frames are BGR; confirm the channel order the model expects.
        resized_frame = cv2.resize(frame, (self.input_res_w, self.input_res_h), interpolation=cv2.INTER_AREA)
        batch = np.expand_dims(np.asarray(resized_frame), axis=0).astype(np.float32)
        input_data = {self.input_vstream_info.name: batch}
        with self.network_group.activate(self.network_group_params):
            return self.infer_pipeline.infer(input_data)

    @staticmethod
    def _to_pixel_bbox(detection, frame_width, frame_height):
        """Map a detection's leading ``(ymin, xmin, ymax, xmax)`` to ``[xmin, ymin, xmax, ymax]`` pixels.

        Assumes the four coordinates are normalised to 0..1, as the scaling in the
        original code implies - TODO confirm against the model's output format.
        """
        ymin, xmin, ymax, xmax = detection[0:4]
        return [int(xmin * frame_width), int(ymin * frame_height),
                int(xmax * frame_width), int(ymax * frame_height)]

    @staticmethod
    def _bbox_center(bbox):
        """Return the integer ``(x, y)`` centre of an ``[xmin, ymin, xmax, ymax]`` box."""
        xmin, ymin, xmax, ymax = bbox
        return int((xmin + xmax) / 2), int((ymin + ymax) / 2)

    def annotate_frame(self, frame, inference_results):
        """Draw a dot and label for each confident detection of a tracked class.

        Reads the first value of ``inference_results`` and its first element, then
        treats that as a per-class sequence of detections whose rows hold
        ``ymin, xmin, ymax, xmax, confidence``. When both 'c' and '1' were
        detected, a line is drawn between the centres of their latest boxes.

        Args:
            frame: Image to draw on; modified in place by the OpenCV drawing calls.
            inference_results: Mapping returned by ``run_Inference``.

        Returns:
            The annotated frame.
        """
        frame_height, frame_width = frame.shape[:2]
        detections = next(iter(inference_results.values()))[0]
        objects_positions = dict.fromkeys(self._TRACKED_CLASSES)

        for idx, class_detections in enumerate(detections):
            if idx >= len(self.labels):
                # The model reports more classes than we have names for; skip the rest.
                break
            if len(class_detections) == 0:
                continue
            class_name = self.labels[idx]
            if class_name not in objects_positions:
                continue
            for detection in class_detections:
                if not detection[4] > self.confidence_threshold:
                    continue
                bbox = self._to_pixel_bbox(detection, frame_width, frame_height)
                # The last confident detection of a class wins.
                objects_positions[class_name] = bbox
                center = self._bbox_center(bbox)
                cv2.circle(frame, center, 20, (0, 255, 255), thickness=-1)
                frame = cv2.putText(frame, class_name, center, cv2.FONT_HERSHEY_SIMPLEX, 1,
                                    (0, 255, 0), 1, cv2.LINE_AA)

        anchor_box, target_box = objects_positions['c'], objects_positions['1']
        if anchor_box is not None and target_box is not None:
            frame = cv2.line(frame, self._bbox_center(anchor_box), self._bbox_center(target_box),
                             (255, 0, 0), 1)
        return frame

    def process_video(self):
        """Run inference on every frame of the video, show it, and print the per-frame FPS.

        Stops at the end of the video or when 'q' is pressed. The capture and the
        display windows are released even if inference raises.
        """
        cap = cv2.VideoCapture(self.video_path)
        try:
            while cap.isOpened():
                start_time = cv2.getTickCount()
                ret, frame = cap.read()
                if not ret:
                    break
                inference_results = self.run_Inference(frame)
                frame = self.annotate_frame(frame, inference_results)
                time_interval = (cv2.getTickCount() - start_time) / cv2.getTickFrequency()
                # Guard against a zero interval on coarse timers.
                fps = 1.0 / time_interval if time_interval > 0 else float('inf')
                print(f'FPS = {fps:.2f}')
                cv2.imshow('Inference', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            cap.release()
            cv2.destroyAllWindows()
def main():
    """Build the inference object for the sample model and video, set up the device, and process the video."""
    yolo_inference = YOLOv8_Inference_HEF(
        hef_path='yolov8n.hef',
        video_path='1plus_cut_264.mp4',
        labels=['c', '1', '2'],
    )
    yolo_inference.setup_device()
    yolo_inference.process_video()


# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()