Lab 10-01: Domain Randomization Fundamentals
Objectives​
By the end of this lab, you will be able to:
- Implement physics parameter randomization
- Apply visual domain randomization to camera inputs
- Configure dynamics randomization for robust control
- Analyze policy robustness across parameter distributions
Prerequisites​
- Completed Module 09 (ROS2 Integration)
- Understanding of control systems (Module 05)
- Familiarity with MuJoCo simulation
Materials​
| Type | Name | Tier | Notes |
|---|---|---|---|
| software | MuJoCo 3.0+ | required | Physics simulation |
| software | Python 3.10+ | required | Programming environment |
| software | NumPy, SciPy | required | Randomization |
| software | OpenCV | optional | Visual randomization |
Background​
The Sim-to-Real Gap​
Simulations differ from reality in:
- Physics (friction, damping, contact dynamics)
- Visuals (lighting, textures, reflections)
- Sensors (noise, latency, calibration)
- Actuation (delays, limits, backlash)
Domain Randomization Strategy​
Train on a distribution of environments so that reality appears as "just another sample" from the distribution.
Types of Randomization​
- Physics: Mass, friction, damping, joint dynamics
- Visual: Textures, lighting, camera parameters
- Dynamics: Actuator delays, gains, noise
- Observations: Sensor noise, calibration errors
Instructions​
Step 1: Set Up Randomization Framework​
Create a domain randomization system:
import mujoco
import numpy as np
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional
import copy
@dataclass
class RandomizationRange:
"""Defines a randomization range for a parameter."""
min_val: float
max_val: float
distribution: str = 'uniform' # 'uniform', 'normal', 'loguniform'
mean: float = 0.0 # For normal distribution
std: float = 1.0 # For normal distribution
```python
```python
def sample(self) -> float:
"""Sample a value from this range."""
```python
```python
if self.distribution == 'uniform':
return np.random.uniform(self.min_val, self.max_val)
elif self.distribution == 'normal':
val = np.random.normal(self.mean, self.std)
return np.clip(val, self.min_val, self.max_val)
elif self.distribution == 'loguniform':
log_min = np.log(self.min_val)
log_max = np.log(self.max_val)
return np.exp(np.random.uniform(log_min, log_max))
else:
raise ValueError(f"Unknown distribution: {self.distribution}")
class DomainRandomizer:
"""Apply domain randomization to MuJoCo models."""
def __init__(self, model_path: str):
"""
Initialize randomizer.
Args:
model_path: Path to MuJoCo XML model
"""
self.model_path = model_path
self.base_model = mujoco.MjModel.from_xml_path(model_path)
# Store default values
self._store_defaults()
# Randomization parameters
self.physics_ranges: Dict[str, RandomizationRange] = {}
self.visual_ranges: Dict[str, RandomizationRange] = {}
self.dynamics_ranges: Dict[str, RandomizationRange] = {}
# Current randomization values
self.current_params: Dict[str, float] = {}
def _store_defaults(self):
"""Store default model parameters."""
self.defaults = {
'body_mass': self.base_model.body_mass.copy(),
'geom_friction': self.base_model.geom_friction.copy(),
'dof_damping': self.base_model.dof_damping.copy(),
'actuator_gainprm': self.base_model.actuator_gainprm.copy(),
}
def add_physics_randomization(self, param_name: str, range_config: RandomizationRange):
"""Add physics parameter randomization."""
self.physics_ranges[param_name] = range_config
def add_dynamics_randomization(self, param_name: str, range_config: RandomizationRange):
"""Add dynamics parameter randomization."""
self.dynamics_ranges[param_name] = range_config
```python
```python
def randomize(self) -> mujoco.MjModel:
"""
Create a randomized model.
Returns:
Randomized MjModel
"""
# Create fresh model
model = mujoco.MjModel.from_xml_path(self.model_path)
self.current_params = {}
# Apply physics randomization
self._randomize_physics(model)
# Apply dynamics randomization
self._randomize_dynamics(model)
return model
def _randomize_physics(self, model: mujoco.MjModel):
"""Apply physics randomization."""
# Mass randomization
if 'mass_scale' in self.physics_ranges:
scale = self.physics_ranges['mass_scale'].sample()
model.body_mass[:] = self.defaults['body_mass'] * scale
self.current_params['mass_scale'] = scale
# Friction randomization
if 'friction_scale' in self.physics_ranges:
scale = self.physics_ranges['friction_scale'].sample()
model.geom_friction[:, 0] = self.defaults['geom_friction'][:, 0] * scale
self.current_params['friction_scale'] = scale
# Damping randomization
if 'damping_scale' in self.physics_ranges:
scale = self.physics_ranges['damping_scale'].sample()
model.dof_damping[:] = self.defaults['dof_damping'] * scale
self.current_params['damping_scale'] = scale
def _randomize_dynamics(self, model: mujoco.MjModel):
"""Apply dynamics randomization."""
# Actuator gain randomization
if 'actuator_gain_scale' in self.dynamics_ranges:
scale = self.dynamics_ranges['actuator_gain_scale'].sample()
model.actuator_gainprm[:, 0] = self.defaults['actuator_gainprm'][:, 0] * scale
self.current_params['actuator_gain_scale'] = scale
```python
```python
def get_current_params(self) -> Dict[str, float]:
"""Get current randomization parameters."""
return self.current_params.copy()
# Test the randomizer
print("Creating domain randomizer...")
randomizer = DomainRandomizer("textbook/assets/robot-models/mujoco/2dof-arm.xml")
# Configure randomization ranges
randomizer.add_physics_randomization(
'mass_scale',
RandomizationRange(0.8, 1.2, 'uniform')
)
randomizer.add_physics_randomization(
'friction_scale',
RandomizationRange(0.5, 2.0, 'loguniform')
)
randomizer.add_physics_randomization(
'damping_scale',
RandomizationRange(0.5, 1.5, 'uniform')
)
randomizer.add_dynamics_randomization(
'actuator_gain_scale',
RandomizationRange(0.9, 1.1, 'normal', mean=1.0, std=0.05)
)
print("Randomizer configured")
Expected: DomainRandomizer created with configurable parameter ranges.
Step 2: Implement Visual Randomization​
Add visual domain randomization:
import cv2
class VisualRandomizer:
"""Apply visual domain randomization to images."""
def __init__(self):
"""Initialize visual randomizer."""
self.randomization_enabled = {
'brightness': True,
'contrast': True,
'saturation': True,
'noise': True,
'blur': True,
'color_jitter': True
}
# Default ranges
self.brightness_range = (-0.3, 0.3)
self.contrast_range = (0.7, 1.3)
self.saturation_range = (0.7, 1.3)
self.noise_std_range = (0, 0.05)
self.blur_range = (0, 2)
```python
```python
def randomize(self, image: np.ndarray) -> np.ndarray:
"""
Apply visual randomization to image.
Args:
image: Input image (H, W, 3) uint8
Returns:
Randomized image
"""
# Convert to float for processing
img = image.astype(np.float32) / 255.0
# Brightness
if self.randomization_enabled['brightness']:
delta = np.random.uniform(*self.brightness_range)
img = img + delta
img = np.clip(img, 0, 1)
# Contrast
if self.randomization_enabled['contrast']:
factor = np.random.uniform(*self.contrast_range)
mean = np.mean(img)
img = (img - mean) * factor + mean
img = np.clip(img, 0, 1)
# Color jitter
if self.randomization_enabled['color_jitter']:
for c in range(3):
jitter = np.random.uniform(-0.1, 0.1)
img[:, :, c] = np.clip(img[:, :, c] + jitter, 0, 1)
# Saturation (HSV manipulation)
if self.randomization_enabled['saturation']:
hsv = cv2.cvtColor((img * 255).astype(np.uint8), cv2.COLOR_RGB2HSV)
factor = np.random.uniform(*self.saturation_range)
hsv[:, :, 1] = np.clip(hsv[:, :, 1] * factor, 0, 255)
img = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB).astype(np.float32) / 255.0
# Gaussian noise
if self.randomization_enabled['noise']:
std = np.random.uniform(*self.noise_std_range)
noise = np.random.normal(0, std, img.shape)
img = img + noise
img = np.clip(img, 0, 1)
# Gaussian blur
if self.randomization_enabled['blur']:
sigma = np.random.uniform(*self.blur_range)
```python
```python
if sigma > 0.1:
ksize = int(sigma * 4) | 1 # Ensure odd
img = cv2.GaussianBlur(img, (ksize, ksize), sigma)
# Convert back to uint8
return (img * 255).astype(np.uint8)
```python
```python
def randomize_camera_params(self) -> Dict[str, float]:
"""
Generate randomized camera parameters.
Returns:
Dict of camera parameter adjustments
"""
return {
'fov_scale': np.random.uniform(0.95, 1.05),
'position_noise': np.random.normal(0, 0.01, 3),
'rotation_noise': np.random.normal(0, 0.02, 3), # Euler angles
}
# Test visual randomization
visual_rand = VisualRandomizer()
# Create test image
test_image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
test_image[:, :, 0] = 100 # Red background
test_image[200:280, 280:360, :] = [0, 255, 0] # Green square
# Apply randomization
randomized = visual_rand.randomize(test_image)
print(f"Original range: [{test_image.min()}, {test_image.max()}]")
print(f"Randomized range: [{randomized.min()}, {randomized.max()}]")
Expected: VisualRandomizer applies brightness, contrast, noise, and blur.
Step 3: Implement Sensor Noise Randomization​
Add realistic sensor noise models:
class SensorNoiseRandomizer:
"""Apply randomized sensor noise."""
def __init__(self):
"""Initialize sensor noise models."""
# Joint encoder noise
self.encoder_noise_std = RandomizationRange(0.0001, 0.001, 'loguniform')
# IMU noise
self.gyro_noise_std = RandomizationRange(0.001, 0.01, 'loguniform')
self.gyro_bias = RandomizationRange(-0.01, 0.01, 'uniform')
self.accel_noise_std = RandomizationRange(0.01, 0.1, 'loguniform')
self.accel_bias = RandomizationRange(-0.1, 0.1, 'uniform')
# Force sensor noise
self.force_noise_std = RandomizationRange(0.1, 1.0, 'loguniform')
# Current noise parameters (resampled each episode)
self.current_noise_params = {}
def resample_noise_params(self):
"""Resample noise parameters for new episode."""
self.current_noise_params = {
'encoder_noise_std': self.encoder_noise_std.sample(),
'gyro_noise_std': self.gyro_noise_std.sample(),
'gyro_bias': np.array([self.gyro_bias.sample() for _ in range(3)]),
'accel_noise_std': self.accel_noise_std.sample(),
'accel_bias': np.array([self.accel_bias.sample() for _ in range(3)]),
'force_noise_std': self.force_noise_std.sample()
}
return self.current_noise_params
```python
```python
def add_encoder_noise(self, positions: np.ndarray) -> np.ndarray:
"""Add noise to joint encoder readings."""
std = self.current_noise_params.get('encoder_noise_std', 0.0005)
noise = np.random.normal(0, std, positions.shape)
return positions + noise
```python
```python
def add_imu_noise(self, gyro: np.ndarray, accel: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Add noise to IMU readings."""
gyro_std = self.current_noise_params.get('gyro_noise_std', 0.005)
gyro_bias = self.current_noise_params.get('gyro_bias', np.zeros(3))
gyro_noisy = gyro + np.random.normal(0, gyro_std, 3) + gyro_bias
accel_std = self.current_noise_params.get('accel_noise_std', 0.05)
accel_bias = self.current_noise_params.get('accel_bias', np.zeros(3))
accel_noisy = accel + np.random.normal(0, accel_std, 3) + accel_bias
return gyro_noisy, accel_noisy
```python
```python
def add_force_noise(self, force: np.ndarray) -> np.ndarray:
"""Add noise to force sensor readings."""
std = self.current_noise_params.get('force_noise_std', 0.5)
noise = np.random.normal(0, std, force.shape)
return force + noise
# Test sensor noise
sensor_noise = SensorNoiseRandomizer()
params = sensor_noise.resample_noise_params()
print("Sensor noise parameters:")
for k, v in params.items():
if isinstance(v, np.ndarray):
print(f" {k}: {v}")
else:
print(f" {k}: {v:.6f}")
# Test noise application
positions = np.array([0.0, 1.57])
noisy_positions = sensor_noise.add_encoder_noise(positions)
print(f"\nOriginal positions: {positions}")
print(f"Noisy positions: {noisy_positions}")
Expected: SensorNoiseRandomizer adds configurable noise to all sensor types.
Step 4: Run Randomized Training Episodes​
Create training loop with randomization:
class RandomizedEnvironment:
"""Environment with domain randomization."""
def __init__(self, model_path: str):
self.physics_rand = DomainRandomizer(model_path)
self.visual_rand = VisualRandomizer()
self.sensor_rand = SensorNoiseRandomizer()
# Configure physics randomization
self.physics_rand.add_physics_randomization(
'mass_scale', RandomizationRange(0.8, 1.2)
)
self.physics_rand.add_physics_randomization(
'friction_scale', RandomizationRange(0.5, 2.0, 'loguniform')
)
self.physics_rand.add_physics_randomization(
'damping_scale', RandomizationRange(0.5, 1.5)
)
self.physics_rand.add_dynamics_randomization(
'actuator_gain_scale', RandomizationRange(0.9, 1.1)
)
# Current model and data
self.model = None
self.data = None
# Episode statistics
self.episode_params = {}
```python
```python
def reset(self, randomize: bool = True) -> np.ndarray:
"""
Reset environment with optional randomization.
Args:
randomize: Whether to apply domain randomization
Returns:
Initial observation
"""
if randomize:
# Create randomized model
self.model = self.physics_rand.randomize()
self.episode_params = {
'physics': self.physics_rand.get_current_params(),
'sensor': self.sensor_rand.resample_noise_params()
}
else:
# Use base model
self.model = mujoco.MjModel.from_xml_path(self.physics_rand.model_path)
self.episode_params = {'physics': {}, 'sensor': {}}
self.data = mujoco.MjData(self.model)
mujoco.mj_resetData(self.model, self.data)
mujoco.mj_forward(self.model, self.data)
return self._get_observation()
```python
```python
def step(self, action: np.ndarray) -> Tuple[np.ndarray, float, bool, dict]:
"""
Execute action in environment.
Returns:
obs, reward, done, info
"""
# Apply action
self.data.ctrl[:] = action
# Step simulation
mujoco.mj_step(self.model, self.data)
# Get observation with sensor noise
obs = self._get_observation()
# Compute reward (example: distance to target)
reward = self._compute_reward()
# Check termination
done = self._check_done()
info = {'params': self.episode_params}
return obs, reward, done, info
```python
```python
def _get_observation(self) -> np.ndarray:
"""Get observation with sensor noise."""
# Joint positions and velocities
pos = self.sensor_rand.add_encoder_noise(self.data.qpos.copy())
vel = self.sensor_rand.add_encoder_noise(self.data.qvel.copy())
return np.concatenate([pos, vel])
```python
```python
def _compute_reward(self) -> float:
"""Compute reward."""
# Example: negative distance to origin
return -np.linalg.norm(self.data.qpos)
```python
```python
def _check_done(self) -> bool:
"""Check if episode is done."""
return False
def run_randomized_training(env, n_episodes=10, steps_per_episode=100):
"""Run training with domain randomization."""
all_rewards = []
all_params = []
for ep in range(n_episodes):
obs = env.reset(randomize=True)
episode_reward = 0
for step in range(steps_per_episode):
# Random action for testing
action = np.random.uniform(-1, 1, env.model.nu)
obs, reward, done, info = env.step(action)
episode_reward += reward
if done:
break
all_rewards.append(episode_reward)
all_params.append(env.episode_params)
print(f"Episode {ep}: reward={episode_reward:.2f}, "
f"mass_scale={env.episode_params['physics'].get('mass_scale', 1.0):.2f}")
return all_rewards, all_params
# Run training
print("\nRunning randomized training...")
env = RandomizedEnvironment("textbook/assets/robot-models/mujoco/2dof-arm.xml")
rewards, params = run_randomized_training(env, n_episodes=5)
Expected: Multiple episodes run with different randomization parameters.
Step 5: Analyze Robustness​
Evaluate policy robustness across parameter space:
def analyze_robustness(env, policy_fn, param_name: str, param_range: Tuple[float, float],
n_samples: int = 10, steps: int = 100):
"""
Analyze policy robustness across a parameter range.
Args:
env: Environment
policy_fn: Policy function (obs -> action)
param_name: Parameter to vary
param_range: (min, max) range
n_samples: Number of samples
steps: Steps per evaluation
Returns:
results: Dict with analysis results
"""
param_values = np.linspace(param_range[0], param_range[1], n_samples)
rewards = []
success_rates = []
for param_val in param_values:
episode_rewards = []
# Run multiple episodes at this parameter
for _ in range(5):
# Configure specific parameter
```python
```python
if param_name == 'mass_scale':
env.physics_rand.physics_ranges['mass_scale'] = RandomizationRange(
param_val, param_val # Fixed value
)
obs = env.reset(randomize=True)
ep_reward = 0
success = True
for _ in range(steps):
action = policy_fn(obs)
obs, reward, done, info = env.step(action)
ep_reward += reward
if done:
break
episode_rewards.append(ep_reward)
rewards.append(np.mean(episode_rewards))
success_rates.append(sum(1 for r in episode_rewards if r > -50) / len(episode_rewards))
results = {
'param_name': param_name,
'param_values': param_values,
'rewards': np.array(rewards),
'success_rates': np.array(success_rates)
}
# Compute robustness metrics
results['mean_reward'] = np.mean(rewards)
results['std_reward'] = np.std(rewards)
results['min_reward'] = np.min(rewards)
results['robustness_ratio'] = results['min_reward'] / results['mean_reward']
return results
# Simple policy for testing
def simple_policy(obs):
"""PD control policy."""
target = np.array([0.5, 0.5])
pos = obs[:2]
vel = obs[2:4]
kp, kd = 10.0, 1.0
return kp * (target - pos) - kd * vel
# Analyze robustness
print("\nAnalyzing robustness to mass variation...")
results = analyze_robustness(
env,
simple_policy,
'mass_scale',
(0.5, 1.5),
n_samples=5,
steps=50
)
print(f"\nRobustness Analysis Results:")
print(f" Parameter: {results['param_name']}")
print(f" Mean reward: {results['mean_reward']:.2f}")
print(f" Std reward: {results['std_reward']:.2f}")
print(f" Min reward: {results['min_reward']:.2f}")
print(f" Robustness ratio: {results['robustness_ratio']:.2f}")
Expected: Policy performance measured across parameter variations.
Step 6: Curriculum Randomization​
Implement progressive randomization:
class CurriculumRandomizer:
"""Progressive domain randomization curriculum."""
def __init__(self, base_randomizer: DomainRandomizer):
self.base_rand = base_randomizer
# Curriculum stages (0 = no randomization, 1 = full)
self.stage = 0.0
self.stage_increment = 0.1
# Target ranges at full curriculum
self.target_ranges = {
'mass_scale': (0.5, 1.5),
'friction_scale': (0.3, 3.0),
'damping_scale': (0.3, 2.0),
'actuator_gain_scale': (0.8, 1.2)
}
# Current ranges
self.current_ranges = {}
self._update_ranges()
def _update_ranges(self):
"""Update randomization ranges based on current stage."""
for param, (target_min, target_max) in self.target_ranges.items():
# Interpolate from no randomization to target range
center = 1.0
current_min = center - self.stage * (center - target_min)
current_max = center + self.stage * (target_max - center)
self.current_ranges[param] = (current_min, current_max)
# Update base randomizer
if param in self.base_rand.physics_ranges:
self.base_rand.physics_ranges[param] = RandomizationRange(
current_min, current_max
)
elif param in self.base_rand.dynamics_ranges:
self.base_rand.dynamics_ranges[param] = RandomizationRange(
current_min, current_max
)
def advance_curriculum(self, success_rate: float, threshold: float = 0.8):
"""
Advance curriculum if performance is good.
Args:
success_rate: Recent success rate
threshold: Required success rate to advance
Returns:
advanced: Whether curriculum was advanced
"""
```python
```python
if success_rate >= threshold and self.stage < 1.0:
self.stage = min(1.0, self.stage + self.stage_increment)
self._update_ranges()
return True
return False
```python
```python
def get_stage_info(self) -> dict:
"""Get current curriculum information."""
return {
'stage': self.stage,
'ranges': self.current_ranges.copy()
}
# Test curriculum
curriculum = CurriculumRandomizer(randomizer)
print("Curriculum progression:")
for i in range(12):
info = curriculum.get_stage_info()
print(f"Stage {info['stage']:.1f}: mass_scale range = {info['ranges']['mass_scale']}")
# Simulate advancement
curriculum.advance_curriculum(success_rate=0.85)
Expected: Curriculum progressively increases randomization range.
Expected Outcomes​
After completing this lab, you should have:
-
Code artifacts:
domain_randomizer.py: Physics and dynamics randomizationvisual_randomizer.py: Image augmentationsensor_noise.py: Sensor noise modelscurriculum.py: Progressive curriculum
-
Understanding:
- Types of domain randomization
- Distribution selection for parameters
- Robustness analysis methods
- Curriculum design principles
Rubric​
| Criterion | Points | Description |
|---|---|---|
| Physics Randomization | 25 | Mass, friction, damping variation |
| Visual Randomization | 20 | Image augmentation pipeline |
| Sensor Noise | 20 | Realistic noise models |
| Robustness Analysis | 20 | Parameter sweep evaluation |
| Curriculum | 15 | Progressive randomization |
| Total | 100 |
Common Errors​
Error: Simulation unstable after randomization Solution: Limit extreme parameter values, check physics timestep.
Error: Training doesn't converge with randomization Solution: Start with smaller ranges, use curriculum.
Error: Visual randomization too aggressive Solution: Tune brightness/contrast ranges based on task.
Extensions​
For advanced students:
- Automatic Domain Randomization (ADR): Adapt ranges based on performance
- Adversarial Randomization: Find hardest parameter combinations
- Multi-Task Randomization: Different randomization per task
- Real2Sim: Estimate real-world parameters for calibration
Related Content​
- Theory: Module 10 theory.md, Section 10.1 (Domain Gap)
- Next Lab: Lab 10-02 (System Identification)
- Application: Sim-to-real robot transfer