Using CCTV Camera with AI Kit Pi 5 - Input and RTSP Streaming

Hi everyone,

I’m working on a project that involves using a CCTV camera with the AI Kit on the Pi 5. I’ve managed to get my camera’s RTSP stream working, so I can view it remotely. However, I’m not sure how best to integrate this into my AI Kit projects.

Hey @abhi83166

This is a good place to start.
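Before wiring the camera into a Hailo pipeline, it is worth confirming that the RTSP stream decodes on its own. Below is a minimal sketch, not taken from the Hailo examples: it assumes an H.264 stream, a placeholder URL, and a desktop session for autovideosink.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)
# Plain RTSP playback, no Hailo elements yet; replace the placeholder URL.
rtsp_url = "rtsp://username:password@<camera-ip>:554/stream"
pipeline = Gst.parse_launch(
    f"rtspsrc location={rtsp_url} latency=200 ! "
    "rtph264depay ! h264parse ! avdec_h264 ! "
    "videoconvert ! autovideosink sync=false"
)
pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    pass
finally:
    pipeline.set_state(Gst.State.NULL)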


Thanks… I made it work by adding:

if self.source_type == "rtsp":
    source_element = (
        f"rtspsrc location={self.video_source} name=src_0 ! "
        "rtph264depay ! h264parse ! avdec_h264 max-threads=2 ! "
        "video/x-raw, format=I420 ! "
    )

For anyone it may help, the program below is for RTSP only:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import sys
import hailo

class HailoRTSPDetection:
    def __init__(self):
        self.rtsp_url = "rtsp://username:password@<camera-ip>:554/Streaming/Channels/1"  # placeholder: use your camera's RTSP URL
        self.network = "yolov6n"
        self.batch_size = 1
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if not self.postprocess_dir:
            raise EnvironmentError("TAPPAS_POST_PROC_DIR environment variable is not set.")
        
        self.setup_network()
        self.create_pipeline()

    def setup_network(self):
        new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
        self.default_postprocess_so = new_postprocess_path if os.path.exists(new_postprocess_path) else os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')

        self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')

        self.thresholds_str = (
            f"nms-score-threshold=0.3 "
            f"nms-iou-threshold=0.45 "
            f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
        )

    def create_pipeline(self):
        Gst.init(None)
        pipeline_str = (
            f"rtspsrc location={self.rtsp_url} name=src_0 ! "
            "rtph264depay ! h264parse ! "
            "decodebin ! "
            "videoscale ! videoconvert ! "
            f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! "
            f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} ! "
            f"hailofilter so-path={self.default_postprocess_so} qos=false ! "
            "queue ! hailooverlay ! "
            "videoconvert ! fpsdisplaysink video-sink=xvimagesink sync=false name=sink"
        )
        try:
            self.pipeline = Gst.parse_launch(pipeline_str)
        except GLib.Error as e:
            print(f"Error creating pipeline: {e}")
            sys.exit(1)

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self.on_bus_message)

        # Add probe to hailofilter src pad
        hailofilter = self.pipeline.get_by_name("hailofilter0")
        if hailofilter:
            pad = hailofilter.get_static_pad("src")
            pad.add_probe(Gst.PadProbeType.BUFFER, self.on_hailofilter_src_probe)
        else:
            print("Warning: hailofilter element not found in the pipeline")

    def on_hailofilter_src_probe(self, pad, info):
        buffer = info.get_buffer()
        if buffer:
            roi = hailo.get_roi_from_buffer(buffer)
            detections = roi.get_objects_typed(hailo.HAILO_DETECTION)
            print(f"Number of detections: {len(detections)}")
            for detection in detections:
                label = detection.get_label()
                confidence = detection.get_confidence()
                bbox = detection.get_bbox()
                print(f"Detection: {label}, Confidence: {confidence:.2f}, BBox: {bbox}")
        return Gst.PadProbeReturn.OK

    def on_bus_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            self.stop()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"Error: {err.message}")
            print(f"Debug info: {debug}")
            self.stop()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            print(f"Warning: {err.message}")
            print(f"Debug info: {debug}")
        elif t == Gst.MessageType.STATE_CHANGED:
            if message.src == self.pipeline:
                old_state, new_state, pending_state = message.parse_state_changed()
                print(f"Pipeline state changed from {Gst.Element.state_get_name(old_state)} to {Gst.Element.state_get_name(new_state)}")

    def run(self):
        print("Starting Hailo RTSP Detection...")
        print(f"RTSP URL: {self.rtsp_url}")
        print(f"Network: {self.network}")
        print(f"HEF Path: {self.hef_path}")
        
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Unable to set the pipeline to the playing state")
            sys.exit(1)

        self.loop = GLib.MainLoop()
        try:
            self.loop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()

    def stop(self):
        print("Stopping pipeline...")
        self.pipeline.set_state(Gst.State.NULL)
        self.loop.quit()

def main():
    detection = HailoRTSPDetection()
    detection.run()

if __name__ == "__main__":
    main()
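If the stream stalls or lags, tuning the rtspsrc element is a reasonable first step before touching the rest of the pipeline. Below is a small variation of the source portion of the pipeline string above; the latency value (in milliseconds) and the TCP transport are example settings, not something the original program uses.

# Example tweak to the start of the pipeline string built in create_pipeline():
rtsp_url = "rtsp://username:password@<camera-ip>:554/Streaming/Channels/1"  # placeholder
source_part = (
    f"rtspsrc location={rtsp_url} latency=200 protocols=tcp name=src_0 ! "
    "rtph264depay ! h264parse ! decodebin ! "
)
# ...the remainder of the pipeline (videoscale, hailonet, etc.) stays as above.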

Hey, I have been trying to add an RTSP URL, but I am getting errors.

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import argparse
import multiprocessing
import numpy as np
import setproctitle
import cv2
import time
import hailo
from hailo_rpi_common import (
    get_default_parser,
    QUEUE,
    get_caps_from_pad,
    get_numpy_from_buffer,
    GStreamerApp,
    app_callback_class,
)

# -----------------------------------------------------------------------------------------------
# User-defined class to be used in the callback function
# -----------------------------------------------------------------------------------------------
# Inheritance from the app_callback_class
class user_app_callback_class(app_callback_class):
    def __init__(self):
        super().__init__()
        self.new_variable = 42  # New variable example

    def new_function(self):  # New function example
        return "The meaning of life is: "

# -----------------------------------------------------------------------------------------------
# User-defined callback function
# -----------------------------------------------------------------------------------------------
# This is the callback function that will be called when data is available from the pipeline
def app_callback(pad, info, user_data):
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    # Check if the buffer is valid
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Using the user_data to count the number of frames
    user_data.increment()
    string_to_print = f"Frame count: {user_data.get_count()}\n"

    # Get the caps from the pad
    format, width, height = get_caps_from_pad(pad)

    # If the user_data.use_frame is set to True, we can get the video frame from the buffer
    frame = None
    if user_data.use_frame and format is not None and width is not None and height is not None:
        # Get video frame
        frame = get_numpy_from_buffer(buffer, format, width, height)

    # Get the detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Parse the detections
    detection_count = 0
    for detection in detections:
        label = detection.get_label()
        bbox = detection.get_bbox()
        confidence = detection.get_confidence()
        if label == "person":
            string_to_print += f"Detection: {label} {confidence:.2f}\n"
            detection_count += 1
    if user_data.use_frame:
        # Note: using imshow will not work here, as the callback function is not running in the main thread
        # Let's print the detection count to the frame
        cv2.putText(frame, f"Detections: {detection_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # Example of how to use the new_variable and new_function from the user_data
        # Let's print the new_variable and the result of the new_function to the frame
        cv2.putText(frame, f"{user_data.new_function()} {user_data.new_variable}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        # Convert the frame to BGR
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        user_data.set_frame(frame)

    print(string_to_print)
    return Gst.PadProbeReturn.OK

# -----------------------------------------------------------------------------------------------
# User Gstreamer Application
# -----------------------------------------------------------------------------------------------
# This class inherits from the hailo_rpi_common.GStreamerApp class
class GStreamerDetectionApp(GStreamerApp):
    def __init__(self, args, user_data):
        # Call the parent class constructor
        super().__init__(args, user_data)
        # Additional initialization code can be added here
        # Set Hailo parameters; these parameters should be set based on the model used
        self.rtsp_url = "rtsp://admin3:password@<camera-ip>:554/cam/realmonitor?channel=1&subtype=0&unicast=true&proto=Onvif"  # placeholder: use your camera's RTSP URL
        self.batch_size = 2
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        nms_score_threshold = 0.3
        nms_iou_threshold = 0.45

        # Temporary code: new postprocess will be merged to TAPPAS.
        # Check if new postprocess so file exists
        new_postprocess_path = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
        if os.path.exists(new_postprocess_path):
            self.default_postprocess_so = new_postprocess_path
        else:
            self.default_postprocess_so = os.path.join(self.postprocess_dir, 'libyolo_hailortpp_post.so')

        if args.hef_path is not None:
            self.hef_path = args.hef_path
        # Set the HEF file path based on the network
        elif args.network == "yolov6n":
            self.hef_path = os.path.join(self.current_path, '../resources/yolov6n.hef')
        elif args.network == "yolov8s":
            self.hef_path = os.path.join(self.current_path, '../resources/yolov8s_h8l.hef')
        elif args.network == "yolox_s_leaky":
            self.hef_path = os.path.join(self.current_path, '../resources/yolox_s_leaky_h8l_mz.hef')
        else:
            assert False, "Invalid network type"

        # User-defined label JSON file
        if args.labels_json is not None:
            self.labels_config = f' config-path={args.labels_json} '
            # Temporary code
            if not os.path.exists(new_postprocess_path):
                print("New postprocess so file is missing. It is required to support custom labels. Check documentation for more information.")
                exit(1)
        else:
            self.labels_config = ''

        self.app_callback = app_callback

        self.thresholds_str = (
            f"nms-score-threshold={nms_score_threshold} "
            f"nms-iou-threshold={nms_iou_threshold} "
            f"output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
        )

        # Set the process title
        setproctitle.setproctitle("Hailo Detection App")

        self.create_pipeline()

    def get_pipeline_string(self):
        if self.source_type == "rpi":
            source_element = (
                "libcamerasrc name=src_0 auto-focus-mode=2 ! "
                f"video/x-raw, format={self.network_format}, width=1536, height=864 ! "
                + QUEUE("queue_src_scale")
                + "videoscale ! "
                f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, framerate=30/1 ! "
            )
        elif self.source_type == "rtsp":
            source_element = (
                f"rtspsrc location={self.video_source} name=src_0 ! "
                "rtph264depay ! h264parse ! avdec_h264 max-threads=2 ! "
                "video/x-raw, format=I420 ! "
            )
        elif self.source_type == "usb":
            source_element = (
                f"v4l2src device={self.video_source} name=src_0 ! "
                "video/x-raw, width=640, height=480, framerate=30/1 ! "
            )
        else:
            source_element = (
                f"filesrc location={self.video_source} name=src_0 ! "
                + QUEUE("queue_dec264")
                + " qtdemux ! h264parse ! avdec_h264 max-threads=2 ! "
                " video/x-raw, format=I420 ! "
            )
        source_element += QUEUE("queue_scale")
        source_element += "videoscale n-threads=2 ! "
        source_element += QUEUE("queue_src_convert")
        source_element += "videoconvert n-threads=3 name=src_convert qos=false ! "
        source_element += f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height}, pixel-aspect-ratio=1/1 ! "

        pipeline_string = (
            "hailomuxer name=hmux "
            + source_element
            + "tee name=t ! "
            + QUEUE("bypass_queue", max_size_buffers=20)
            + "hmux.sink_0 "
            + "t. ! "
            + QUEUE("queue_hailonet")
            + "videoconvert n-threads=3 ! "
            f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} force-writable=true ! "
            + QUEUE("queue_hailofilter")
            + f"hailofilter so-path={self.default_postprocess_so} {self.labels_config} qos=false ! "
            + QUEUE("queue_hmuc")
            + "hmux.sink_1 "
            + "hmux. ! "
            + QUEUE("queue_hailo_python")
            + QUEUE("queue_user_callback")
            + "identity name=identity_callback ! "
            + QUEUE("queue_hailooverlay")
            + "hailooverlay ! "
            + QUEUE("queue_videoconvert")
            + "videoconvert n-threads=3 qos=false ! "
            + QUEUE("queue_hailo_display")
            + f"fpsdisplaysink video-sink={self.video_sink} name=hailo_display sync={self.sync} text-overlay={self.options_menu.show_fps} signal-fps-measurements=true "
        )
        print(pipeline_string)
        return pipeline_string

if __name__ == "__main__":
    # Create an instance of the user app callback class
    user_data = user_app_callback_class()
    parser = get_default_parser()
    # Add additional arguments here
    parser.add_argument(
        "--network",
        default="yolov6n",
        choices=['yolov6n', 'yolov8s', 'yolox_s_leaky'],
        help="Which Network to use, default is yolov6n",
    )
    parser.add_argument(
        "--hef-path",
        default=None,
        help="Path to HEF file",
    )
    parser.add_argument(
        "--labels-json",
        default=None,
        help="Path to custom labels JSON file",
    )
    args = parser.parse_args()
    app = GStreamerDetectionApp(args, user_data)
    app.run()

Error: gst-resource-error-quark: Cannot identify device '/dev/video0'. (3), ../sys/v4l2/v4l2_calls.c(608): gst_v4l2_open (): /GstPipeline:pipeline0/GstV4l2Src:src_0:
system error: No such file or directory

Do you know what may be going wrong?
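One likely cause, going by the code above: the v4l2src in the error means get_pipeline_string() did not take the "rtsp" branch, because self.source_type was not "rtsp" and the pipeline reads self.video_source while the URL was stored in self.rtsp_url. Below is a sketch of one possible workaround, to be placed in __init__ before the self.create_pipeline() call; whether it fits depends on how your version of hailo_rpi_common sets those attributes.

# Hypothetical override: route the hard-coded URL through the rtsp branch.
self.video_source = self.rtsp_url
self.source_type = "rtsp"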

Is it real-time with FPS >= 30 when decoding frames and running yolov6n inference on them?
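On the FPS question: the pipeline string already ends with fpsdisplaysink and signal-fps-measurements=true, so one way to check is to log the measured rate and see whether it stays at or above 30. A short sketch, assuming the sink keeps the name hailo_display from the pipeline string and that pipeline refers to the constructed Gst pipeline:

# Print the rate reported by fpsdisplaysink (requires signal-fps-measurements=true).
def on_fps_measurement(sink, fps, droprate, avgfps):
    print(f"FPS: {fps:.1f}, dropped: {droprate:.1f}, average: {avgfps:.1f}")

display = pipeline.get_by_name("hailo_display")  # 'pipeline' is the Gst.Pipeline instance
if display is not None:
    display.connect("fps-measurements", on_fps_measurement)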