Sending data from a thread to another subprocess

import threading
import subprocess
import os
import signal
from mrr2 import run_radar

# Shutdown flag: polled by run_radar_pipeline() before each radar step and
# set to True by the __main__ block to make that loop exit.
stop_flag = False

def run_yolo_pipeline():
    """Launch the YOLO detection script in its own session.

    The command goes through ``/bin/bash`` because it first sources
    ``setup_env.sh`` and then runs ``detection_yr.py``.

    Returns:
        subprocess.Popen: Handle of the launched shell. The child is made a
        session (and therefore process-group) leader so that the whole
        pipeline can later be signalled as a group with ``os.killpg``.
    """
    return subprocess.Popen(
        "source setup_env.sh && python3 detection_yr.py --input usb --show-fps --frame-rate 30",
        shell=True,
        executable="/bin/bash",
        # Same effect as calling os.setsid() in the child, but without
        # preexec_fn, which the subprocess docs flag as unsafe when the
        # parent has threads -- the radar thread is started before this call.
        start_new_session=True,
    )

def run_radar_pipeline():
    """Call ``run_radar()`` repeatedly until the module-level ``stop_flag`` is truthy."""
    # The flag is only read here, so no ``global`` declaration is required;
    # it is re-checked before every radar step.
    while True:
        if stop_flag:
            break
        run_radar()

if __name__ == "__main__":
    # Radar polling runs in a background thread of this process; YOLO runs
    # as a separate child process.
    radar_thread = threading.Thread(target=run_radar_pipeline)
    radar_thread.start()

    yolo_proc = run_yolo_pipeline()

    try:
        # Block until the YOLO process exits on its own or the user hits Ctrl-C.
        yolo_proc.wait()
    except KeyboardInterrupt:
        print("Shutting down...")

    # Ask the radar loop to stop and wait for the thread to finish.
    stop_flag = True
    radar_thread.join()

    # Only signal the child if it is still running. After a normal exit the
    # child has already been reaped, so os.getpgid() on its PID would raise
    # and produce a misleading error message.
    if yolo_proc.poll() is None:
        try:
            os.killpg(os.getpgid(yolo_proc.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The process exited between poll() and the signal; nothing to do.
            pass
        except OSError as e:
            print("Error killing YOLO process:", e)
def update():
    """Read pending serial bytes, decode a radar frame and publish its points.

    Bytes waiting on ``ser`` are appended to the module-level ``buffer``.
    If ``magic_word`` is found in the buffer, the buffer is decoded with
    ``MRR_session`` and every data entry that has both ``x`` and ``y`` is
    converted to a point dict with keys ``timestamp``, ``x``, ``y``, ``z``,
    ``xd`` and ``yd``.

    On a successful decode the buffer is cleared and the points are stored in
    the module-level ``radar_points``. If decoding raises, the buffer is kept
    so that more bytes can be appended on the next call, and ``radar_points``
    is left untouched.

    Returns:
        list[dict]: Points decoded during this call; empty when no data was
        waiting, no magic word was found, or decoding failed.
    """
    global buffer, radar_points
    points = []

    if not ser.in_waiting:
        return points

    buffer += ser.read(ser.in_waiting)
    ptr = buffer.find(magic_word)
    if ptr == -1:
        return points

    try:
        session = MRR_session(buffer, ptr)
        messages = session.get_dict()
        print(messages)
        for msg in messages['messages']:
            header = msg.get("header", {})
            if header.get("numTLVs", 0) <= 0:
                continue
            for tlv in msg.get("body", []):
                data = tlv.get('body', {}).get('data', [])
                # One timestamp per TLV, shared by all of its entries.
                timestamp = time.time()
                for entry in data:
                    x = entry.get('x')
                    y = entry.get('y')
                    if x is None or y is None:
                        continue
                    points.append({
                        "timestamp": timestamp,
                        # Raw coordinates are divided by 2**7
                        # (presumably a fixed-point scale -- confirm
                        # against the radar's output format).
                        "x": x / (2 ** 7),
                        "y": y / (2 ** 7),
                        "z": 1.0,
                        "xd": entry.get('xd'),
                        "yd": entry.get('yd'),
                    })
    except Exception as e:
        # The exception types raised by MRR_session are not visible here, so
        # the broad catch is kept. A partially decoded frame is not published.
        print("Incomplete or corrupt message:", e)
        return []
    else:
        buffer = b""

    # Previously the decoded points were built and then discarded; make the
    # latest complete frame available to the rest of the program.
    radar_points = points
    return points

def run_radar():
    """Perform one radar polling step by delegating to ``update()``.

    Returns whatever ``update()`` returns.
    """
    return update()
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

import hailo
from hailo_apps.hailo_app_python.core.common.buffer_utils import get_caps_from_pad
from hailo_apps.hailo_app_python.core.gstreamer.gstreamer_app import app_callback_class
from hailo_apps.hailo_app_python.apps.detection.detection_pipeline import GStreamerDetectionApp

class user_app_callback_class(app_callback_class):
    """Callback state object handed to the detection app and to ``app_callback``.

    Adds no attributes or methods of its own on top of ``app_callback_class``.
    """
    def __init__(self) -> None:
        # Only runs the base-class initializer.
        super().__init__()

def app_callback(pad, info, user_data):
    """Pad-probe callback: count the buffer and fetch its Hailo detections.

    Args:
        pad: Pad the probe is attached to; passed to ``get_caps_from_pad``.
        info: Probe info object; the buffer is taken from ``info.get_buffer()``.
        user_data: Callback state object. ``increment()`` is called on it and
            its ``use_frame`` attribute is set to ``True``.

    Returns:
        ``Gst.PadProbeReturn.OK`` in every case, including when the probe
        carries no buffer.
    """
    gst_buffer = info.get_buffer()
    if gst_buffer is None:
        return Gst.PadProbeReturn.OK

    user_data.increment()

    # Named ``frame_format`` rather than ``format`` so the builtin is not shadowed.
    frame_format, width, height = get_caps_from_pad(pad)

    # Never populated in the visible code; kept for the elided processing below.
    frame = None
    user_data.use_frame = True

    roi = hailo.get_roi_from_buffer(gst_buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    for detection in detections:
        # some processing
        # Placeholder statement: a loop body containing only a comment is a
        # syntax error, so the per-detection logic goes here.
        pass

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Create the callback state and give it, together with the probe
    # callback, to the detection app.
    user_data = user_app_callback_class()
    app = GStreamerDetectionApp(app_callback, user_data)

    # Run until the app returns, the user interrupts, or an error escapes.
    try:
        app.run()
    except KeyboardInterrupt:
        # NOTE(review): the message mentions saving detections, but no saving
        # code is visible in this block.
        print("Interrupted by user. Saving detections...")
    except Exception as exc:
        print("Unexpected error:", exc)

I’m working on a sensor fusion project where the radar outputs target positions and speeds at 15 FPS, and the camera runs YOLO object detection at 30 FPS. I’ve managed to run both in parallel, with the radar running in a thread and YOLO running as a separate subprocess, and have also saved the results separately as arrays. Above are the threading script, the radar script, and the YOLO script, from top to bottom.

My goal is to achieve real-time fusion by taking the most recent radar points from the update() function and passing them to the YOLO subprocess for fusion processing. The update() function reads radar data from the serial port, decodes it, and outputs a timestamped list of scaled positions and velocities.

Is this possible? What would be a robust method to share this latest radar data with the YOLO subprocess?

1 Like

Hey @Fong_Zi_Hao,

This sounds like a really cool project you’re working on for the challenge! I’ve been thinking about your setup, and here’s how I’d approach combining your radar data with YOLO detection:

Start with a shared data pipeline: Set up a communication channel between your radar system and YOLO using Python’s multiprocessing tools. Think of it like a conveyor belt that can hold up to 5 pieces of data at once - this prevents your system from getting overwhelmed with old information.

Update your radar processing: Modify how your radar collects data so it timestamps each reading and puts it on that conveyor belt. When the belt gets full, the oldest data automatically gets pushed off, so your YOLO system always works with fresh radar information.

Connect YOLO to the data stream: Expand your existing YOLO callback system to grab the latest radar data from that shared conveyor belt. Make it non-blocking so it doesn’t slow things down - just grab whatever’s newest and keep moving.

Bring it all together: In your main processing function, combine the radar readings with what YOLO is seeing, but only when you have good data from both sensors.

Hope this helps! If you need more info about any of these steps, or help with the coding, I’ll be happy to help.