Hi,
I am trying to use hailopython to retrieve detection data from my custom HEF model. My postprocess.py runs once, and then the stream throws a UnicodeDecodeError.
However, if I use hailofilter and the built-in postprocess .so files, the script works perfectly fine.
I have quite limited knowledge of Hailo and GStreamer, so any help is appreciated.
Here is my rtsp.py:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import sys
import hailo
import threading
class HailoRTSPDetection:
    """Run a Hailo network on an RTSP stream through a GStreamer pipeline.

    The constructor reads ``TAPPAS_POST_PROC_DIR`` from the environment,
    resolves the model / post-process paths and builds the pipeline.
    Call :meth:`run` (blocking) or :meth:`start_pipeline_thread` to start it.

    Raises:
        EnvironmentError: if ``TAPPAS_POST_PROC_DIR`` is unset or empty.
    """

    def __init__(self):
        self.rtsp_url = "***"  # RTSP URL
        self.network = "yolo8s"
        self.batch_size = 4
        self.network_width = 640
        self.network_height = 640
        self.network_format = "RGB"
        self.current_path = os.path.dirname(os.path.abspath(__file__))
        self.postprocess_dir = os.environ.get('TAPPAS_POST_PROC_DIR', '')
        if not self.postprocess_dir:
            raise EnvironmentError("TAPPAS_POST_PROC_DIR environment variable is not set.")
        self.source_type = "rtsp"

        # Declared up front so stop() never hits an AttributeError when it is
        # called before run() has created the main loop.
        self.pipeline = None
        self.bus = None
        self.loop = None
        self.pipeline_thread = None

        self.setup_network()
        self.create_pipeline()

    def setup_network(self):
        """Resolve the post-process library, HEF and Python module paths."""
        local_so = os.path.join(self.current_path, '../resources/libyolo_hailortpp_post.so')
        if os.path.exists(local_so):
            self.default_postprocess_so = local_so
        else:
            self.default_postprocess_so = os.path.join(
                self.postprocess_dir, 'libyolo_hailortpp_post.so'
            )

        # These are absolute paths; joining them onto current_path (as the
        # original did) discards current_path on POSIX, so assign them directly.
        self.hef_path = '/home/pi/hailo-rpi5-examples/g40.hef'
        self.py_postprocess_path = '/home/pi/ken_repo/mp-rpi/postprocess.py'

        self.thresholds_str = (
            "nms-score-threshold=0.3 "
            "nms-iou-threshold=0.45 "
            "output-format-type=HAILO_FORMAT_TYPE_FLOAT32"
        )

    def create_pipeline(self):
        """Build the GStreamer pipeline and attach the bus message handler.

        Exits the process with status 1 if the pipeline description cannot
        be parsed.
        """
        Gst.init(None)
        pipeline_description = (
            f"rtspsrc location={self.rtsp_url} latency=0 ! "
            "queue max-size-buffers=1 ! "
            "rtph265depay ! h265parse ! avdec_h265 ! videoconvert ! videoscale ! "
            f"video/x-raw, format={self.network_format}, width={self.network_width}, height={self.network_height} ! "
            f"hailonet hef-path={self.hef_path} batch-size={self.batch_size} {self.thresholds_str} ! "
            f"hailopython module={self.py_postprocess_path} ! "
            "queue ! hailooverlay ! "
            "videoconvert ! xvimagesink sync=false name=sink"
        )
        try:
            self.pipeline = Gst.parse_launch(pipeline_description)
        except GLib.Error as e:
            print(f"Error creating pipeline: {e}")
            sys.exit(1)
        print("Pipeline created successfully.")

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self.on_bus_message)

    def on_bus_message(self, bus, message):
        """Handle EOS, ERROR, WARNING and STATE_CHANGED bus messages.

        EOS and ERROR stop the pipeline; WARNING and pipeline-level state
        changes are only printed.
        """
        t = message.type
        if t == Gst.MessageType.EOS:
            print("End-of-stream")
            self.stop()
        elif t == Gst.MessageType.ERROR:
            print(f'message: {message}')
            err, debug = message.parse_error()
            print(f"Error: {err.message}")
            print(f"Debug info: {debug}")
            self.stop()
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            print(f"Warning: {err.message}")
            print(f"Debug info: {debug}")
        elif t == Gst.MessageType.STATE_CHANGED:
            # Only report state changes of the pipeline itself, not of
            # every child element.
            if message.src == self.pipeline:
                old_state, new_state, pending_state = message.parse_state_changed()
                print(f"Pipeline state changed from {Gst.Element.state_get_name(old_state)} to {Gst.Element.state_get_name(new_state)}")

    def run(self):
        """Set the pipeline to PLAYING and block in a GLib main loop.

        Exits with status 1 if the pipeline refuses to go to PLAYING.
        The pipeline is always stopped when the loop ends.
        """
        print("Starting Hailo RTSP Detection...")
        # Create the loop before starting the pipeline so stop() always has a
        # loop object to quit.
        self.loop = GLib.MainLoop()
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("Unable to set the pipeline to the playing state")
            sys.exit(1)
        try:
            self.loop.run()
        except KeyboardInterrupt:
            pass
        finally:
            self.stop()

    def stop(self):
        """Set the pipeline to NULL and quit the main loop, if they exist."""
        print("Stopping pipeline...")
        if self.pipeline is not None:
            self.pipeline.set_state(Gst.State.NULL)
        if self.loop is not None:
            self.loop.quit()

    def start_pipeline_thread(self):
        """Run :meth:`run` in a new thread stored as ``self.pipeline_thread``."""
        self.pipeline_thread = threading.Thread(target=self.run)
        self.pipeline_thread.start()
def main():
    """Build the detection pipeline and start it on its worker thread."""
    HailoRTSPDetection().start_pipeline_thread()


if __name__ == "__main__":
    main()
postprocess.py:
import hailo
from gsthailo import VideoFrame
from gi.repository import Gst
def run(video_frame: VideoFrame):
    """hailopython callback: print the detections attached to the frame's ROI.

    Args:
        video_frame: frame wrapper passed in by the hailopython element.

    Returns:
        ``Gst.FlowReturn.OK``. The TAPPAS hailopython examples return a
        ``Gst.FlowReturn`` from the callback; the original returned ``None``.
    """
    detections = hailo.get_hailo_detections(video_frame.roi)
    print(f'hailo.get_hailo_detections(video_frame.roi): {detections}')
    return Gst.FlowReturn.OK
Output of rtsp.py with GST_DEBUG=2: