# Tutorial 3

# Wall Follower

Wall following is one of many solutions to vehicle path planning. In real life, the wall following method is incredibly unreliable since it is highly unlikely the environment will be isolated (e.g. consistent wall to follow). However, in a more racing-oriented environment, such as the one you are developing in right now, it is a ton more practical. As long as there is a reliable and constant point of reference, the vehicle path should be accurate.


# Step 0: Setting up the docker environment

In the previous tutorial, you were directed to build the simulation package. The same sequence applies to your development throughout future tutorials. Just as a reminder, the build sequence is as follows:

In docker, in your workspace root, ros2_ws, execute the following:

colcon build
source install/local_setup.bash

This will build the pre-existing packages.


# Step 1: Creating the necessary resources

For this step, you need to create multiple folders/resources within the ~/ros2_ws/src directory. Call it wall_follower_pid. This is the 'package' where all of the resources for the node will be.

Execute the pkg create command within the ~/ros2_ws/src directory, and make sure to build it using ament_python.

ros2 pkg create --build-type ament_python wall_follower_pid

You will then see multiple new files/folders:


After creating the above files, you can begin setting up the structure for the messages you will be creating. In this tutorial, you will be creating a PID package.

Execute the pkg create command once again within the ~/ros2_ws/src directory, but this time, build it using ament_cmake.

ros2 pkg create --build-type ament_cmake pid_msgs

In ~/ros2_ws/src/pid_msgs, create a folder called msg. We want this package to have associated messages. In the msg folder make a file called PIDInput.msg.


# Configuring the files (under pid_msgs)

# pid_msgs/msg/PIDInput.msg

Within PIDInput.msg, define 2 messages as pid_vel and pid_error. Both of these will contain 32-bit values.

Your PIDInput.msg file should look like the following:

float32 pid_vel
float32 pid_error

# pid_msgs/CMakeLists.txt

Your CMakeLists.txt should look like the following:

cmake_minimum_required(VERSION 3.5)

project(pid_msgs)

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
  set(CMAKE_CXX_STANDARD 14)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

find_package(std_msgs REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/PIDInput.msg"
  DEPENDENCIES std_msgs
  ADD_LINTER_TESTS
)

ament_export_dependencies(rosidl_default_runtime)

ament_package()

# pid_msgs/package.xml

Your package.xml should look like the following:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>pid_msgs</name>
  <version>0.0.0</version>
  <description>ROS2 messages for PID</description>
  <maintainer email="serga@todo.todo">serga</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>
  <buildtool_depend>rosidl_default_generators</buildtool_depend>

  <build_depend>std_msgs</build_depend>

  <exec_depend>rosidl_default_runtime</exec_depend>
  <exec_depend>std_msgs</exec_depend>

  <test_depend>ament_lint_common</test_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

# Configuring the files (under wall_follower_pid)

# wall_follower_pid/package.xml

Your package.xml file should look like the following:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>wall_follower_pid</name>
  <version>0.0.0</version>
  <description>Wall Follower</description>
  <maintainer email="serga@todo.todo">serga</maintainer>
  <license>TODO: License declaration</license>

  <exec_depend>rclpy</exec_depend>
  
  <exec_depend>std_msgs</exec_depend>             <!--import the standard messages-->
  <exec_depend>sensor_msgs</exec_depend>          <!--import the sensor messages-->
  <exec_depend>ackermann_msgs</exec_depend>       <!--import the ackermann messages-->
  <exec_depend>pid_msgs</exec_depend>             <!--import the PID messages-->

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>    
  </export>
</package>

Here, you are including more dependencies the wall follower node requires.


# wall_follower_pid/setup.py

Your setup.py file should look like the following:

from setuptools import find_packages, setup

package_name = 'wall_follower_pid'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='serga',
    maintainer_email='serga@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'wall_distance_finder = wall_follower_pid.wall_distance_finder:main',
            'control = wall_follower_pid.control:main',
        ],
    },
)

The main region of interest that will change between nodes within the setup.py file is the entry_points section. Here, you are defining the subscriber (listener) as the wall_distance_finder class, and the publisher (talker) as the control. You will create both of these later in this tutorial.


# Configuring the folders

# wall_follower_pid/wall_follower_pid

Create 2 files named wall_distance_finder.py and control.py.


Overall, your file structure should look similar to the following:


# Step 2: wall_distance_finder.py

To begin, import the necessary assets:

import rclpy
import math
from rclpy.node import Node

from sensor_msgs.msg import LaserScan
from pid_msgs.msg import PIDInput

Next, create a class called WallDistanceFinder that takes an argument of type Node:

class WallDistanceFinder(Node):

Create a subscriber to the LaserScan topic:

def __init__(self):
    super().__init__('wall_distance_finder')
    self.subscription = self.create_subscription(
        LaserScan, 
        '/scan', 
        self.callback,
        10
    )

Create a publisher that publishes messages to the PIDInput topic. You can call the channel /error:

self.publisher_ = self.create_publisher(PIDInput, '/error', 10)

Initialize the class members:

# lidar beam index from the rightmost side
self.wall_projection_r = 350

# since the reference has been halved, the target needs to follow suit
self.target_distance = 0.7

# initialize the steering error
self.error = 0.0

Include the callback loop:

def callback(self, msg): 

    # reducing the sensitivity of the reference by halving the distance of the beam
    wall_distance_r = (0.5)*((msg.ranges)[self.wall_projection_r])

    # calculate the error - difference between the value target and the real one
    steering_error = self.target_distance - wall_distance_r

    # An empty msg is created of the type pid_input
    pid_msg = PIDInput()
    
    # this is the error that you want to send to the PID for steering correction.
    pid_msg.pid_error = steering_error
    
    self.publisher_.publish(pid_msg)

Overall, your wall_distance_finder.py file should look like the following:

import rclpy
import math
from rclpy.node import Node

from sensor_msgs.msg import LaserScan
from pid_msgs.msg import PIDInput

class WallDistanceFinder(Node):

    def __init__(self):
        super().__init__('wall_distance_finder')
        self.subscription = self.create_subscription(
            LaserScan, 
            '/scan', 
            self.callback,
            10
        )

        self.publisher_ = self.create_publisher(PIDInput, '/error', 10)
        
        # lidar beam index from the rightmost side
        self.wall_projection_r = 350

        # since the reference has been halved, the target needs to follow suit
        self.target_distance = 0.7
        
        # initialize the steering error
        self.error = 0.0

    def callback(self, msg): 

        # reducing the sensitivity of the reference by halving the distance of the beam
        wall_distance_r = (0.5)*((msg.ranges)[self.wall_projection_r])

        # calculate the error - difference between the value target and the real one
        steering_error = self.target_distance - wall_distance_r

        # An empty msg is created of the type pid_input
        pid_msg = PIDInput()
        
        # this is the error that you want to send to the PID for steering correction.
        pid_msg.pid_error = steering_error
        
        self.publisher_.publish(pid_msg)
        
        self.get_logger().info(f"Publishing Steering Error: \"{pid_msg.pid_error}\"")
    
def main(args=None):
    rclpy.init(args=args)

    wall_distance_finder = WallDistanceFinder()

    rclpy.spin(wall_distance_finder)

    wall_distance_finder.destroy_node()
    rclpy.shutdown()

if __name__ == "__main__":
    main()

# Step 3: control.py

For the control.py file you will follow a very comparable process to the previous step.

Start by importing the necessary assets:

import rclpy
import math
from rclpy.node import Node

from ackermann_msgs.msg import AckermannDriveStamped
from pid_msgs.msg import PIDInput

Create the PIDController class:

class PIDController(Node):

Create the subscriber to the messages you will be publishing from the previous step:

def __init__(self):
    super().__init__('pid_controller')
    self.subscription = self.create_subscription(
        PIDInput, 
        '/error', 
        self.callback,
        10
    )

Create the publisher that will send all the goodies to the f1tenth environment:

self.publisher_ = self.create_publisher(AckermannDriveStamped, '/drive', 1)

Initialize the following variables:

self.prev_steering_error = 0.0
        
# constant velocity value
self.vel_input = 4.0

# PID params
self.s_kp = 0.0 #TODO
self.s_kd = 0.0 #TODO
self.s_ki = 0.0 #TODO

s_kp - this is the proportional parameter, which is the immediate scalar factor

s_kd - this is the derivative parameter, which impacts the model's transient states (i.e. change over time)

s_ki - this is the integral parameter, which is integral 😁 for mitigating steady state errors

Don't forget the callback loop:

# grab the error from the WallDistanceFinder node
steering_error = msg.pid_error

# Calculate the error derivative (change in error)
steering_error_diff = steering_error - self.prev_steering_error

# Update the previous error for the next iteration
self.prev_steering_error = steering_error

# calculate desired steering angle from steering error (P and D)
angle = self.s_kp * steering_error + self.s_kd * steering_error_diff

# saturate the steering angle
angle = max(min(angle, 0.85), -0.85)

# initialize the Ackermann message
stamped_command = AckermannDriveStamped()

# modify the steering angle element of the /drive message
stamped_command.drive.steering_angle = angle

# modify the velocity of the /drive message 
stamped_command.drive.speed = self.vel_input

# publish the message (includes the modified elements)
self.publisher_.publish(stamped_command)

Overall, your control.py file should look like the following:

import rclpy
import math
from rclpy.node import Node

from ackermann_msgs.msg import AckermannDriveStamped
from pid_msgs.msg import PIDInput

class PIDController(Node):

    def __init__(self):
        super().__init__('pid_controller')
        self.subscription = self.create_subscription(
            PIDInput, 
            '/error', 
            self.callback,
            10
        )

        self.publisher_ = self.create_publisher(AckermannDriveStamped, '/drive', 1)

        self.prev_steering_error = 0.0
        
        # velocity value
        self.vel_input = 4.0

        # PID params
        self.s_kp = 0.0 #TODO
        self.s_kd = 0.0 #TODO
        self.s_ki = 0.0 #TODO

    def callback(self, msg):
        
        self.get_logger().info(f"Subscribing Steering Error: \"{msg.pid_error}\"")

        # grab the error from the WallDistanceFinder node
        steering_error = msg.pid_error
        
        # Calculate the error derivative (change in error)
        steering_error_diff = steering_error - self.prev_steering_error
        
        # Update the previous error for the next iteration
        self.prev_steering_error = steering_error
        
        # calculate desired steering angle from steering error (P and D)
        angle = self.s_kp * steering_error + self.s_kd * steering_error_diff
        
        # saturate the steering angle
        angle = max(min(angle, 0.85), -0.85)

        # initialize the Ackermann message
        stamped_command = AckermannDriveStamped()

        # modify the steering angle element of the /drive message
        stamped_command.drive.steering_angle = angle

        # modify the velocity of the /drive message 
        stamped_command.drive.speed = self.vel_input

        # publish the message (includes the modified elements)
        self.publisher_.publish(stamped_command)

        self.get_logger().info(f"Publishing Steering Angle: \"{stamped_command.drive.steering_angle}\"")
        
        self.get_logger().info(f"Publishing Speed: \"{stamped_command.drive.speed}\"")

def main(args=None):
    rclpy.init(args=args)

    pid_controller = PIDController()

    rclpy.spin(pid_controller)

    pid_controller.destroy_node()
    rclpy.shutdown()

if __name__ == "__main__":
    main()

That's it! you have successfully set up the node to get right to testing!


# Step 4: 🏎️

To execute the python files within the node, just use ros2 run wall_follower_pid wall_distance_finder for the wall distance finder, and ros2 run wall_follower_pid control for the actuating signals.

Your wall_distance_finder execution should look like this:

... and your control execution should look like this:

As a note, control relies on the signals published to wall_distance_finder, so nothing will be output to your terminal if you execute control before wall_distance_finder