# Tutorial 4

# Waypoint Publishing and Following


# Waypoint Publisher

To begin, we need first need to set up our file structure.

Set up a new C++ package with a node. Call them both waypoint_publisher.

ros2 pkg create --build-type ament_cmake --node-name <node_name> <package_name>

Inside your project root, create a folder called data. This is where our waypoints will be stored. You should populate it with waypoints files (.csv) from f1tenth_racetracks. We will be creating our waypoint lines later. We will be using the centerline data for this tutorial. Check src/f1tenth_gym_ros/config/sim.yaml to see what map you currently have active.

Once completed, your file structure should look something like this:

You do not need all of these tracks, just the active one's centerline (see above).

Code source: https://gitlab.msu.edu/canvas/soar/public/f1tenth/-/blob/main/src/waypoint_publisher/src/waypoint_publisher.cpp

We are going to work through the code. Note that it is C++ but the ROS convention is very similar to Python.

waypoint_publisher.cpp
class WaypointPublisher : public rclcpp::Node {
  public:
    WaypointPublisher()
    : Node("waypoint_publisher")
    {
      publisher_ = this->create_publisher<nav_msgs::msg::Path>("waypoints", 10);
      timer_ = this->create_wall_timer(1s, std::bind(&WaypointPublisher::timer_callback, this));

      std::string file_name = "Spa_slammed_mincurv.csv";

      std::string file_loc = get_file_loc(file_name);
      RCLCPP_INFO(this->get_logger(), "File location: %s", file_loc.c_str());

      path = get_waypoints(file_loc, file_name);
      RCLCPP_INFO(this->get_logger(), "Loaded %d waypoints", (int)path.poses.size());

      this->timer_callback();
    }

    nav_msgs::msg::Path path;

Shown above is the public portion of our node. Lines 1-5 handle defining our node class. We are declaring it as a child of "Node" (a ROS class) and passing our node's name ("waypoint_publisher"). Pretty simple stuff, most C++ ROS nodes will begin in this fashion.

On line 6 we declare a publisher which communicates on the "/waypoints" topic with "Path" messages (which is just an array of waypoints). Line 7 creates a 1 second timer for a function called timer_callback which we will define soon. This is conventional code that is used in ROS C++ nodes.

Line 9 declares a string that describes the filename that contains the set of waypoints we want to publish. Line 17 calls our timer_callback function so that we don't have to wait 1 second when our node launches to get our waypoints published.

timer_callback is simply:

    void timer_callback() {
      publisher_->publish(path);
      RCLCPP_INFO(this->get_logger(), "Publishing %d waypoints", (int)path.poses.size());
    }

Line 11 contains a simple function which gets the absolute path of our waypoint file after it is compiled. It is defined as:

    std::string get_file_loc(std::string file_name) {
       return std::string((std::string)ament_index_cpp::get_package_share_directory("waypoint_publisher") + "/data/" + file_name);
    }

Last but not least, line 14 calls an essential function: get_waypoints. This function reads our .csv waypoint file and converts it to a Path object.

    nav_msgs::msg::Path get_waypoints(std::string filepath, std::string filename) {
      nav_msgs::msg::Path new_path;
      path.header.frame_id = "map";
      path.header.stamp = this->now();

      FILE *fp;
      fp = fopen(filepath.c_str(), "r");
      if (fp == NULL) {
        RCLCPP_ERROR(this->get_logger(), "Failed to open file: %s", filepath.c_str());
        return new_path;
      }

      char buf[256];
      while (fgets(buf, sizeof(buf), fp) != NULL) {

        double x, y, v;
        if(filename.find("centerline") != std::string::npos) {
          sscanf(buf, "%lf, %lf", &x, &y);
          v = 0;
        } else if(filename.find("mincurv") != std::string::npos) {
          sscanf(buf, "%lf,%lf,%lf", &x, &y, &v);
        } else { // raceline
          sscanf(buf, "%*s %lf %lf", &x, &y);
          v = 0;
        }
        geometry_msgs::msg::PoseStamped pose;
        pose.header = path.header;
        pose.pose.position.x = x;
        pose.pose.position.y = y;
        pose.pose.position.z = v;

        // orientation
        if (new_path.poses.size() > 0) {
          std::vector<double> orientation = get_orientation(new_path.poses[new_path.poses.size()-1], pose);
          pose.pose.orientation.x = orientation[0];
          pose.pose.orientation.y = orientation[1];
          pose.pose.orientation.z = orientation[2];
          pose.pose.orientation.w = orientation[3];
        } else {
          pose.pose.orientation.x = 0.0;
          pose.pose.orientation.y = 0.0;
          pose.pose.orientation.z = 0.0;
          pose.pose.orientation.w = 1.0;
        }
        new_path.poses.push_back(pose);
      }
      return new_path;
    }

We begin by creating our Path object then opening our file. We then read through each line and extract variables x, y, and v (if available). Then we simply assign the coordinates to a PoseStamped, get the orientation (based on the direction from the last point) and push it to our Path's poses vector. Finally, at the end, we return the Path.

The get_orientation function is simple and can be found in the source code.

Lastly, our cpp file needs a main function. Once again, this is boilerplate code and can be copied for future ROS C++ nodes.

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  rclcpp::spin(std::make_shared<WaypointPublisher>());
  rclcpp::shutdown();
  return 0;
}

The necessary imports for this node are:

#include <chrono>
#include <cstdio>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <ament_index_cpp/get_package_share_directory.hpp>

#include <nav_msgs/msg/path.hpp>

using namespace std::chrono_literals;

The using chrono literals namespace is so we can use "1s" for our timer.

We then update package.xml to look like so:

package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>waypoint_publisher</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="alexdalat@todo.todo">alexdalat</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <depend>rclcpp</depend>
  <depend>std_msgs</depend>
  <depend>nav_msgs</depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

Then, make sure the package compiles the data directory so our node can read them at runtime. Simply append:

# Include data/ folder
install(DIRECTORY
  data
  DESTINATION share/${PROJECT_NAME})
)

Additionally, we need to link any libraries we are using such as std_msgs and nav_msgs.

Your CMakeLists.txt should look like the following:

CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(waypoint_publisher)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)

find_package(rclcpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(nav_msgs REQUIRED)

# Create executable
add_executable(waypoint_publisher src/waypoint_publisher.cpp)

# Link dependencies
ament_target_dependencies(waypoint_publisher rclcpp std_msgs nav_msgs)

# Include data/ folder
install(DIRECTORY
  data
  DESTINATION share/${PROJECT_NAME})

# Include header files
target_include_directories(waypoint_publisher PUBLIC
  $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
  $<INSTALL_INTERFACE:include>)

# Require C99 and C++17
target_compile_features(waypoint_publisher PUBLIC c_std_99 cxx_std_17)

# Install executable to lib/ for ros2 run
install(TARGETS
  waypoint_publisher
  DESTINATION lib/${PROJECT_NAME})

ament_package()

When you compile and run the node, the following output should appear and continue logging every second.

That's it for our waypoint publisher package! You can visualize it by toggling the waypoints topic in Foxglove Studio.


# Waypoint Follow

Set up a new Python (yay!) package with a node. Call them both waypoint_follow.

ros2 pkg create --build-type ament_python --node-name <node_name> <package_name>

Programming this node will be more interpretive but we will provide a template and the solution can be found on the GitLab.

By adding a raw messages panel to foxglove we can observe our /waypoints messages format for the first couple of waypoints:

We can create a subscriber and store the points into our waypoint handler:

waypoint_follow.py
self.got_waypoints = False
# subscribe to waypoint publisher
self.create_subscription(
    Path, "/waypoints", self.waypoint_callback, 10
)

...

def waypoint_callback(self, msg):
    if(not self.got_waypoints):
        self.get_logger().info("Received waypoints!")
        self.got_waypoints = True

    # continue updating in case path changes for obstacles
    self.navigator.waypoint_handler.update_waypoints(msg)

Then iterate through our waypoints like so:

WaypointHandler.py
    point_in_front = (car_pos[0] + math.cos(car_yaw) * dist, car_pos[1] + math.sin(car_yaw) * dist)
    
    for waypoint in self.waypoints.poses:
        distance = math.sqrt((waypoint.pose.position.x - point_in_front[0])**2 + (waypoint.pose.position.y - point_in_front[1])**2)
    
        if distance < closest_distance:
            closest_distance = distance
            closest_waypoint = waypoint

The rest of the waypoint handler can be found below.

The meat of the code comes by creating a ROS timer which will act out our car's commands. Create a timer for your drive function that is executed 100 times per second. Inside the drive method, check that you have waypoints, otherwise you should skip (return).

Lastly, you must create a file named WaypointNavigator.py that defines a class called WaypointNavigator. The base should look as such:

class WaypointNavigator:
    def __init__(self):
        self.car_length = 0.3302

        self.current_steering = 0.0
        self.current_speed = 0.0

        self.waypoint_handler = WaypointHandler()

    def get_drive(self, car_odom):
        pass

Using https://thomasfermi.github.io/Algorithms-for-Automated-Driving/Control/PurePursuit.html and methods in WaypointHandler (specifically, get_next_waypoint), implement steering controls so that the car successfully steers towards the given waypoint.

The essentials to pure pursuit are as follows. The vehicle's leftmost wheels are portrayed in blue. TC = target point, ICR = instantaneous center of rotation.

Through trigonometry, a steering angle can be derived to direct the vehicle towards the point at distance l_d.

Once complete, attempt to implement velocity regulation through lidar sensing or other techniques.

import math

class WaypointHandler:
    def __init__(self):
        self.waypoints = None
        self.car_length = 0.3302

    def get_next_waypoint(self, car_pos, car_yaw, dist = 0.6604):
        """
        Returns next waypoint in front of the car
        """
        if(self.waypoints is None):
            return None, 0.0, 0.0

        # distance of point inf front of car based
        closest_waypoint = None
        closest_distance = float('inf')

        point_in_front = (car_pos[0] + math.cos(car_yaw) * dist, car_pos[1] + math.sin(car_yaw) * dist)

        for waypoint in self.waypoints.poses:
            distance = math.sqrt((waypoint.pose.position.x - point_in_front[0])**2 + (waypoint.pose.position.y - point_in_front[1])**2)

            if distance < closest_distance:
                closest_distance = distance
                closest_waypoint = waypoint

        if closest_waypoint is None:
            return None, 0.0, 0.0, 0.0

        angle, distance = self.get_target(car_pos, closest_waypoint)
        waypoint_angle = euler_from_quaternion((closest_waypoint.pose.orientation.x, closest_waypoint.pose.orientation.y, closest_waypoint.pose.orientation.z, closest_waypoint.pose.orientation.w))[2]

        return closest_waypoint, angle, waypoint_angle, distance

    def angle_check(self,angle):
        while angle > math.pi:
            angle -= 2 * math.pi
        while angle < -math.pi:
            angle += 2 * math.pi
        return angle
    
    def get_relative_angle(self, car_yaw, angle):
        relative_angle = angle - car_yaw
        relative_angle = self.angle_check(relative_angle)
        return relative_angle

    def get_target(self, car_pos, next_waypoint):
        """
        Returns angle from car position to waypoint position and distance between
        """
        dx = next_waypoint.pose.position.x - car_pos[0]
        dy = next_waypoint.pose.position.y - car_pos[1]
        return math.atan2(dy, dx), math.sqrt(dx**2 + dy**2)

    def update_waypoints(self, waypoints):
        self.waypoints = waypoints
import rclpy
from rclpy.node import Node

from nav_msgs.msg import Odometry, Path

from ackermann_msgs.msg import AckermannDriveStamped

from .WaypointNavigator import WaypointNavigator


class FollowWaypoint(Node):
    """
    Minimal publisher class
    """

    def __init__(self):
        super().__init__("waypoint_follow")
        self.get_logger().info("Waypoint Follow Node Started")

        # publish to a car's drive
        self.drive_topic = "/drive"
        self.publisher = self.create_publisher(
            AckermannDriveStamped, self.drive_topic, 1
        )
        self.get_logger().info("Publishing to %s" % self.drive_topic)

        # subscribe to a car's odom
        self.odom_topic = "/odom"
        self.subscriber = self.create_subscription(
            Odometry, self.odom_topic, self.odom_callback, 10
        )

        # subscribe to waypoint publisher
        self.waypoint_topic = "/waypoints"
        self.waypoint_subscriber = self.create_subscription(
            Path, self.waypoint_topic, self.waypoint_callback, 10
        )
        self.get_logger().info("Waiting for waypoints...")

        # how often we will send commands to the car
        timer_period = 0.01
        self.timer = self.create_timer(timer_period, self.timer_callback)

        # waypoint navigator
        self.navigator = WaypointNavigator()

        self.car_odom = Odometry()
        self.got_odom = False
        self.got_waypoints = False

    def timer_callback(self):
        """
        Drive timer callback
        """
        if not self.got_odom or not self.got_waypoints:
            return

        timestamp = self.get_clock().now()

        speed, steering_angle = self.navigator.get_drive(self.car_odom)

        msg = AckermannDriveStamped()
        msg.header.stamp = timestamp.to_msg()
        msg.drive.speed = speed
        msg.drive.steering_angle = steering_angle

        self.publisher.publish(msg)

        self.got_odom = False  # we want odom to be fresh every time we run

    def odom_callback(self, msg):
        self.car_odom = msg
        self.got_odom = True

    def waypoint_callback(self, msg):
        if(not self.got_waypoints):
            self.get_logger().info("Received waypoints for the first time!")
            self.got_waypoints = True

        # continue updating in case path changes for obstacles
        self.navigator.waypoint_handler.update_waypoints(msg)


def main(args=None):
    """
    Main
    """
    rclpy.init(args=args)

    follow_waypoint = FollowWaypoint()
    rclpy.spin(follow_waypoint)

    follow_waypoint.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()