I want to create an app for reporting traffic violations.
My code (save it as hef_picam_zone_tracker.py):
# -*- coding: utf-8 -*-
import os
import time
from collections import Counter
from pathlib import Path

import cv2
import numpy as np
from hailo_platform.pyhailort.pyhailort import (
    HEF,
    InputVStreamParams,
    InputVStreams,
    OutputVStreamParams,
    OutputVStreams,
    VDevice,
)
from libcamera import Transform, controls
from picamera2 import Picamera2
# ========================= CONFIG =========================
# --- RF433 (optional) ---
RF433_ENABLED = True
RF433_PORT = "/dev/ttyUSB0"
RF433_CODE = 16380641
# True: transmit at most once per run; False: at most once per track ID.
RF433_ONESHOT_GLOBAL = True

# --- Hailo / Model ---
hef_path = "/home/pi/Mae-wang/coco-vehicle.hef"
CONF_THRES = 0.50
ENABLE_CLASS_FILTER = True
SELECTED_CLASSES = {0, 1, 2, 3, 4}

# --- Camera / Display ---
FRAME_W, FRAME_H = 1600, 900
DISPLAY_SIZE = (1600, 900)
WINDOW_TITLE = "HEF + PiCam"

# Image rotation in degrees (0/90/180/270)
ROTATION_DEG = 180

# Focus: "continuous" | "auto" | "manual"
FOCUS_MODE = "auto"
AF_SPEED = "fast"    # "fast" | "normal"
AF_RANGE = "normal"  # "normal" | "macro" | "full"
MANUAL_FOCUS_DIOPTERS = 7.0

# --- Tracking / Postprocess ---
MAX_TRACK_DISTANCE = 100  # px
MIN_BOX_WIDTH = 30        # px

# --- Snap output ---
SNAP_DIR = "/home/pi/snaps"

# --- Zones (polygons, pixel coordinates in the FRAME_W x FRAME_H frame) ---
in_mae_wang = [[(856, 365), (1259, 365), (1319, 457), (756, 457)]]
out_mae_wang = [[(143, 601), (1210, 900), (0, 900), (0, 667)]]
soi = [[(198, 355), (349, 410), (57, 533), (8, 428)]]
check_u = [[(612, 309), (873, 309), (805, 367), (464, 365)]]
# Zone ids are 1-based positions in this list:
# 1 = in_mae_wang, 2 = out_mae_wang, 3 = soi, 4 = check_u.
ZONES = in_mae_wang + out_mae_wang + soi + check_u  # index 1..N

# --- Input color handling ---
# True: convert the captured frame BGR -> RGB before feeding the network.
FORCE_INPUT_RGB = True

# --- Debug switches ---
DEBUG_PRINT_SHAPES = True
DEBUG_PRINT_EVERY_N = 30  # print a class histogram every N frames (0 disables)
# ========================= STATE =========================
_rf433_fired = False       # set once the global one-shot RF433 code has been sent
_u_turn_fired_ids = set()  # track IDs that already triggered RF433 (per-ID mode)
counted_ids = set()
last_zone = {}             # {oid: last_zone_id or None}
u_turn_count = 0
enter_city_count = 0
exit_city_count = 0
next_object_id = 0         # next unused track ID
object_tracks = {}         # {id: {"pos": (cx, cy)}}

# FPS smoothing (exponential moving average)
_prev_t = time.time()
_fps = 0.0
_fps_alpha = 0.2
_frame_idx = 0
# ---------------- helpers: RF433 one-shot ----------------
def fire_rf433_once(reason: str = "U-TURN", per_id: int | None = None) -> bool:
    """Send the configured RF433 code, at most once per run or once per track ID.

    When ``RF433_ONESHOT_GLOBAL`` is true the code is sent only the first time
    this function succeeds; otherwise it is sent at most once for each
    ``per_id`` (calls with ``per_id=None`` are never suppressed in that mode).

    Args:
        reason: Free text included in the log line.
        per_id: Track ID used for per-ID suppression.

    Returns:
        True if the code was sent by this call; False if RF433 is disabled,
        the call was suppressed, or sending failed.
    """
    global _rf433_fired

    if not RF433_ENABLED:
        return False

    if RF433_ONESHOT_GLOBAL:
        if _rf433_fired:
            return False
    elif per_id is not None and per_id in _u_turn_fired_ids:
        return False

    try:
        # Imported lazily so the script still runs when the rf433 module is
        # missing; the failure is reported below instead of at import time.
        import rf433
        rf433.send_code(code=RF433_CODE, port=RF433_PORT)
    except Exception as e:  # best effort: a radio failure must not stop tracking
        print(f"[RF433][ERROR] {e}")
        return False
    else:
        print(f"[RF433] sent code={RF433_CODE} via {RF433_PORT} ({reason})")

    # Only mark as fired after a successful send so a failed attempt can retry.
    if RF433_ONESHOT_GLOBAL:
        _rf433_fired = True
    elif per_id is not None:
        _u_turn_fired_ids.add(per_id)
    return True
# ========================= UTILS =========================
def ensure_dir(path):
    """Create directory *path* (including missing parents); no error if it exists."""
    os.makedirs(path, exist_ok=True)
def draw_text_with_bg(img, text, org,
                      font=cv2.FONT_HERSHEY_SIMPLEX,
                      font_scale=0.6, text_color=(255, 255, 255),
                      thickness=2, bg_color=(0, 0, 0)):
    """Draw *text* on *img* (in place) over a filled background rectangle.

    Args:
        img: Image to draw on; modified in place.
        text: String to render.
        org: (x, y) of the text baseline origin, as for ``cv2.putText``.
        font, font_scale, text_color, thickness: Passed to the OpenCV text calls.
        bg_color: Fill color of the rectangle behind the text.
    """
    (w, h), baseline = cv2.getTextSize(text, font, font_scale, thickness)
    x, y = org
    # 4 px padding around the measured text box.
    cv2.rectangle(img, (x - 4, y - h - 4), (x + w + 4, y + baseline + 4), bg_color, -1)
    cv2.putText(img, text, org, font, font_scale, text_color, thickness)
def point_in_any_zone(xy):
    """Return the 1-based index of the first zone in ``ZONES`` containing *xy*.

    Points exactly on a polygon edge count as inside. Polygons with fewer than
    three vertices are ignored.

    Args:
        xy: (x, y) point in the same pixel coordinates as the zone polygons.

    Returns:
        The zone index (1..len(ZONES)), or None when no zone contains the point.
    """
    x, y = xy
    point = (float(x), float(y))
    for i, poly in enumerate(ZONES, start=1):
        pts = np.array(poly, dtype=np.int32)
        # pointPolygonTest with measureDist=False returns +1 / 0 / -1 for
        # inside / on the edge / outside.
        if len(pts) >= 3 and cv2.pointPolygonTest(pts, point, False) >= 0:
            return i
    return None
def draw_zones(img):
    """Draw every polygon in ``ZONES`` on *img* in place.

    Each zone gets an outline, a translucent fill (12 % overlay) and a
    ``ZONE <n>`` label placed at the mean of its vertices.
    """
    for i, poly in enumerate(ZONES, start=1):
        pts = np.array(poly, dtype=np.int32)
        cv2.polylines(img, [pts], True, (139, 0, 0), 1)

        # Blend a filled copy back onto the image to get a translucent fill.
        overlay = img.copy()
        cv2.fillPoly(overlay, [pts], (255, 144, 30))
        cv2.addWeighted(overlay, 0.12, img, 0.88, 0, img)

        # Plain Python ints: some OpenCV builds reject NumPy integer scalars
        # as point coordinates.
        cx, cy = (int(v) for v in pts.mean(axis=0))
        cv2.putText(img, f"ZONE {i}", (cx, cy), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (139, 0, 0), 1)
def _clamp_span(lo, hi, size):
    """Clamp the interval [lo, hi] into [0, size-1], keeping hi > lo when possible."""
    last = max(size - 1, 0)
    lo = max(0, min(last, lo))
    hi = max(0, min(last, hi))
    if hi <= lo:
        if lo < last:
            hi = lo + 1
        elif last > 0:
            # lo already sits on the last pixel: step it back instead of
            # returning a zero-size span.
            lo, hi = last - 1, last
    return lo, hi


def clamp_box(x1, y1, x2, y2, W, H):
    """Clamp a box to a W x H image and make it at least one pixel in size.

    Args:
        x1, y1, x2, y2: Box corners in pixels (may lie outside the image).
        W, H: Image width and height in pixels.

    Returns:
        ``(x1, y1, x2, y2)`` with every coordinate in ``[0, W-1]`` /
        ``[0, H-1]`` and ``x2 > x1``, ``y2 > y1`` whenever the image is at
        least two pixels along that axis.
    """
    x1, x2 = _clamp_span(x1, x2, W)
    y1, y2 = _clamp_span(y1, y2, H)
    return x1, y1, x2, y2
def apply_focus_controls(picam2):
    """Apply the focus settings from the CONFIG section to *picam2*.

    ``FOCUS_MODE`` selects manual, auto (single trigger) or continuous
    autofocus; unknown values fall back to continuous. ``AF_SPEED`` and
    ``AF_RANGE`` fall back to fast / normal when unrecognized.

    Args:
        picam2: Camera object exposing ``set_controls``.
    """
    mode_map = {
        "continuous": controls.AfModeEnum.Continuous,
        "auto": controls.AfModeEnum.Auto,
        "manual": controls.AfModeEnum.Manual,
    }
    speed_map = {
        "fast": controls.AfSpeedEnum.Fast,
        "normal": controls.AfSpeedEnum.Normal,
    }
    range_map = {
        "normal": controls.AfRangeEnum.Normal,
        "macro": controls.AfRangeEnum.Macro,
        "full": controls.AfRangeEnum.Full,
    }
    m = mode_map.get(FOCUS_MODE, controls.AfModeEnum.Continuous)
    s = speed_map.get(AF_SPEED, controls.AfSpeedEnum.Fast)
    r = range_map.get(AF_RANGE, controls.AfRangeEnum.Normal)

    if m == controls.AfModeEnum.Manual:
        picam2.set_controls({
            "AfMode": controls.AfModeEnum.Manual,
            "LensPosition": float(MANUAL_FOCUS_DIOPTERS),
        })
    elif m == controls.AfModeEnum.Auto:
        picam2.set_controls({
            "AfMode": controls.AfModeEnum.Auto,
            "AfSpeed": s,
            "AfRange": r,
        })
        # Kick off one autofocus scan; failure is non-fatal but reported.
        try:
            picam2.set_controls({"AfTrigger": controls.AfTriggerEnum.Start})
        except Exception as e:
            print(f"[FOCUS][WARN] autofocus trigger failed: {e}")
    else:
        picam2.set_controls({
            "AfMode": controls.AfModeEnum.Continuous,
            "AfSpeed": s,
            "AfRange": r,
        })
# ---------------- Hailo postprocess (robust parsers) ----------------
def parse_hailo_flat(output_array, W, H,
                     conf_thres=0.5,
                     min_box_w=10,
                     enable_cls_filter=False,
                     selected_cls=None):
    """Parse a single flat output tensor into detection dicts.

    The flattened tensor is read as consecutive records of either six values
    ``(y_min, x_min, y_max, x_max, conf, cls_id)`` or five values (no class
    id). A length divisible by six is read as six-value records first, so a
    length divisible by both (e.g. 30) is always read as six-value records.
    Coordinates are multiplied by ``W`` / ``H``
    (assumes the model emits normalized coordinates — TODO confirm).

    Args:
        output_array: Array-like model output.
        W, H: Frame width and height used to scale and clamp boxes.
        conf_thres: Minimum confidence to keep a record.
        min_box_w: Minimum box width in pixels after clamping.
        enable_cls_filter: When true, drop records whose class id is not in
            ``selected_cls`` (records without a class id are always kept).
        selected_cls: Collection of allowed class ids.

    Returns:
        List of ``{"bbox": (x1, y1, x2, y2), "conf": float, "cls": int | None}``;
        empty when the length fits neither record size.
    """
    flat = np.asarray(output_array).ravel()
    if flat.size == 0:
        return []
    if flat.size % 6 == 0:
        stride = 6
    elif flat.size % 5 == 0:
        stride = 5
    else:
        return []

    dets = []
    for rec in flat.reshape(-1, stride):
        y_min, x_min, y_max, x_max, conf = rec[:5]
        cls_id = int(rec[5]) if stride == 6 else None
        if conf < conf_thres:
            continue

        x1 = int(float(x_min) * W)
        x2 = int(float(x_max) * W)
        y1 = int(float(y_min) * H)
        y2 = int(float(y_max) * H)
        x1, y1, x2, y2 = clamp_box(x1, y1, x2, y2, W, H)
        if (x2 - x1) < min_box_w:
            continue

        if enable_cls_filter and cls_id is not None and selected_cls:
            if cls_id not in selected_cls:
                continue

        dets.append({
            "bbox": (x1, y1, x2, y2),
            "conf": float(conf),
            "cls": cls_id,
        })
    return dets
def parse_hailo_multitensor(outputs, W, H,
                            conf_thres=0.5,
                            min_box_w=10,
                            enable_cls_filter=False,
                            selected_cls=None):
    """Parse separate boxes / scores tensors into detection dicts.

    Among the 2-D tensors in *outputs*, the last one with 4 columns is used
    as boxes ``(y_min, x_min, y_max, x_max)`` and a tensor with more than 4
    columns as per-class scores. A scores tensor whose row count equals the
    boxes' row count is preferred; if none matches, the last candidate is
    used and only the common leading rows are read. Coordinates are
    multiplied by ``W`` / ``H`` (assumes normalized boxes — TODO confirm).

    Args:
        outputs: Iterable of array-like tensors.
        W, H: Frame width and height used to scale and clamp boxes.
        conf_thres: Minimum best-class score to keep a row.
        min_box_w: Minimum box width in pixels after clamping.
        enable_cls_filter: When true, drop rows whose best class is not in
            ``selected_cls``.
        selected_cls: Collection of allowed class ids.

    Returns:
        List of ``{"bbox": (x1, y1, x2, y2), "conf": float, "cls": int}``;
        empty when no boxes or no scores tensor is found.
    """
    boxes = None
    score_candidates = []
    for t in outputs:
        arr = np.asarray(t)
        if arr.ndim != 2:
            continue
        if arr.shape[1] == 4:
            boxes = arr
        elif arr.shape[1] > 4:
            score_candidates.append(arr)

    if boxes is None or not score_candidates:
        return []

    # Prefer scores that line up row-for-row with the boxes.
    matching = [a for a in score_candidates if a.shape[0] == boxes.shape[0]]
    scores = (matching or score_candidates)[-1]

    dets = []
    n = min(boxes.shape[0], scores.shape[0])
    for b, sc in zip(boxes[:n], scores[:n]):
        cls_id = int(np.argmax(sc))
        conf = float(np.max(sc))
        if conf < conf_thres:
            continue

        y_min, x_min, y_max, x_max = (float(v) for v in b)
        x1 = int(x_min * W)
        x2 = int(x_max * W)
        y1 = int(y_min * H)
        y2 = int(y_max * H)
        x1, y1, x2, y2 = clamp_box(x1, y1, x2, y2, W, H)
        if (x2 - x1) < min_box_w:
            continue

        if enable_cls_filter and selected_cls and cls_id not in selected_cls:
            continue

        dets.append({
            "bbox": (x1, y1, x2, y2),
            "conf": conf,
            "cls": cls_id,
        })
    return dets
def parse_hailo_output_general(output_data, W, H,
                               conf_thres=0.5,
                               min_box_w=10,
                               enable_cls_filter=False,
                               selected_cls=None):
    """Parse model output with whichever parser fits its layout.

    A single tensor is first tried with ``parse_hailo_flat``; if that yields
    nothing (or there are several tensors) ``parse_hailo_multitensor`` is used.

    Args:
        output_data: One array-like tensor, or a list/tuple of them.
        W, H: Frame width and height used to scale and clamp boxes.
        conf_thres, min_box_w, enable_cls_filter, selected_cls: Forwarded
            unchanged to the underlying parser.

    Returns:
        List of detection dicts with keys ``"bbox"``, ``"conf"`` and ``"cls"``.
    """
    if isinstance(output_data, (list, tuple)):
        outputs = [np.array(o) for o in output_data]
    else:
        outputs = [np.array(output_data)]

    if len(outputs) == 1:
        dets = parse_hailo_flat(outputs[0], W, H,
                                conf_thres, min_box_w,
                                enable_cls_filter, selected_cls)
        if dets:
            return dets

    return parse_hailo_multitensor(outputs, W, H,
                                   conf_thres, min_box_w,
                                   enable_cls_filter, selected_cls)
# ---------------- Simple nearest-neighbor tracker ----------------
def track_objects(detections):
    """Assign a track ID to every detection by nearest-centre matching.

    Each detection's box centre is matched to the nearest track from the
    previous call that lies closer than ``MAX_TRACK_DISTANCE`` pixels and has
    not already been claimed in this call; otherwise a new ID is allocated.
    Tracks that receive no detection are dropped. The global ``object_tracks``
    is replaced by the returned mapping.

    Args:
        detections: Iterable of dicts with a ``"bbox"`` of ``(x1, y1, x2, y2)``.

    Returns:
        ``{track_id: {"pos": (cx, cy)}}`` with exactly one entry per detection,
        inserted in detection order.
    """
    global object_tracks, next_object_id

    max_dist_sq = MAX_TRACK_DISTANCE ** 2
    new_tracks = {}
    for det in detections:
        x1, y1, x2, y2 = det["bbox"]
        cx, cy = (x1 + x2) // 2, (y1 + y2) // 2

        matched_id = None
        best_dist_sq = max_dist_sq  # strict '<' keeps the original radius test
        for oid, data in object_tracks.items():
            if oid in new_tracks:
                # Already claimed by an earlier detection in this frame; reusing
                # it would overwrite that entry and lose a detection.
                continue
            px, py = data["pos"]
            dist_sq = (cx - px) ** 2 + (cy - py) ** 2
            if dist_sq < best_dist_sq:
                matched_id, best_dist_sq = oid, dist_sq

        if matched_id is None:
            matched_id = next_object_id
            next_object_id += 1
        new_tracks[matched_id] = {"pos": (cx, cy)}

    object_tracks = new_tracks
    return new_tracks
# ========================= MAIN LOOP =========================
def _pair_detections_with_tracks(dets, tracks):
    """Yield ``(det, track_id, (cx, cy))`` by matching box centres to track positions.

    The tracker stores each track's position as the integer centre of the
    detection it was assigned to, so the same centre formula recovers the
    pairing without assuming both collections have the same length or order.
    Detections with no track at their centre are skipped.
    """
    ids_by_pos = {}
    for oid, data in tracks.items():
        ids_by_pos.setdefault(data["pos"], []).append(oid)

    for det in dets:
        x1, y1, x2, y2 = det["bbox"]
        centre = ((x1 + x2) // 2, (y1 + y2) // 2)
        candidates = ids_by_pos.get(centre)
        if candidates:
            yield det, candidates.pop(0), centre


def _save_u_turn_snapshot(frame_bgr, oid):
    """Write the un-annotated frame to SNAP_DIR, named after the track ID and time."""
    ts = time.strftime('%Y%m%d_%H%M%S')
    snap_name = os.path.join(SNAP_DIR, f"u14_full_ID{oid}_{ts}.jpg")
    if cv2.imwrite(snap_name, frame_bgr):
        print(f"[SNAP] saved: {snap_name}")
    else:
        print(f"[SNAP][ERROR] could not write: {snap_name}")


def _handle_zone_transition(oid, prev_zone, zone_id, frame_bgr):
    """Update counters / fire side effects for a track moving prev_zone -> zone_id."""
    global u_turn_count, enter_city_count, exit_city_count

    pair = (prev_zone, zone_id)
    if pair == (1, 4):
        u_turn_count += 1
        print(f"[U-TURN] ID {oid}: 1 -> 4 | count = {u_turn_count}")
        _save_u_turn_snapshot(frame_bgr, oid)
        fire_rf433_once(reason=f"U-TURN ID {oid}", per_id=oid)
    elif pair == (1, 3):
        print(f"[INFO] ID {oid}: 1 -> 3 ")
    elif pair in ((2, 4), (3, 4)):
        exit_city_count += 1
        print(f"[INFO] ID {oid}: {prev_zone} -> {zone_id} | exit={exit_city_count}")
    elif pair in ((3, 2), (1, 2)):
        enter_city_count += 1
        print(f"[INFO] ID {oid}: {prev_zone} -> {zone_id} | enter={enter_city_count}")


def main():
    """Run the capture -> inference -> tracking -> zone-logic -> display loop.

    Captures frames from the camera, runs them through the Hailo network,
    tracks detections, counts zone transitions (U-turn / enter / exit) and
    shows an annotated preview. Press 'q' to quit, 'f' to re-trigger
    autofocus when ``FOCUS_MODE`` is "auto". The camera and the preview
    window are released even if the loop ends with an exception.
    """
    global u_turn_count, enter_city_count, exit_city_count
    global last_zone, _prev_t, _fps, _frame_idx

    ensure_dir(SNAP_DIR)

    # ---- Camera init ----
    picam2 = Picamera2()
    try:
        cfg = picam2.create_preview_configuration(
            main={"size": (FRAME_W, FRAME_H), "format": "RGB888"},
            transform=Transform(rotation=ROTATION_DEG),
        )
        picam2.configure(cfg)
        picam2.start()
        apply_focus_controls(picam2)

        # ---- Hailo init ----
        hef = HEF(hef_path)
        with VDevice() as device:
            network_group = device.configure(hef)[0]
            input_info_list = hef.get_input_vstream_infos()
            output_info_list = hef.get_output_vstream_infos()
            # Input shape is unpacked as NHWC without the batch dimension.
            in_h, in_w, in_c = tuple(input_info_list[0].shape)
            input_params = InputVStreamParams.make(network_group)
            output_params = OutputVStreamParams.make(network_group)

            with network_group.activate(network_group.create_params()):
                with InputVStreams(network_group, input_params) as input_streams, \
                        OutputVStreams(network_group, output_params) as output_streams:
                    input_stream = next(iter(input_streams))
                    output_stream_list = list(output_streams)

                    cv2.namedWindow(WINDOW_TITLE, cv2.WINDOW_NORMAL)
                    cv2.resizeWindow(WINDOW_TITLE, DISPLAY_SIZE[0], DISPLAY_SIZE[1])
                    print("🚀 Running HEF + PiCam ... press 'q' to quit.")
                    if DEBUG_PRINT_SHAPES:
                        print("[DEBUG] input shape (H,W,C):", (in_h, in_w, in_c))
                        print("[DEBUG] outputs:", len(output_info_list), "streams")

                    while True:
                        _frame_idx += 1
                        # Treated as BGR below (OpenCV drawing, BGR2RGB).
                        # NOTE(review): confirm the channel order for "RGB888".
                        frame_bgr = picam2.capture_array()

                        # --- Preprocess -> Hailo ---
                        if FORCE_INPUT_RGB:
                            inp = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
                        else:
                            inp = frame_bgr  # resize below allocates a new array
                        rgb_resized = cv2.resize(inp, (in_w, in_h), interpolation=cv2.INTER_LINEAR)
                        input_data = np.ascontiguousarray(rgb_resized).ravel()[np.newaxis, :]

                        # --- Inference ---
                        input_stream.send(input_data)
                        outputs = [np.copy(out_stream.recv()[0]) for out_stream in output_stream_list]

                        if DEBUG_PRINT_SHAPES and _frame_idx == 1:
                            try:
                                for idx, o in enumerate(outputs):
                                    arr = np.array(o)
                                    print(f"[DEBUG] out[{idx}] shape:", arr.shape, "dtype:", arr.dtype)
                                    print(f"[DEBUG] out[{idx}] head:", arr.ravel()[:18])
                            except Exception as e:
                                print("[DEBUG] shape print error:", e)

                        # --- Postprocess (general) ---
                        dets = parse_hailo_output_general(
                            outputs, FRAME_W, FRAME_H,
                            conf_thres=CONF_THRES,
                            min_box_w=MIN_BOX_WIDTH,
                            enable_cls_filter=ENABLE_CLASS_FILTER,
                            selected_cls=SELECTED_CLASSES if ENABLE_CLASS_FILTER else None,
                        )

                        # --- Debug class histogram ---
                        if DEBUG_PRINT_EVERY_N and (_frame_idx % DEBUG_PRINT_EVERY_N == 0):
                            hist = Counter(d["cls"] for d in dets if d.get("cls") is not None)
                            if hist:
                                print(f"[DEBUG] class histogram @frame {_frame_idx}:", dict(hist))
                            else:
                                print(f"[DEBUG] no class ids parsed @frame {_frame_idx} "
                                      f"(parser may be on 5-vals or scores-only)")

                        tracks = track_objects(dets)

                        # Track IDs are never reused once a track is dropped, so
                        # zone history of vanished tracks can be discarded.
                        for stale in [oid for oid in last_zone if oid not in tracks]:
                            del last_zone[stale]

                        # --- Draw zones first ---
                        frame = frame_bgr.copy()
                        draw_zones(frame)

                        # --- Zone logic + draw ---
                        for det, oid, (cx, cy) in _pair_detections_with_tracks(dets, tracks):
                            x1, y1, x2, y2 = det["bbox"]
                            conf = det["conf"]
                            cls = det["cls"]

                            zone_id = point_in_any_zone((cx, cy))
                            prev_zone = last_zone.setdefault(oid, None)

                            if zone_id is not None and zone_id != prev_zone:
                                _handle_zone_transition(oid, prev_zone, zone_id, frame_bgr)
                                # Only updated on entering a zone, so the last zone
                                # survives frames spent between zones; otherwise
                                # non-adjacent transitions could never be seen.
                                last_zone[oid] = zone_id

                            color = (0, 255, 0) if zone_id else (180, 180, 180)
                            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                            cv2.circle(frame, (cx, cy), 4, color, -1)
                            name = str(cls) if cls is not None else "obj"
                            label = f"ID {oid} | {name} {conf:.2f}" + (f" | Z{zone_id}" if zone_id else "")
                            cv2.putText(frame, label, (x1 + 3, y1 - 6),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)

                        # --- HUD ---
                        hud_x, hud_y = 10, 20
                        draw_text_with_bg(frame, f"U-TURN: {u_turn_count}", (hud_x, hud_y))
                        draw_text_with_bg(frame, f"ENTER CITY: {enter_city_count}", (hud_x, hud_y + 28))
                        draw_text_with_bg(frame, f"EXIT CITY: {exit_city_count}", (hud_x, hud_y + 56))

                        # --- FPS (exponential moving average) ---
                        now = time.time()
                        inst = 1.0 / max(now - _prev_t, 1e-6)
                        _fps = _fps_alpha * inst + (1.0 - _fps_alpha) * _fps
                        _prev_t = now
                        draw_text_with_bg(frame, f"FPS: {_fps:.2f}", (FRAME_W - 160, 20))

                        # --- Show ---
                        if (frame.shape[1], frame.shape[0]) != tuple(DISPLAY_SIZE):
                            display_frame = cv2.resize(frame, DISPLAY_SIZE, interpolation=cv2.INTER_LINEAR)
                        else:
                            display_frame = frame  # already at display size
                        cv2.imshow(WINDOW_TITLE, display_frame)

                        key = cv2.waitKey(1) & 0xFF
                        if key == ord('q'):
                            break
                        if key == ord('f') and FOCUS_MODE == "auto":
                            try:
                                picam2.set_controls({"AfTrigger": controls.AfTriggerEnum.Start})
                            except Exception as e:
                                print(f"[FOCUS][WARN] autofocus trigger failed: {e}")
    finally:
        # Release the window and camera on normal exit and on errors/Ctrl-C.
        cv2.destroyAllWindows()
        picam2.stop()
        picam2.close()
# Run the tracker only when executed as a script, not when imported.
if __name__ == "__main__":
    main()