- 
                Notifications
    You must be signed in to change notification settings 
- Fork 1.6k
Description
I’m working on outdoor navigation and have successfully tuned my robot’s parameters. I’m now able to generate both global and local paths. I’ve also set up the transformed_global_path, which represents the pruned version of the global path. This pruned path contains many (x, y) points forming a dense trajectory. I’d like to know if it’s possible to extract only a few points from it — essentially, to downsample the path.
I tried to downsample it and convert the resulting points into markers to represent waypoints. However, when my robot moves, the markers move along with it instead of staying fixed in the map frame.
Additionally, I’d like some guidance on which path (global, local, or transformed/pruned path) should be used to handle obstacle avoidance more effectively — ideally allowing the robot to detect and react to obstacles from a greater distance.
here is my code.
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Path
from visualization_msgs.msg import Marker, MarkerArray
from geometry_msgs.msg import Point
import math
class NavigationConfig(Node):
    def __init__(self):
        super().__init__('navigation_config_node')
        
        # Declare and get parameters
        self.declare_parameter('downsample_spacing', 15.0)  # meters
        self.declare_parameter('path_topic', '/transformed_global_plan')
        self.declare_parameter('marker_topic', '/waypoint_markers')
        
        self.spacing = self.get_parameter('downsample_spacing').value
        path_topic = self.get_parameter('path_topic').value
        marker_topic = self.get_parameter('marker_topic').value
        
        # Create subscriber and publisher
        self.path_sub = self.create_subscription(
            Path,
            path_topic,
            self.path_callback,
            10
        )
        
        self.marker_pub = self.create_publisher(
            MarkerArray,
            marker_topic,
            10
        )
        
        self.get_logger().info(f'Path downsampler initialized with spacing: {self.spacing}m')
        self.get_logger().info(f'Subscribing to: {path_topic}')
        self.get_logger().info(f'Publishing markers to: {marker_topic}')
    
    def calculate_distance(self, p1, p2):
        """Calculate Euclidean distance between two points"""
        dx = p2.pose.position.x - p1.pose.position.x
        dy = p2.pose.position.y - p1.pose.position.y
        dz = p2.pose.position.z - p1.pose.position.z
        return math.sqrt(dx*dx + dy*dy + dz*dz)
    
    def downsample_path(self, path):
        """Downsample path based on distance threshold"""
        if not path.poses or len(path.poses) == 0:
            return []
        
        downsampled = [path.poses[0]]  # Always include first point
        accumulated_dist = 0.0
        
        for i in range(1, len(path.poses)):
            dist = self.calculate_distance(path.poses[i-1], path.poses[i])
            accumulated_dist += dist
            
            if accumulated_dist >= self.spacing:
                downsampled.append(path.poses[i])
                accumulated_dist = 0.0
        
        # Optionally include last point if not already included
        if downsampled[-1] != path.poses[-1]:
            downsampled.append(path.poses[-1])
        
        return downsampled
    
    def create_waypoint_markers(self, downsampled_poses, frame_id, timestamp):
        """Convert downsampled poses to waypoint markers"""
        marker_array = MarkerArray()
        
        for i, pose_stamped in enumerate(downsampled_poses):
            # Create sphere marker for waypoint
            marker = Marker()
            marker.header.frame_id = frame_id
            marker.header.stamp = timestamp
            marker.ns = "waypoints"
            marker.id = i
            marker.type = Marker.SPHERE
            marker.action = Marker.ADD
            
            marker.pose = pose_stamped.pose
            marker.scale.x = 0.3
            marker.scale.y = 0.3
            marker.scale.z = 0.3
            
            # Color: gradient from green to red
            ratio = i / max(len(downsampled_poses) - 1, 1)
            marker.color.r = ratio
            marker.color.g = 1.0 - ratio
            marker.color.b = 0.0
            marker.color.a = 1.0
            
            marker_array.markers.append(marker)
            
            # Create text marker for waypoint number
            text_marker = Marker()
            text_marker.header.frame_id = frame_id
            text_marker.header.stamp = timestamp
            text_marker.ns = "waypoint_labels"
            text_marker.id = i + 10000  # Offset to avoid ID conflicts
            text_marker.type = Marker.TEXT_VIEW_FACING
            text_marker.action = Marker.ADD
            
            text_marker.pose = pose_stamped.pose
            text_marker.pose.position.z = 0.0
            text_marker.scale.z = 0.3
            
            text_marker.color.r = 1.0
            text_marker.color.g = 1.0
            text_marker.color.b = 1.0
            text_marker.color.a = 1.0
            
            text_marker.text = str(i)
            
            marker_array.markers.append(text_marker)
        
        # Remove the first marker (sphere) from the array
        if marker_array.markers:
            marker_array.markers.pop(0)
        
        return marker_array
    
    def path_callback(self, msg):
        """Callback for path subscription"""
        self.get_logger().info(f'Received path with {len(msg.poses)} points')
        
        # Downsample the path
        downsampled = self.downsample_path(msg)
        self.get_logger().info(f'Downsampled to {len(downsampled)} waypoints')
        
        # Create and publish markers
        if downsampled:
            marker_array = self.create_waypoint_markers(downsampled, msg.header.frame_id, msg.header.stamp)
            self.marker_pub.publish(marker_array)
            self.get_logger().info('Published waypoint markers')
def main(args=None):
    rclpy.init(args=args)
    node = NavigationConfig()
    
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
    main()