Hi,
I want to use my own capture pipeline for my cameras (two Intel RealSense D435) to get depth measurements. The capture is done with pyrealsense2, the Python wrapper for the RealSense SDK, which provides the streaming pipeline for depth and color frames.
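For context, reading depth with pyrealsense2 looks roughly like this (standard pyrealsense2 calls, simplified here to a single default camera):

```python
import pyrealsense2 as rs

# Minimal depth read: one camera, default device
pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)
pipeline.start(config)
try:
    frames = pipeline.wait_for_frames()
    depth_frame = frames.get_depth_frame()
    if depth_frame:
        # Distance in meters at the image center
        print(depth_frame.get_distance(320, 240))
finally:
    pipeline.stop()
```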
I tried the code below (and several variants), but it fails at runtime with the following error:
```
    for detection in result.predictions:  # Retrieve detected objects
                     ^^^^^^^^^^^^^^^^^^
AttributeError: 'DetectionResults' object has no attribute 'predictions'
```
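A quick way to see what the result object actually exposes (plain Python introspection, no DeGirum-specific assumptions) is:

```python
# List the public attributes of the inference result object
result = models[0].predict(frame)
print(type(result))
print([attr for attr in dir(result) if not attr.startswith('_')])
```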
Here is my code:
```python
import degirum as dg
import degirum_tools
import degirum_tools.streams as dgstreams
import pyrealsense2 as rs
import numpy as np
import cv2

# Serial numbers of the D435 cameras
serial_numbers = ['317422075525', '335622071790']

inference_host_address = "@local"
zoo_url = 'degirum/hailo'
token = ''
device_type = ['HAILORT/HAILO8']

# AI model configurations
configurations = [
    {
        "model_name": "yolov8n_relu6_coco_pose--640x640_quant_hailort_hailo8_1",
        "display_name": "Traffic Camera",
    },
    {
        "model_name": "yolov8n_relu6_face--640x640_quant_hailort_hailo8_1",
        "display_name": "Webcam Feed",
    },
]

# Wrapper class for RealSense with a cv2.VideoCapture-compatible interface
class RealSenseWrapper:
    def __init__(self, serial):
        self.pipeline = rs.pipeline()
        config = rs.config()
        config.enable_device(serial)
        config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 30)
        self.pipeline.start(config)

    def read(self):
        frames = self.pipeline.wait_for_frames()
        color_frame = frames.get_color_frame()
        if not color_frame:
            return False, None
        img = np.asanyarray(color_frame.get_data())
        return True, img  # Simulates cv2.VideoCapture().read()

    def release(self):
        self.pipeline.stop()

# Load models
models = [
    dg.load_model(
        model_name=cfg["model_name"],
        inference_host_address=inference_host_address,
        zoo_url=zoo_url,
        token=token,
        device_type=device_type,
    )
    for cfg in configurations
]

# Initialize RealSense cameras
cameras = [RealSenseWrapper(serial) for serial in serial_numbers]

# Manual capture loop (instead of dgstreams.VideoSourceGizmo)
while True:
    for i, camera in enumerate(cameras):
        ret, frame = camera.read()
        if not ret:
            print(f"Error: No image for camera {serial_numbers[i]}")
            continue

        # Apply inference model
        result = models[i].predict(frame)

        # Draw detections on the image
        if result:  # Check if result contains detections
            print(result._inference_results)
            for detection in result.predictions:  # Retrieve detected objects
                x1, y1, x2, y2 = map(int, detection.bbox)  # Convert to integers
                label = detection.label
                conf = detection.confidence

                # Draw the bounding box on the image
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                # Add the label and confidence score
                cv2.putText(frame, f"{label} {conf:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

        frame_result = frame  # Now frame_result contains the annotated image

        # Display results
        cv2.imshow(f"Camera {i+1}", frame_result)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# Release resources
for camera in cameras:
    camera.release()
cv2.destroyAllWindows()
```
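Since `print(result._inference_results)` shows a plain list of dicts, I suspect the public accessor is not `predictions`. This is the kind of loop I think it should be, as a drop-in replacement for the detection loop above; the attribute name `results` and the dict keys `bbox`/`label`/`score` are my guesses from that printout:

```python
# Guessed from the printed _inference_results: one dict per detection
for detection in result.results:
    x1, y1, x2, y2 = map(int, detection["bbox"])
    label = detection.get("label", "")
    conf = detection.get("score", 0.0)
    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
    cv2.putText(frame, f"{label} {conf:.2f}", (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
```

(There may also be an `image_overlay` property that returns an already-annotated image, but I am not sure.)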
To be able to use my own capture pipeline, I had to delete the following code from your example:
```python
# define gizmos
sources = [dgstreams.VideoSourceGizmo(cfg["source"]) for cfg in configurations]
detectors = [dgstreams.AiSimpleGizmo(model) for model in models]
display = dgstreams.VideoDisplayGizmo(
    [cfg["display_name"] for cfg in configurations], show_ai_overlay=True, show_fps=True
)

# create pipeline
pipeline = (
    (source >> detector for source, detector in zip(sources, detectors)),
    (detector >> display[di] for di, detector in enumerate(detectors)),
)

# start composition
dgstreams.Composition(*pipeline).start()
```
Do you have any clue?