pose_estimation.py example & how to generate a text label over the bounding box from the results

Hello,
Newbie here
I’m trying to figure out how to use the pose_estimation.py example for an actual human fall prevention and fall detection application for my aging mom, so that when a fall happens it can send a notification via Telegram bot or email to me or a family member.

For example, when the wrist position is below the knee, this triggers a fall detection alarm and sends a fall notification; or when the head is tilted more than 35 degrees, this triggers a fall danger notification.

I’m assuming there has to be a text box or text label over the bounding box somewhere in the display output indicating a status of “fall” or “fall danger”, and this resulting text or label would then trigger sending the notification.

How can I accomplish this task with Hailo? Any help from the community will be appreciated.

Hey @Martin_Kim,

Welcome to the Hailo Community!

To implement fall detection, here are the recommended steps to add to your pose_estimation.py file:

Inside the app_callback function, you can implement the following features:

  1. Extract human body keypoints (wrist, knee, head, etc.)
  2. Set up fall detection rules using conditions such as:
    • Wrist position falling below knee level
    • Head tilt exceeding 35 degrees
  3. Add status indicators on the display (e.g., “FALL” or “FALL DANGER”)
  4. Configure alert notifications via your preferred method (Telegram, email, SMS, etc.)

You can integrate all of these features within the app_callback function, which processes incoming detection data.
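For illustration, here is a rough, untested sketch of what such a callback could look like. It assumes the structure of the basic_pipelines pose_estimation.py example (the COCO keypoint order, get_caps_from_pad / get_numpy_from_buffer, and the use_frame / set_frame helpers of app_callback_class); send_alert() is a hypothetical placeholder you would replace with your Telegram or email code, and the wrist-below-knee rule and drawing details are only examples to adapt:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
import cv2
import hailo
from hailo_apps_infra.hailo_rpi_common import get_caps_from_pad, get_numpy_from_buffer

# COCO keypoint indices used by the pose estimation example
KP = {'left_wrist': 9, 'right_wrist': 10, 'left_knee': 13, 'right_knee': 14}

def send_alert(status):
    # Hypothetical placeholder: replace with your Telegram / email / SMS code
    print(f"ALERT: {status}")

def fall_suspected(points):
    # Rough example rule: both wrists lower in the image than both knees
    # (y grows downward, so "below" means a larger y value)
    knee_y = max(points[KP['left_knee']].y(), points[KP['right_knee']].y())
    return (points[KP['left_wrist']].y() > knee_y and
            points[KP['right_wrist']].y() > knee_y)

def app_callback(pad, info, user_data):
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    format, width, height = get_caps_from_pad(pad)
    frame = None
    if user_data.use_frame and format is not None:
        frame = get_numpy_from_buffer(buffer, format, width, height)

    roi = hailo.get_roi_from_buffer(buffer)
    for detection in roi.get_objects_typed(hailo.HAILO_DETECTION):
        if detection.get_label() != "person":
            continue
        landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
        if not landmarks:
            continue
        points = landmarks[0].get_points()
        if fall_suspected(points):
            if frame is not None:
                # Draw the status text just above the person's bounding box
                bbox = detection.get_bbox()
                x = int(bbox.xmin() * width)
                y = int(bbox.ymin() * height)
                cv2.putText(frame, "FALL", (x, max(20, y - 10)),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 0, 255), 2)
            send_alert("FALL")

    if frame is not None:
        user_data.set_frame(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
    return Gst.PadProbeReturn.OK

The same fall_suspected() check is also where you would add the head-tilt rule or any other condition, and send_alert() is where the notification code (step 4) goes.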

Let me know if you need any clarification or additional help!

Hello omria,

Thank you so much for the response and help!

I found a guide that appears to be the closest code I could find in my extensive internet search; it is from the following:

As a newbie, I realized what I’m trying to do requires much more than a short learning curve, and there are very limited resources available.

While trying to figure out and modify this code (the wtype example that uses body keypoints), I realized there is an issue: the code is supposed to type the letter K when both elbows are raised higher than the nose, but it is actually triggered when either elbow is higher than the nose, even though the if statement in the code clearly requires both elbows to be higher before typing the letter K.

In the process, I modified the code so it can send Telegram messages, and also made the if statement require the condition 5 times in a row to minimize false detections. I got excited until I realized those were false detections.
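For reference, one common pattern for the “5 times in a row” idea is a consecutive-hit counter across successive checks, rather than an inner loop over the same reading. A rough sketch (the threshold, the polling period, and the fall_condition_met() placeholder are assumptions, not part of the original code):

import time

REQUIRED_HITS = 5        # assumed: how many consecutive checks must agree
POLL_INTERVAL_S = 0.5    # assumed: how often to re-check the pose

def fall_condition_met():
    """Hypothetical placeholder for the real keypoint rule."""
    return False

consecutive_hits = 0
while True:
    if fall_condition_met():
        consecutive_hits += 1
        if consecutive_hits >= REQUIRED_HITS:
            print("fall suspected - send notification here")
            consecutive_hits = 0
    else:
        consecutive_hits = 0   # any miss resets the streak
    time.sleep(POLL_INTERVAL_S)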

I’m still working on the if statements to turn this into a fall detector, but I’m stuck with no progress, since the actual body keypoints are not what the if statement assumes and it triggers false detections.

Can you please advise?

My setup: Pi 5 (Bookworm, 8 GB, fresh install) with Hailo and Picam 3 NoIR.

Code:
#MK-to run this code; python basic_pipelines/pose_simple.py --input rpi
#MK-this code will run simple pose estimation from the picam's video feed
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import os
import numpy as np
import cv2
import hailo
import threading
import time
import telebot
from queue import Queue, Empty

from hailo_apps_infra.hailo_rpi_common import (
    get_caps_from_pad,
    get_numpy_from_buffer,
    app_callback_class,
)
from hailo_apps_infra.pose_estimation_pipeline import GStreamerPoseEstimationApp

import subprocess

def custom_processing_thread(pose_estimator, user_data):

    # Give the Hailo HAT a moment to fire up and start detecting
    time.sleep(2)

    while True:
        # Get the positions of the relevant body parts (None if not available)
        left_wrist = pose_estimator.get_body_part_coordinates('left_wrist')
        right_wrist = pose_estimator.get_body_part_coordinates('right_wrist')
        nose = pose_estimator.get_body_part_coordinates('nose')
        # Other keypoints that could be used for fall rules:
        # left_knee = pose_estimator.get_body_part_coordinates('left_knee')
        # right_knee = pose_estimator.get_body_part_coordinates('right_knee')
        # left_eye = pose_estimator.get_body_part_coordinates('left_eye')
        # right_eye = pose_estimator.get_body_part_coordinates('right_eye')
        # left_hip = pose_estimator.get_body_part_coordinates('left_hip')
        # right_hip = pose_estimator.get_body_part_coordinates('right_hip')
        # left_shoulder = pose_estimator.get_body_part_coordinates('left_shoulder')
        # right_shoulder = pose_estimator.get_body_part_coordinates('right_shoulder')

        # Skip this iteration if no person / keypoints are available yet
        if left_wrist is None or right_wrist is None or nose is None:
            time.sleep(0.1)
            continue

        # Test trigger: both wrists raised above the nose
        # (y is normalized with 0 at the top of the frame, so "above" means a smaller y)
        if left_wrist[1] < nose[1] and right_wrist[1] < nose[1]:
            # Draft conditions for an actual fall pose (to be refined):
            #   left_wrist[1] > left_knee[1] or left_wrist[1] > right_knee[1]
            #   right_wrist[1] > left_knee[1] or right_wrist[1] > right_knee[1]
            #   left_eye[1] > left_knee[1] or right_eye[1] > right_knee[1]
            #   left_shoulder[1] > left_hip[1] or right_shoulder[1] > right_hip[1]
            #
            # A consecutive-hit counter could be added here so the condition must
            # hold several checks in a row before notifying, to reduce false alarms.

            # Send a Telegram notification when the pose is detected
            user_data.send_notification()
            print("test pose detected")

        # Sleep so a raised arm doesn't trigger this hundreds of times
        time.sleep(2)

class PoseDataManager:
    def __init__(self):
        """
        Manages pose estimation data across threads.
        Allows safe access to the latest detection data.
        """
        self.latest_detection_lock = threading.Lock()
        self.latest_detection = None
        self.latest_width = None
        self.latest_height = None

    def update_detection(self, detection, width, height):
        """
        Update the latest detection data thread-safely.

        :param detection: Hailo detection object
        :param width: Frame width
        :param height: Frame height
        """
        with self.latest_detection_lock:
            self.latest_detection = detection
            self.latest_width = width
            self.latest_height = height

    def get_latest_detection(self):
        """
        Retrieve the latest detection data thread-safely.

        :return: Tuple of (detection, width, height) or (None, None, None)
        """
        with self.latest_detection_lock:
            return (
                self.latest_detection,
                self.latest_width,
                self.latest_height
            )

class PoseEstimator:
    def __init__(self, pose_data_manager):
        """
        Initialize PoseEstimator with a PoseDataManager.

        :param pose_data_manager: Shared data management object
        """
        self.pose_data_manager = pose_data_manager
        self.keypoints = self._get_keypoints()

    def _get_keypoints(self):
        """Get the COCO keypoints correspondence map."""
        return {
            'nose': 0,
            'left_eye': 1,
            'right_eye': 2,
            'left_ear': 3,
            'right_ear': 4,
            'left_shoulder': 5,
            'right_shoulder': 6,
            'left_elbow': 7,
            'right_elbow': 8,
            'left_wrist': 9,
            'right_wrist': 10,
            'left_hip': 11,
            'right_hip': 12,
            'left_knee': 13,
            'right_knee': 14,
            'left_ankle': 15,
            'right_ankle': 16,
        }

    def get_body_part_coordinates(self, body_part, significant_figures=4):
        """
        Get normalized coordinates for a specific body part from the latest detection.

        :param body_part: Name of the body part (e.g., 'left_eye')
        :param significant_figures: Number of decimal places to round to
        :return: Tuple of normalized (x, y) coordinates or None
        """
        # Get latest detection
        detection, width, height = self.pose_data_manager.get_latest_detection()

        if detection is None or width is None or height is None:
            return None

        # If no landmarks, return None
        landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
        if len(landmarks) == 0:
            return None

        # Get bbox and points
        bbox = detection.get_bbox()
        points = landmarks[0].get_points()

        # Get the specific keypoint
        keypoint_index = self.keypoints[body_part]
        point = points[keypoint_index]

        # Directly use the normalized coordinates from the point.
        # Clamp the values between 0 and 1, then round to the specified number of decimals.
        norm_x = round(max(0, min(1, point.x())), significant_figures)
        norm_y = round(max(0, min(1, point.y())), significant_figures)

        return (norm_x, norm_y)

    def calculate_body_part_angle(self, point_a_name, point_b_name, point_c_name):
        """
        Calculate the angle between three body parts directly by name, returning an
        angle in the full 0 to 360 degree range.

        :param point_a_name: First body part name (e.g., 'left_shoulder')
        :param point_b_name: Vertex body part name (e.g., 'left_elbow')
        :param point_c_name: Third body part name (e.g., 'left_wrist')
        :return: Angle in degrees or None if coordinates can't be retrieved
        """
        # Get coordinates for each body part
        point_a = self.get_body_part_coordinates(point_a_name)
        point_b = self.get_body_part_coordinates(point_b_name)
        point_c = self.get_body_part_coordinates(point_c_name)

        # Check if any coordinates are None
        if any(point is None for point in [point_a, point_b, point_c]):
            return None

        # Convert to numpy arrays
        a = np.array(point_a)
        b = np.array(point_b)
        c = np.array(point_c)

        # Calculate vectors
        ba = a - b
        bc = c - b

        # Calculate angle using arctan2 for full 360-degree range
        angle = np.degrees(np.arctan2(np.linalg.det([ba, bc]), np.dot(ba, bc)))

        # Ensure the angle is between 0 and 360 degrees
        if angle < 0:
            angle += 360

        return angle
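    # (Illustrative addition, not part of the original code.) One possible way to
    # approximate the "head tilt over 35 degrees" rule from the question is the
    # roll of the eye line relative to horizontal. It works in normalized
    # coordinates, ignores aspect-ratio distortion, and the threshold would need
    # tuning, so treat it as an untested sketch:
    #
    # def get_head_roll_degrees(self):
    #     left_eye = self.get_body_part_coordinates('left_eye')
    #     right_eye = self.get_body_part_coordinates('right_eye')
    #     if left_eye is None or right_eye is None:
    #         return None
    #     dx = right_eye[0] - left_eye[0]
    #     dy = right_eye[1] - left_eye[1]
    #     theta = abs(np.degrees(np.arctan2(dy, dx)))
    #     return min(theta, 180 - theta)  # 0 = eyes level, 90 = head fully sideways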

class user_app_callback_class(app_callback_class):
    def __init__(self, pose_data_manager):
        """
        Initialize with a PoseDataManager.

        :param pose_data_manager: Shared data management object
        """
        super().__init__()
        self.pose_data_manager = pose_data_manager
        # Telegram configuration
        self.bot = telebot.TeleBot('my telebot id')
        self.chat_id = 'xxxxxxxxxx'
        self.last_notification_time = 0
        self.cooldown_seconds = 30

    def send_notification(self):
        current_time = time.time()
        if current_time - self.last_notification_time >= self.cooldown_seconds:
            try:
                self.bot.send_message(self.chat_id, "🚨 abnormal pose detected!")
                self.last_notification_time = current_time
                print("Telegram notification sent!")
            except Exception as e:
                print(f"Error sending Telegram notification: {str(e)}")

def app_callback(pad, info, user_data):
    # Get the GstBuffer from the probe info
    buffer = info.get_buffer()
    if buffer is None:
        return Gst.PadProbeReturn.OK

    # Get the caps from the pad
    format, width, height = get_caps_from_pad(pad)

    # Get the detections from the buffer
    roi = hailo.get_roi_from_buffer(buffer)
    detections = roi.get_objects_typed(hailo.HAILO_DETECTION)

    # Find the person detection
    person_detection = None
    for detection in detections:
        if detection.get_label() == "person":
            person_detection = detection
            break

    # If a person is detected, update the shared data
    if person_detection is not None:
        user_data.pose_data_manager.update_detection(person_detection, width, height)

    return Gst.PadProbeReturn.OK

if __name__ == "__main__":
    # Create PoseDataManager first
    pose_data_manager = PoseDataManager()

    # Create an instance of the user app callback class with pose_data_manager
    user_data = user_app_callback_class(pose_data_manager)

    # Create pose estimator
    pose_estimator = PoseEstimator(pose_data_manager)

    # Start the custom processing thread (pass user_data explicitly so it is not a global)
    processing_thread = threading.Thread(
        target=custom_processing_thread,
        args=(pose_estimator, user_data),
        daemon=True
    )
    processing_thread.start()

    # Run the GStreamer pipeline
    app = GStreamerPoseEstimationApp(app_callback, user_data)
    app.run()

I think I found what is causing these false detections.

Today, I tested the same code in a standing position where the camera can see my hips and whole upper body, and it worked: the if-statement condition was met and triggered a correct detection (only when both wrists were higher than my nose). But the same code did not work when the camera could not see my hips and below, i.e., when sitting with my legs under the table.

Looking at the video feed while I was sitting on a chair in front of the camera, with my hips and below hidden by the desk, I noticed the pose detection model is actually projecting or estimating the hip-and-below keypoint positions in a strange way that a human body could not normally reproduce (see attached picture); perhaps this is what causes the false detections?

If so, what can I do to solve this issue?
Pose detection will be more likely to produce false detections if a person is lying in bed or sitting on a chair with a blanket covering the lower body, even though the detection only uses upper-body keypoints in an if statement based on the x, y coordinates of body keypoints, as in this code.

This code could very well be used for elderly fall detection or pre-warning, but only if there is a workaround to prevent false detections while the lower body is covered by a blanket or the like.
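One direction that might help (a sketch only: it assumes the landmark points expose a per-keypoint confidence score via a confidence() accessor in the Hailo/TAPPAS Python bindings, which should be verified for your version, and the 0.5 threshold is an arbitrary assumption) is to treat low-confidence keypoints as "not visible", so estimated hips and knees never feed the rule:

import hailo

MIN_KEYPOINT_CONFIDENCE = 0.5  # assumed threshold, needs tuning per model

def get_visible_keypoint(detection, keypoint_index, min_conf=MIN_KEYPOINT_CONFIDENCE):
    """Return normalized (x, y) for a keypoint, or None if it is missing or low-confidence."""
    landmarks = detection.get_objects_typed(hailo.HAILO_LANDMARKS)
    if len(landmarks) == 0:
        return None
    point = landmarks[0].get_points()[keypoint_index]
    # Treat low-confidence keypoints (e.g. hips/knees hidden behind a desk or
    # blanket) as "not visible" so they never feed the fall rule.
    if point.confidence() < min_conf:
        return None
    return (point.x(), point.y())

The if statements that consume the keypoints would then need to treat None as "condition not met" instead of comparing coordinates.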

Help will be appreciated!

Old but newbie here