Hi everyone,
I’m working on a project that involves using a CCTV camera with the AI Kit Pi 5. I’ve managed to get my camera’s RTSP stream working, so I can view it remotely. However, I’m not sure how to best integrate this into my AI Kit projects.
Hi everyone,
I’m working on a project that involves using a CCTV camera with the AI Kit Pi 5. I’ve managed to get my camera’s RTSP stream working, so I can view it remotely. However, I’m not sure how to best integrate this into my AI Kit projects.
Hey @abhi83166
This is a good place to start.
Thanks,…Made it worked by adding
if self.source_type == “rtsp”:
source_element = (
f"rtspsrc location={self.video_source} name=src_0 ! "
"rtph264depay ! h264parse ! avdec_h264 max-threads=2 ! "
"video/x-raw, format=I420 ! "
To those whom it may help, the program below is just for RTSP
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import sys
import hailo
class HailoRTSPDetection:
def _init_(self):
self.rtsp_url = "rtsp://username:passwrd@192.168.1.**:554/Streaming/Channels/1"
self.network = "yolov6n"
self.batch_size = 1
self.network_width = 640
self.network_height = 640
self.network_format = "RGB"
self.current_path = os.path.dirname(os.path.abspath(_file_))
self.postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
if not self.postprocess_dir:
raise EnvironmentError("TAPPAS_POST_PROC_DIR environment variable is not set.")
self.setup_network()
self.create_pipeline()
def setup_network(self):
new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
self.default_postprocess_so = new_postprocess_path if os.path.exists(new_postprocess_path) else os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
self.thresholds_str = (
f"nms-score-threshold=0.3 "
f"nms-iou-threshold=0.45 "
f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
)
def create_pipeline(self):
Gst.init(None)
pipeline_str = (
f"rtspsrc location={self.rtsp_url} name=src_0 ! "
"rtph264depay ! h264parse ! "
"decodebin ! "
"videoscale ! videoconvert ! "
f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! "
f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} ! "
f"hailofilter so-path={self.default_postprocess_so} qos=false ! "
"queue ! hailooverlay ! "
"videoconvert ! fpsdisplaysink video-sink=xvimagesink sync=false name=sink"
)
try:
self.pipeline = Gst.parse_launch(pipeline_str)
except GLib.Error as e:
print(f"Error creating pipeline: {e}")
sys.exit(1)
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.on_bus_message)
# Add probe to hailofilter src pad
hailofilter = self.pipeline.get_by_name("hailofilter0")
if hailofilter:
pad = hailofilter.get_static_pad("src")
pad.add_probe(Gst.PadProbeType.BUFFER, self.on_hailofilter_src_probe)
else:
print("Warning: hailofilter element not found in the pipeline")
def on_hailofilter_src_probe(self, pad, info):
buffer = info.get_buffer()
if buffer:
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
print(f"Number of detections: {len(detections)}")
for detection in detections:
label = detection.get_label()
confidence = detection.get_confidence()
bbox = detection.get_bbox()
print(f"Detection: {label}, Confidence: {confidence:.2f}, BBox: {bbox}")
return Gst.PadProbeReturn.OK
def on_bus_message(self, bus, message):
t = message.type
if t == Gst.MessageType.EOS:
print("End-of-stream")
self.stop()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error: {err.message}")
print(f"Debug info: {debug}")
self.stop()
elif t == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
print(f"Warning: {err.message}")
print(f"Debug info: {debug}")
elif t == Gst.MessageType.STATE_CHANGED:
if message.src == self.pipeline:
old_state, new_state, pending_state = message.parse_state_changed()
print(f"Pipeline state changed from {Gst.Element.state_get_name(old_state)} to {Gst.Element.state_get_name(new_state)}")
def run(self):
print("Starting Hailo RTSP Detection...")
print(f"RTSP URL: {self.rtsp_url}")
print(f"Network: {self.network}")
print(f"HEF Path: {self.hef_path}")
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Unable to set the pipeline to the playing state")
sys.exit(1)
self.loop = GLib.MainLoop()
try:
self.loop.run()
except KeyboardInterrupt:
pass
finally:
self.stop()
def stop(self):
print("Stopping pipeline...")
self.pipeline.set_state(Gst.State.NULL)
self.loop.quit()
def main():
detection = HailoRTSPDetection()
detection.run()
if _name_ == "_main_":
main()
Hey I have been trying to add rtsp url but I am getting errors.
import gi
gi.require_version(‘Gst’, ‘1.0’)
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import hailo
from hailo_rpi_common import (
get_default_parser,
QUEUE,
get_caps_from_pad,
get_numpy_from_buffer,
GStreamerApp,
app_callback_class,
)
class user_app_callback_class(app_callback_class):
def init(self):
super().init()
self.new_variable = 42 # New variable example
def new_function(self): # New function example
return "The meaning of life is: "
def app_callback(pad, info, user_data):
# Get the GstBuffer from the probe info
buffer = info.get_buffer()
# Check if the buffer is valid
if buffer is None:
return Gst.PadProbeReturn.OK
# Using the user_data to count the number of frames
user_data.increment()
string_to_print = f"Frame count: {user_data.get_count()}\n"
# Get the caps from the pad
format, width, height = get_caps_from_pad(pad)
# If the user_data.use_frame is set to True, we can get the video frame from the buffer
frame = None
if user_data.use_frame and format is not None and width is not None and height is not None:
# Get video frame
frame = get_numpy_from_buffer(buffer, format, width, height)
# Get the detections from the buffer
roi = hailo.get_roi_from_buffer(buffer)
detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
# Parse the detections
detection_count = 0
for detection in detections:
label = detection.get_label()
bbox = detection.get_bbox()
confidence = detection.get_confidence()
if label == "person":
string_to_print += f"Detection: {label} {confidence:.2f}\n"
detection_count += 1
if user_data.use_frame:
# Note: using imshow will not work here, as the callback function is not running in the main thread
# Let's print the detection count to the frame
cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# Example of how to use the new_variable and new_function from the user_data
# Let's print the new_variable and the result of the new_function to the frame
cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# Convert the frame to BGR
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
user_data.set_frame(frame)
print(string_to_print)
return Gst.PadProbeReturn.OK
class GStreamerDetectionApp(GStreamerApp):
def init(self, args, user_data):
# Call the parent class constructor
super().init(args, user_data)
# Additional initialization code can be added here
# Set Hailo parameters these parameters should be set based on the model used
self.rtsp_url = “rtsp://admin3:Auminfotech7893@192.168.100.117:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif”
self.batch_size = 2
self.network_width = 640
self.network_height = 640
self.network_format = “RGB”
nms_score_threshold = 0.3
nms_iou_threshold = 0.45
# Temporary code: new postprocess will be merged to TAPPAS.
# Check if new postprocess so file exists
new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
if os.path.exists(new_postprocess_path):
self.default_postprocess_so = new_postprocess_path
else:
self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')
if args.hef_path is not None:
self.hef_path = args.hef_path
# Set the HEF file path based on the network
elif args.network == "yolov6n":
self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
elif args.network == "yolov8s":
self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
elif args.network == "yolox_s_leaky":
self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
else:
assert False, "Invalid network type"
# User-defined label JSON file
if args.labels_json is not None:
self.labels_config = f' config-path={args.labels_json} '
# Temporary code
if not os.path.exists(new_postprocess_path):
print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.")
exit(1)
else:
self.labels_config = ''
self.app_callback = app_callback
self.thresholds_str = (
f"nms-score-threshold={nms_score_threshold} "
f"nms-iou-threshold={nms_iou_threshold} "
f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
)
# Set the process title
setproctitle.setproctitle("Hailo Detection App")
self.create_pipeline()
def get_pipeline_string(self):
if self.source_type == "rpi":
source_element = (
"libcamerasrc name=src_0 auto-focus-mode=2 ! "
f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
+ QUEUE("queue_src_scale")
+ "videoscale ! "
f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
)
if self.source_type == "rtsp":
source_element = (
f"rtspsrc location={self.video_source} name=src_0 ! "
"rtph264depay ! h264parse ! avdec_h264 max-threads=2 ! "
"video/x-raw, format=I420 ! "
)
elif self.source_type == "usb":
source_element = (
f"v4l2src device={self.video_source} name=src_0 ! "
"video/x-raw, width=640, height=480, framerate=30/1 ! "
)
else:
source_element = (
f"filesrc location={self.video_source} name=src_0 ! "
+ QUEUE("queue_dec264")
+ " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
" video/x-raw, format=I420 ! "
)
source_element += QUEUE("queue_scale")
source_element += "videoscale n-threads=2 ! "
source_element += QUEUE("queue_src_convert")
source_element += "videoconvert n-threads=3 name=src_convert qos=false ! "
source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "
pipeline_string = (
"hailomuxer name=hmux "
+ source_element
+ "tee name=t ! "
+ QUEUE("bypass_queue", max_size_buffers=20)
+ "hmux.sink_0 "
+ "t. ! "
+ QUEUE("queue_hailonet")
+ "videoconvert n-threads=3 ! "
f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
+ QUEUE("queue_hailofilter")
+ f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! "
+ QUEUE("queue_hmuc")
+ "hmux.sink_1 "
+ "hmux. ! "
+ QUEUE("queue_hailo_python")
+ QUEUE("queue_user_callback")
+ "identity name=identity_callback ! "
+ QUEUE("queue_hailooverlay")
+ "hailooverlay ! "
+ QUEUE("queue_videoconvert")
+ "videoconvert n-threads=3 qos=false ! "
+ QUEUE("queue_hailo_display")
+ f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
)
print(pipeline_string)
return pipeline_string
if name == “main”:
# Create an instance of the user app callback class
user_data = user_app_callback_class()
parser = get_default_parser()
# Add additional arguments here
parser.add_argument(
“–network”,
default=“yolov6n”,
choices=[‘yolov6n’, ‘yolov8s’, ‘yolox_s_leaky’],
help=“Which Network to use, default is yolov6n”,
)
parser.add_argument(
“–hef-path”,
default=None,
help=“Path to HEF file”,
)
parser.add_argument(
“–labels-json”,
default=None,
help=“Path to costume labels JSON file”,
)
args = parser.parse_args()
app = GStreamerDetectionApp(args, user_data)
app.run()
Error: gst-resource-error-quark: Cannot identify device ‘/dev/video0’. (3), …/sys/v4l2/v4l2_calls.c(608): gst_v4l2_open (): /GstPipeline:pipeline0/GstV4l2Src:src_0:
system error: No such file or directory
Do you know what maybe going wrong?