Lab 09-01: ROS2 Fundamentals and Communication
Objectives​
By the end of this lab, you will be able to:
- Create ROS2 nodes for robot control
- Implement publishers and subscribers for sensor/actuator data
- Use ROS2 services for discrete commands
- Understand and apply QoS settings for real-time robotics
Prerequisites​
- Completed Module 05 (Dynamics and Control)
- Basic Python programming skills
- Docker installed for containerized ROS2 environment
Materials​
| Type | Name | Tier | Notes |
|---|---|---|---|
| software | ROS2 Humble | required | Robot Operating System |
| software | Python 3.10+ | required | rclpy interface |
| software | Docker | required | Containerized environment |
| simulation | Gazebo Fortress | required | Simulation backend |
Background​
Why ROS2?​
ROS2 provides:
- Standardized interfaces for robot components
- Real-time capable communication (DDS)
- Multi-platform support (Linux, Windows, macOS)
- Lifecycle management for production systems
Communication Patterns​
- Topics: Continuous data streams (sensors, commands)
- Services: Request/response (discrete operations)
- Actions: Long-running tasks with feedback
Quality of Service (QoS)​
QoS profiles control:
- Reliability: Best-effort vs. reliable delivery
- Durability: Transient vs. volatile data
- History: How many messages to buffer
- Deadline: Maximum time between messages
Instructions​
Step 1: Set Up ROS2 Environment​
Launch the Docker container with ROS2:
# Pull the ROS2 Humble image with Gazebo
docker pull ros:humble-ros-base
# Run with display forwarding (Linux)
docker run -it --rm \
-e DISPLAY=$DISPLAY \
-v /tmp/.X11-unix:/tmp/.X11-unix \
-v $(pwd)/textbook:/textbook \
--network host \
ros:humble-ros-base \
bash
Inside the container, verify ROS2:
# Source ROS2
source /opt/ros/humble/setup.bash
# Check installation
ros2 --version
ros2 topic list
Expected: ROS2 command returns version, ros2 topic list shows /rosout.
Step 2: Create a Simple Publisher Node​
Create a joint command publisher:
#!/usr/bin/env python3
"""
joint_publisher.py
Publishes joint position commands to a robot.
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np
class JointCommandPublisher(Node):
"""Publishes joint commands at a fixed rate."""
def __init__(self):
super().__init__('joint_command_publisher')
# QoS profile for real-time control
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Create publisher
self.publisher = self.create_publisher(
Float64MultiArray,
'/joint_commands',
qos_profile
)
# Timer for periodic publishing (100 Hz)
self.timer = self.create_timer(0.01, self.timer_callback)
# State
self.time = 0.0
self.joint_positions = np.zeros(7) # 7-DOF arm
self.get_logger().info('Joint command publisher initialized')
def timer_callback(self):
"""Publish joint commands."""
self.time += 0.01
# Generate sinusoidal motion for first two joints
self.joint_positions[0] = 0.5 * np.sin(2 * np.pi * 0.2 * self.time)
self.joint_positions[1] = 0.3 * np.sin(2 * np.pi * 0.3 * self.time)
# Create message
msg = Float64MultiArray()
msg.data = self.joint_positions.tolist()
# Publish
self.publisher.publish(msg)
# Log periodically
```python
```python
if int(self.time * 10) % 50 == 0:
self.get_logger().info(f't={self.time:.1f}s: q[0]={self.joint_positions[0]:.3f}')
def main(args=None):
rclpy.init(args=args)
node = JointCommandPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
```python
```python
if __name__ == '__main__':
main()
Run the publisher:
# In one terminal
python3 joint_publisher.py
# In another terminal
ros2 topic list
ros2 topic echo /joint_commands
Expected: Topic /joint_commands appears, echoing shows Float64MultiArray messages at 100 Hz.
Step 3: Create a Subscriber Node​
Create a joint state subscriber:
#!/usr/bin/env python3
"""
joint_subscriber.py
Subscribes to joint states and computes control errors.
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np
class JointStateSubscriber(Node):
"""Subscribes to joint states and commands, computes errors."""
def __init__(self):
super().__init__('joint_state_subscriber')
# QoS profile
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# Subscribers
self.cmd_subscriber = self.create_subscription(
Float64MultiArray,
'/joint_commands',
self.cmd_callback,
qos_profile
)
self.state_subscriber = self.create_subscription(
Float64MultiArray,
'/joint_states',
self.state_callback,
qos_profile
)
# State storage
self.current_command = None
self.current_state = None
# Error tracking
self.error_history = []
self.max_history = 1000
self.get_logger().info('Joint state subscriber initialized')
def cmd_callback(self, msg):
"""Store latest command."""
self.current_command = np.array(msg.data)
def state_callback(self, msg):
"""Process joint state, compute error."""
self.current_state = np.array(msg.data)
if self.current_command is not None:
error = self.current_command - self.current_state
self.error_history.append(error.copy())
# Trim history
```python
```python
if len(self.error_history) > self.max_history:
self.error_history.pop(0)
# Log RMS error
rms_error = np.sqrt(np.mean(error**2))
self.get_logger().info(f'RMS tracking error: {rms_error:.4f} rad')
def get_statistics(self):
"""Compute error statistics."""
```python
```python
if len(self.error_history) < 10:
return None
errors = np.array(self.error_history)
return {
'mean_error': np.mean(np.abs(errors), axis=0),
'max_error': np.max(np.abs(errors), axis=0),
'rms_error': np.sqrt(np.mean(errors**2, axis=0))
}
def main(args=None):
rclpy.init(args=args)
node = JointStateSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
stats = node.get_statistics()
if stats:
print("\nError Statistics:")
print(f"Mean: {stats['mean_error']}")
print(f"Max: {stats['max_error']}")
print(f"RMS: {stats['rms_error']}")
finally:
node.destroy_node()
rclpy.shutdown()
```python
```python
if __name__ == '__main__':
main()
Expected: Subscriber receives commands and (simulated) states, computes tracking errors.
Step 4: Implement a ROS2 Service​
Create a service for robot mode switching:
#!/usr/bin/env python3
"""
robot_service.py
Provides a service for robot mode switching.
"""
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from rcl_interfaces.msg import SetParametersResult
class RobotModeService(Node):
"""Service node for robot mode control."""
def __init__(self):
super().__init__('robot_mode_service')
# Service
self.srv = self.create_service(
SetBool,
'/robot/enable',
self.enable_callback
)
# State
self.enabled = False
# Parameters
self.declare_parameter('max_velocity', 1.0)
self.declare_parameter('safety_enabled', True)
self.get_logger().info('Robot mode service ready')
def enable_callback(self, request, response):
"""Handle enable/disable requests."""
if request.data:
# Enable robot
safety_ok = self.get_parameter('safety_enabled').value
if safety_ok:
self.enabled = True
response.success = True
response.message = 'Robot enabled'
self.get_logger().info('Robot ENABLED')
else:
response.success = False
response.message = 'Cannot enable: safety checks failed'
self.get_logger().warn('Enable request rejected: safety')
else:
# Disable robot
self.enabled = False
response.success = True
response.message = 'Robot disabled'
self.get_logger().info('Robot DISABLED')
return response
def main(args=None):
rclpy.init(args=args)
node = RobotModeService()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
```python
```python
if __name__ == '__main__':
main()
Test the service:
# In one terminal
python3 robot_service.py
# In another terminal
ros2 service list
ros2 service call /robot/enable std_srvs/srv/SetBool "{data: true}"
ros2 service call /robot/enable std_srvs/srv/SetBool "{data: false}"
Expected: Service responds to enable/disable requests with success/failure.
Step 5: Create Launch File​
Create a launch file to start all nodes:
#!/usr/bin/env python3
"""
robot_launch.py
Launch file for robot control system.
"""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
def generate_launch_description():
"""Generate launch description."""
# Declare arguments
sim_arg = DeclareLaunchArgument(
'use_sim',
default_value='true',
description='Use simulation time'
)
rate_arg = DeclareLaunchArgument(
'control_rate',
default_value='100.0',
description='Control loop rate in Hz'
)
# Nodes
command_publisher = Node(
package='physical_ai_textbook',
executable='joint_publisher',
name='joint_command_publisher',
parameters=[{
'use_sim_time': LaunchConfiguration('use_sim'),
'publish_rate': LaunchConfiguration('control_rate')
}],
output='screen'
)
state_subscriber = Node(
package='physical_ai_textbook',
executable='joint_subscriber',
name='joint_state_subscriber',
parameters=[{
'use_sim_time': LaunchConfiguration('use_sim')
}],
output='screen'
)
mode_service = Node(
package='physical_ai_textbook',
executable='robot_service',
name='robot_mode_service',
parameters=[{
'max_velocity': 1.5,
'safety_enabled': True
}],
output='screen'
)
return LaunchDescription([
sim_arg,
rate_arg,
command_publisher,
state_subscriber,
mode_service
])
Expected: Launch file syntax is valid, all nodes can be started together.
Step 6: QoS Configuration for Real-Time​
Explore QoS settings impact:
#!/usr/bin/env python3
"""
qos_comparison.py
Compare different QoS settings for robot communication.
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
import time
class QoSComparisonNode(Node):
"""Test different QoS configurations."""
def __init__(self):
super().__init__('qos_comparison')
# QoS profiles to test
self.qos_profiles = {
'best_effort': QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
),
'reliable': QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
),
'sensor_data': QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=5,
durability=QoSDurabilityPolicy.VOLATILE
),
'control_command': QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
)
}
# Statistics
self.message_counts = {name: 0 for name in self.qos_profiles}
self.latencies = {name: [] for name in self.qos_profiles}
# Create publishers for each profile
self.publishers = {}
self.subscribers = {}
for name, qos in self.qos_profiles.items():
topic = f'/test_{name}'
self.publishers[name] = self.create_publisher(
Float64MultiArray, topic, qos
)
self.subscribers[name] = self.create_subscription(
Float64MultiArray, topic,
lambda msg, n=name: self.callback(msg, n),
qos
)
# Timer for publishing
self.timer = self.create_timer(0.01, self.publish_all) # 100 Hz
self.start_time = time.time()
self.get_logger().info('QoS comparison node started')
def publish_all(self):
"""Publish to all topics."""
msg = Float64MultiArray()
msg.data = [time.time()] # Timestamp for latency measurement
for pub in self.publishers.values():
pub.publish(msg)
def callback(self, msg, profile_name):
"""Handle received message."""
self.message_counts[profile_name] += 1
# Compute latency
send_time = msg.data[0]
recv_time = time.time()
latency = (recv_time - send_time) * 1000 # ms
self.latencies[profile_name].append(latency)
def report(self):
"""Print statistics."""
duration = time.time() - self.start_time
print(f"\nQoS Comparison Results ({duration:.1f}s):")
print("-" * 60)
for name in self.qos_profiles:
count = self.message_counts[name]
lats = self.latencies[name]
if lats:
avg_lat = sum(lats) / len(lats)
max_lat = max(lats)
print(f"{name:20s}: {count:5d} msgs, "
f"avg latency: {avg_lat:.2f}ms, max: {max_lat:.2f}ms")
else:
print(f"{name:20s}: {count:5d} msgs, no latency data")
def main(args=None):
rclpy.init(args=args)
node = QoSComparisonNode()
try:
# Run for 10 seconds
timeout = 10.0
start = time.time()
```python
```python
while time.time() - start < timeout:
rclpy.spin_once(node, timeout_sec=0.1)
except KeyboardInterrupt:
pass
finally:
node.report()
node.destroy_node()
rclpy.shutdown()
```python
```python
if __name__ == '__main__':
main()
Expected: Different QoS profiles show different message delivery characteristics.
Expected Outcomes​
After completing this lab, you should have:
-
Code artifacts:
joint_publisher.py: Joint command publisherjoint_subscriber.py: Joint state subscriberrobot_service.py: Robot mode servicerobot_launch.py: Launch fileqos_comparison.py: QoS analysis tool
-
Understanding:
- ROS2 node architecture
- Topic-based communication for continuous data
- Service-based communication for discrete operations
- QoS profile selection for robotics applications
-
Running system:
- Multiple nodes communicating via topics and services
- Understanding of message latency and reliability
Rubric​
| Criterion | Points | Description |
|---|---|---|
| Publisher Node | 20 | Correct topic publishing at specified rate |
| Subscriber Node | 20 | Proper message handling and statistics |
| Service Node | 20 | Working request/response handling |
| Launch File | 15 | Valid launch configuration |
| QoS Analysis | 15 | Understanding of QoS tradeoffs |
| Documentation | 10 | Clear code comments and README |
| Total | 100 |
Common Errors​
Error: "No module named rclpy"
Solution: Ensure ROS2 is sourced: source /opt/ros/humble/setup.bash
Error: Topic not found by subscriber Solution: Check topic names match, verify QoS compatibility.
Error: Service timeout Solution: Ensure service node is running, check network configuration.
Extensions​
For advanced students:
- Actions: Implement action server for trajectory execution
- Parameter Server: Dynamic reconfiguration of controller gains
- Lifecycle Nodes: Proper startup/shutdown management
- Component Nodes: Efficient single-process multi-node setup
Related Content​
- Theory: Module 09 theory.md, Section 9.1 (ROS2 Architecture)
- Next Lab: Lab 09-02 (tf2 and Coordinate Frames)
- Application: Real robot integration, multi-robot systems