Skip to main content

Lab 09-01: ROS2 Fundamentals and Communication

Objectives​

By the end of this lab, you will be able to:

  • Create ROS2 nodes for robot control
  • Implement publishers and subscribers for sensor/actuator data
  • Use ROS2 services for discrete commands
  • Understand and apply QoS settings for real-time robotics

Prerequisites​

  • Completed Module 05 (Dynamics and Control)
  • Basic Python programming skills
  • Docker installed for containerized ROS2 environment

Materials​

TypeNameTierNotes
softwareROS2 HumblerequiredRobot Operating System
softwarePython 3.10+requiredrclpy interface
softwareDockerrequiredContainerized environment
simulationGazebo FortressrequiredSimulation backend

Background​

Why ROS2?​

ROS2 provides:

  • Standardized interfaces for robot components
  • Real-time capable communication (DDS)
  • Multi-platform support (Linux, Windows, macOS)
  • Lifecycle management for production systems

Communication Patterns​

  1. Topics: Continuous data streams (sensors, commands)
  2. Services: Request/response (discrete operations)
  3. Actions: Long-running tasks with feedback

Quality of Service (QoS)​

QoS profiles control:

  • Reliability: Best-effort vs. reliable delivery
  • Durability: Transient vs. volatile data
  • History: How many messages to buffer
  • Deadline: Maximum time between messages

Instructions​

Step 1: Set Up ROS2 Environment​

Launch the Docker container with ROS2:

# Pull the ROS2 Humble image with Gazebo
docker pull ros:humble-ros-base

# Run with display forwarding (Linux)
docker run -it --rm \
-e DISPLAY=$DISPLAY \
-v /tmp/.X11-unix:/tmp/.X11-unix \
-v $(pwd)/textbook:/textbook \
--network host \
ros:humble-ros-base \
bash

Inside the container, verify ROS2:

# Source ROS2
source /opt/ros/humble/setup.bash

# Check installation
ros2 --version
ros2 topic list

Expected: ROS2 command returns version, ros2 topic list shows /rosout.

Step 2: Create a Simple Publisher Node​

Create a joint command publisher:

#!/usr/bin/env python3
"""
joint_publisher.py
Publishes joint position commands to a robot.
"""

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np

class JointCommandPublisher(Node):
"""Publishes joint commands at a fixed rate."""

def __init__(self):
super().__init__('joint_command_publisher')

# QoS profile for real-time control
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)

# Create publisher
self.publisher = self.create_publisher(
Float64MultiArray,
'/joint_commands',
qos_profile
)

# Timer for periodic publishing (100 Hz)
self.timer = self.create_timer(0.01, self.timer_callback)

# State
self.time = 0.0
self.joint_positions = np.zeros(7) # 7-DOF arm

self.get_logger().info('Joint command publisher initialized')

def timer_callback(self):
"""Publish joint commands."""
self.time += 0.01

# Generate sinusoidal motion for first two joints
self.joint_positions[0] = 0.5 * np.sin(2 * np.pi * 0.2 * self.time)
self.joint_positions[1] = 0.3 * np.sin(2 * np.pi * 0.3 * self.time)

# Create message
msg = Float64MultiArray()
msg.data = self.joint_positions.tolist()

# Publish
self.publisher.publish(msg)

# Log periodically
```python
```python
if int(self.time * 10) % 50 == 0:
            self.get_logger().info(f't={self.time:.1f}s: q[0]={self.joint_positions[0]:.3f}')

def main(args=None):
rclpy.init(args=args)
node = JointCommandPublisher()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

```python
```python
if __name__ == '__main__':
    main()

Run the publisher:

# In one terminal
python3 joint_publisher.py

# In another terminal
ros2 topic list
ros2 topic echo /joint_commands

Expected: Topic /joint_commands appears, echoing shows Float64MultiArray messages at 100 Hz.

Step 3: Create a Subscriber Node​

Create a joint state subscriber:

#!/usr/bin/env python3
"""
joint_subscriber.py
Subscribes to joint states and computes control errors.
"""

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np

class JointStateSubscriber(Node):
"""Subscribes to joint states and commands, computes errors."""

def __init__(self):
super().__init__('joint_state_subscriber')

# QoS profile
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)

# Subscribers
self.cmd_subscriber = self.create_subscription(
Float64MultiArray,
'/joint_commands',
self.cmd_callback,
qos_profile
)

self.state_subscriber = self.create_subscription(
Float64MultiArray,
'/joint_states',
self.state_callback,
qos_profile
)

# State storage
self.current_command = None
self.current_state = None

# Error tracking
self.error_history = []
self.max_history = 1000

self.get_logger().info('Joint state subscriber initialized')

def cmd_callback(self, msg):
"""Store latest command."""
self.current_command = np.array(msg.data)

def state_callback(self, msg):
"""Process joint state, compute error."""
self.current_state = np.array(msg.data)

if self.current_command is not None:
error = self.current_command - self.current_state
self.error_history.append(error.copy())

# Trim history
```python
```python
if len(self.error_history) > self.max_history:
                self.error_history.pop(0)

# Log RMS error
rms_error = np.sqrt(np.mean(error**2))
self.get_logger().info(f'RMS tracking error: {rms_error:.4f} rad')

def get_statistics(self):
"""Compute error statistics."""
```python
```python
if len(self.error_history) < 10:
            return None

errors = np.array(self.error_history)
return {
'mean_error': np.mean(np.abs(errors), axis=0),
'max_error': np.max(np.abs(errors), axis=0),
'rms_error': np.sqrt(np.mean(errors**2, axis=0))
}

def main(args=None):
rclpy.init(args=args)
node = JointStateSubscriber()

try:
rclpy.spin(node)
except KeyboardInterrupt:
stats = node.get_statistics()
if stats:
print("\nError Statistics:")
print(f"Mean: {stats['mean_error']}")
print(f"Max: {stats['max_error']}")
print(f"RMS: {stats['rms_error']}")
finally:
node.destroy_node()
rclpy.shutdown()

```python
```python
if __name__ == '__main__':
    main()

Expected: Subscriber receives commands and (simulated) states, computes tracking errors.

Step 4: Implement a ROS2 Service​

Create a service for robot mode switching:

#!/usr/bin/env python3
"""
robot_service.py
Provides a service for robot mode switching.
"""

import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from rcl_interfaces.msg import SetParametersResult

class RobotModeService(Node):
"""Service node for robot mode control."""

def __init__(self):
super().__init__('robot_mode_service')

# Service
self.srv = self.create_service(
SetBool,
'/robot/enable',
self.enable_callback
)

# State
self.enabled = False

# Parameters
self.declare_parameter('max_velocity', 1.0)
self.declare_parameter('safety_enabled', True)

self.get_logger().info('Robot mode service ready')

def enable_callback(self, request, response):
"""Handle enable/disable requests."""
if request.data:
# Enable robot
safety_ok = self.get_parameter('safety_enabled').value
if safety_ok:
self.enabled = True
response.success = True
response.message = 'Robot enabled'
self.get_logger().info('Robot ENABLED')
else:
response.success = False
response.message = 'Cannot enable: safety checks failed'
self.get_logger().warn('Enable request rejected: safety')
else:
# Disable robot
self.enabled = False
response.success = True
response.message = 'Robot disabled'
self.get_logger().info('Robot DISABLED')

return response

def main(args=None):
rclpy.init(args=args)
node = RobotModeService()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

```python
```python
if __name__ == '__main__':
    main()

Test the service:

# In one terminal
python3 robot_service.py

# In another terminal
ros2 service list
ros2 service call /robot/enable std_srvs/srv/SetBool "{data: true}"
ros2 service call /robot/enable std_srvs/srv/SetBool "{data: false}"

Expected: Service responds to enable/disable requests with success/failure.

Step 5: Create Launch File​

Create a launch file to start all nodes:

#!/usr/bin/env python3
"""
robot_launch.py
Launch file for robot control system.
"""

from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
"""Generate launch description."""

# Declare arguments
sim_arg = DeclareLaunchArgument(
'use_sim',
default_value='true',
description='Use simulation time'
)

rate_arg = DeclareLaunchArgument(
'control_rate',
default_value='100.0',
description='Control loop rate in Hz'
)

# Nodes
command_publisher = Node(
package='physical_ai_textbook',
executable='joint_publisher',
name='joint_command_publisher',
parameters=[{
'use_sim_time': LaunchConfiguration('use_sim'),
'publish_rate': LaunchConfiguration('control_rate')
}],
output='screen'
)

state_subscriber = Node(
package='physical_ai_textbook',
executable='joint_subscriber',
name='joint_state_subscriber',
parameters=[{
'use_sim_time': LaunchConfiguration('use_sim')
}],
output='screen'
)

mode_service = Node(
package='physical_ai_textbook',
executable='robot_service',
name='robot_mode_service',
parameters=[{
'max_velocity': 1.5,
'safety_enabled': True
}],
output='screen'
)

return LaunchDescription([
sim_arg,
rate_arg,
command_publisher,
state_subscriber,
mode_service
])

Expected: Launch file syntax is valid, all nodes can be started together.

Step 6: QoS Configuration for Real-Time​

Explore QoS settings impact:

#!/usr/bin/env python3
"""
qos_comparison.py
Compare different QoS settings for robot communication.
"""

import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
import time

class QoSComparisonNode(Node):
"""Test different QoS configurations."""

def __init__(self):
super().__init__('qos_comparison')

# QoS profiles to test
self.qos_profiles = {
'best_effort': QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
),
'reliable': QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
),
'sensor_data': QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=5,
durability=QoSDurabilityPolicy.VOLATILE
),
'control_command': QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
)
}

# Statistics
self.message_counts = {name: 0 for name in self.qos_profiles}
self.latencies = {name: [] for name in self.qos_profiles}

# Create publishers for each profile
self.publishers = {}
self.subscribers = {}

for name, qos in self.qos_profiles.items():
topic = f'/test_{name}'
self.publishers[name] = self.create_publisher(
Float64MultiArray, topic, qos
)
self.subscribers[name] = self.create_subscription(
Float64MultiArray, topic,
lambda msg, n=name: self.callback(msg, n),
qos
)

# Timer for publishing
self.timer = self.create_timer(0.01, self.publish_all) # 100 Hz
self.start_time = time.time()

self.get_logger().info('QoS comparison node started')

def publish_all(self):
"""Publish to all topics."""
msg = Float64MultiArray()
msg.data = [time.time()] # Timestamp for latency measurement

for pub in self.publishers.values():
pub.publish(msg)

def callback(self, msg, profile_name):
"""Handle received message."""
self.message_counts[profile_name] += 1

# Compute latency
send_time = msg.data[0]
recv_time = time.time()
latency = (recv_time - send_time) * 1000 # ms
self.latencies[profile_name].append(latency)

def report(self):
"""Print statistics."""
duration = time.time() - self.start_time
print(f"\nQoS Comparison Results ({duration:.1f}s):")
print("-" * 60)

for name in self.qos_profiles:
count = self.message_counts[name]
lats = self.latencies[name]

if lats:
avg_lat = sum(lats) / len(lats)
max_lat = max(lats)
print(f"{name:20s}: {count:5d} msgs, "
f"avg latency: {avg_lat:.2f}ms, max: {max_lat:.2f}ms")
else:
print(f"{name:20s}: {count:5d} msgs, no latency data")

def main(args=None):
rclpy.init(args=args)
node = QoSComparisonNode()

try:
# Run for 10 seconds
timeout = 10.0
start = time.time()
```python
```python
while time.time() - start < timeout:
            rclpy.spin_once(node, timeout_sec=0.1)
except KeyboardInterrupt:
pass
finally:
node.report()
node.destroy_node()
rclpy.shutdown()

```python
```python
if __name__ == '__main__':
    main()

Expected: Different QoS profiles show different message delivery characteristics.

Expected Outcomes​

After completing this lab, you should have:

  1. Code artifacts:

    • joint_publisher.py: Joint command publisher
    • joint_subscriber.py: Joint state subscriber
    • robot_service.py: Robot mode service
    • robot_launch.py: Launch file
    • qos_comparison.py: QoS analysis tool
  2. Understanding:

    • ROS2 node architecture
    • Topic-based communication for continuous data
    • Service-based communication for discrete operations
    • QoS profile selection for robotics applications
  3. Running system:

    • Multiple nodes communicating via topics and services
    • Understanding of message latency and reliability

Rubric​

CriterionPointsDescription
Publisher Node20Correct topic publishing at specified rate
Subscriber Node20Proper message handling and statistics
Service Node20Working request/response handling
Launch File15Valid launch configuration
QoS Analysis15Understanding of QoS tradeoffs
Documentation10Clear code comments and README
Total100

Common Errors​

Error: "No module named rclpy" Solution: Ensure ROS2 is sourced: source /opt/ros/humble/setup.bash

Error: Topic not found by subscriber Solution: Check topic names match, verify QoS compatibility.

Error: Service timeout Solution: Ensure service node is running, check network configuration.

Extensions​

For advanced students:

  1. Actions: Implement action server for trajectory execution
  2. Parameter Server: Dynamic reconfiguration of controller gains
  3. Lifecycle Nodes: Proper startup/shutdown management
  4. Component Nodes: Efficient single-process multi-node setup
  • Theory: Module 09 theory.md, Section 9.1 (ROS2 Architecture)
  • Next Lab: Lab 09-02 (tf2 and Coordinate Frames)
  • Application: Real robot integration, multi-robot systems