legged_robot.py - LeggedRobot Class API Reference#
This document provides comprehensive API documentation for the LeggedRobot class, the base environment class for all legged robot tasks in the LeggedGym-Ex framework.
Overview#
The LeggedRobot class extends BaseTask and provides the core functionality for training legged robot locomotion policies using reinforcement learning. It manages simulation environments, handles observations and rewards, processes termination conditions, and orchestrates the training loop.
All robot-specific environments (Go2, G1, K1, TRON1) inherit from this base class, either directly or through specialized variants like LeggedRobotTS (Teacher-Student), LeggedRobotEE (Explicit Estimator), or LeggedRobotAMP (Adversarial Motion Priors).
Type Aliases#
The class uses several type aliases for clarity and documentation:
ObsBuf = Tensor # Shape: (num_envs, obs_dim)
Action = Tensor # Shape: (num_envs, num_actions)
Reward = Tensor # Shape: (num_envs,)
EnvIds = Tensor # Shape: (num_reset_envs,) - integer tensor of environment indices
Class Definition#
class LeggedRobot(BaseTask):
"""Base environment for legged robot locomotion tasks.
Provides core functionality for:
- Multi-environment simulation management
- Observation and reward computation
- Termination checking and environment resetting
- Curriculum learning support
- Domain randomization
"""
Initialization#
__init__()#
def __init__(
self,
cfg: LeggedRobotCfg,
sim_params: dict[str, Any],
sim_device: str | int,
headless: bool
) -> None
Initialize the legged robot environment.
Parameters:
cfg (
LeggedRobotCfg): Environment configuration containing robot, terrain, control, and reward parameters. Must includeenv,normalization,sim, andcontrolsections.sim_params (
dict[str, Any]): Dictionary of simulation parameters passed to the simulator backend (IsaacGym, Genesis, or IsaacSim).sim_device (
str | int): Device for running the simulation. Can be'cuda','cpu', or a device ID integer like0or'cuda:0'.headless (
bool): IfTrue, run without rendering for faster training.
Raises:
AssertionError: If configuration is missing required sections or has invalid values.
Example:
from legged_gym.envs.go2.go2_config import GO2Cfg
cfg = GO2Cfg()
sim_params = {"dt": 0.005, "substeps": 1}
env = LeggedRobot(cfg, sim_params, "cuda:0", headless=True)
Validation:
The initialization performs extensive configuration validation to catch errors early:
Validates required config sections (
env,normalization,sim,control)Checks observation and action dimensions are positive
Ensures simulator creates valid number of environments
Prepares reward functions and initializes buffers
Core Methods#
step()#
def step(
self,
actions: Action
) -> Tuple[ObsBuf, ObsBuf | None, Reward, Tensor, Dict[str, Any]]
Execute one simulation step with the given actions.
Applies actions to the robots, advances the physics simulation, then processes observations, rewards, and termination states. This is the main interface for the RL training loop.
Parameters:
actions (
Action): Action tensor of shape(num_envs, num_actions)containing target joint positions or torques depending on control mode. Must befloat32dtype.
Returns:
A tuple containing:
obs_buf (
ObsBuf): Observation buffer of shape(num_envs, obs_dim).privileged_obs_buf (
ObsBuf | None): Privileged observations of shape(num_envs, privileged_obs_dim)orNoneif not using asymmetric actor-critic.rew_buf (
Reward): Reward buffer of shape(num_envs,).reset_buf (
Tensor): Reset flags of shape(num_envs,)indicating which environments need reset (1 for reset, 0 otherwise).extras (
Dict[str, Any]): Dictionary of additional information including episode statistics and curriculum data.
Raises:
AssertionError: If action shape or dtype is incorrect.
Example:
# Standard RL training loop
actions = policy(obs) # Policy network output
obs, priv_obs, rew, done, info = env.step(actions)
# Check for episode statistics
if "episode" in info:
for key, value in info["episode"].items():
print(f"{key}: {value}")
Validation:
Verifies action shape matches
(num_envs, num_actions)Ensures actions are
float32dtypeClips observations to configured range
reset_idx()#
def reset_idx(self, env_ids: EnvIds) -> None
Reset specified environments to initial states.
Performs complete reset of specified environments including curriculum updates, command resampling, DOF state reset, and buffer resets. This method is called automatically for terminated environments, but can also be called manually.
Parameters:
env_ids (
EnvIds): Integer tensor of shape(num_reset_envs,)containing indices of environments to reset. Can be an empty tensor.
Example:
# Reset specific environments
env_ids = torch.tensor([0, 5, 10], device="cuda")
env.reset_idx(env_ids)
# Reset all environments
env_ids = torch.arange(env.num_envs, device="cuda")
env.reset_idx(env_ids)
Reset Process:
Updates terrain curriculum if enabled
Updates command curriculum if enabled
Resamples velocity commands
Resets DOF positions with small random perturbations
Resets root states (base position, orientation, velocities)
Clears action history buffers
Resets episode length and failure counters
Logs episode statistics to
extrasdict
reset()#
Inherited from BaseTask. Resets all environments.
def reset(self) -> Tuple[ObsBuf, ObsBuf | None]:
"""Reset all environments."""
env_ids = torch.arange(self.num_envs, device=self.device)
self.reset_idx(env_ids)
return self.obs_buf, self.privileged_obs_buf
compute_observations()#
def compute_observations(self) -> None
Compute observations for all environments.
Constructs observation tensors from simulator states and applies normalization scales. This method is called automatically after each physics step, but can be overridden to customize observation structure.
Default Observation Components:
Component |
Shape |
Description |
Scale Factor |
|---|---|---|---|
Base linear velocity |
(3,) |
Base velocity in body frame |
|
Projected gravity |
(3,) |
Body z-axis in world frame |
1.0 |
Base angular velocity |
(3,) |
Angular velocity in body frame |
|
Commands |
(3,) |
Linear x, linear y, angular yaw velocities |
|
DOF positions |
(num_dofs,) |
Deviation from default position |
|
DOF velocities |
(num_dofs,) |
Joint velocities |
|
Actions |
(num_actions,) |
Previous action for history |
1.0 |
Optional Components:
Height measurements: Added if
cfg.terrain.measure_heightsisTrue. Shape:(num_height_points,), scaled byobs_scales.height_measurements.Observation noise: Added if
cfg.noise.add_noiseisTrue. Uses uniform noise scaled bynoise_scale_vec.
Privileged Observations:
If num_privileged_obs is set, computes privileged_obs_buf with additional information:
Component |
Shape |
Description |
|---|---|---|
Standard observations |
- |
All components from regular obs |
Last actions |
(num_actions,) |
Action two steps ago |
Friction values |
(1,) |
Domain randomized friction |
Added base mass |
(1,) |
Domain randomized mass |
Base CoM bias |
(3,) |
Domain randomized CoM offset |
Push velocities |
(2,) |
Domain randomized push velocities |
Updates:
self.obs_buf: Shape(num_envs, obs_dim)self.privileged_obs_buf: Shape(num_envs, privileged_obs_dim)orNone
Example Override:
class MyRobot(LeggedRobot):
def compute_observations(self):
# Call parent to get base observations
super().compute_observations()
# Add custom observations
custom_obs = self._compute_custom_features()
self.obs_buf = torch.cat([self.obs_buf, custom_obs], dim=-1)
Important:
When overriding this method, ensure the final observation size matches cfg.env.num_observations. The class includes validation that raises AssertionError if sizes don’t match.
get_observations()#
Inherited from BaseTask. Returns current observations without recomputing.
def get_observations(self) -> Tuple[ObsBuf, ObsBuf | None]:
"""Get current observations."""
return self.obs_buf, self.privileged_obs_buf
check_termination()#
def check_termination(self) -> None
Check termination conditions and update reset buffer.
Evaluates three termination conditions for each environment:
Contact termination: Body contacts with termination bodies exceed force threshold (10.0N).
Orientation termination: Projected gravity z-component exceeds
cfg.env.max_projected_gravity.Timeout termination: Episode exceeds
max_episode_length.
Updates:
self.fail_buf: Tracks consecutive failures for graceful termination logic.self.time_out_buf: Indicates episodes that timed out (not actual failures).self.reset_buf: Indicates environments needing reset.
Graceful Termination:
The fail_buf counter allows for graceful termination: environments are only reset after failures persist for cfg.env.fail_to_terminal_time_s / dt steps, preventing premature termination from brief perturbations.
check_timeout()#
Implicitly handled in check_termination(). Checks if episode length exceeds maximum.
self.time_out_buf = self.episode_length_buf > self.max_episode_length
compute_reward()#
def compute_reward(self) -> None
Compute rewards for all environments.
Iterates through all reward functions with non-zero scales (defined in cfg.rewards.scales), computes each reward term, scales it by dt, and accumulates into the total reward.
Reward Structure:
Rewards are defined by the configuration’s scales dictionary:
cfg.rewards.scales.tracking_lin_vel = 1.0
cfg.rewards.scales.lin_vel_z = -0.5
cfg.rewards.scales.termination = -0.0
Updates:
self.rew_buf: Total reward of shape(num_envs,).self.episode_sums: Dictionary tracking cumulative rewards per term for logging.
Processing:
Zero-scales are removed from consideration
Non-zero scales are multiplied by
dtfor proper integrationEach reward function
_reward_<name>()is calledOptionally clips rewards to positive values if
cfg.rewards.only_positive_rewardsAdds termination penalty after clipping
post_physics_step()#
def post_physics_step(self) -> None
Process environment state after physics step.
This method orchestrates the RL training loop logic after each simulation step. It’s called automatically by step() and should not be called directly.
Execution Order:
Update episode and step counters
Process simulator post-physics callbacks
Run custom callbacks (
_post_physics_step_callback)Check termination conditions
Compute rewards
Reset terminated environments
Update sensors
Compute observations
Draw debug visualizations (if enabled)
set_viewer_camera()#
def set_viewer_camera(
self,
pos: np.ndarray,
lookat: np.ndarray
) -> None
Set viewer camera position and orientation for rendering.
Parameters:
pos (
np.ndarray): Camera position in world frame, shape(3,).lookat (
np.ndarray): Point to look at in world frame, shape(3,).
Example:
# Position camera above and behind robot
pos = np.array([2.0, 0.0, 1.0])
lookat = np.array([0.0, 0.0, 0.5])
env.set_viewer_camera(pos, lookat)
Key Attributes#
Observation and State Buffers#
Attribute |
Type |
Shape |
Description |
|---|---|---|---|
|
|
|
Main observation buffer |
|
|
|
Privileged observations for asymmetric training |
|
|
|
Reward buffer |
|
|
|
Reset flags |
|
|
|
Timeout flags |
Robot State#
Attribute |
Type |
Shape |
Description |
|---|---|---|---|
|
|
|
Joint positions via |
|
|
|
Joint velocities via |
|
|
|
Joint accelerations (computed) |
|
|
|
Base position in world frame |
|
|
|
Base orientation quaternion (x, y, z, w) |
|
|
|
Base linear velocity in world frame |
|
|
|
Base angular velocity in body frame |
Control and Actions#
Attribute |
Type |
Shape |
Description |
|---|---|---|---|
|
|
|
Current actions |
|
|
|
Actions from previous step |
|
|
|
Actions from two steps ago |
|
|
|
Velocity commands |
Episode Tracking#
Attribute |
Type |
Shape |
Description |
|---|---|---|---|
|
|
|
Current episode length in steps |
|
|
|
Consecutive failure counter |
|
|
- |
Cumulative rewards per term |
|
|
- |
Additional episode information |
Reward Auto-Discovery Mechanism#
The LeggedRobot class implements a powerful auto-discovery mechanism for reward functions. This allows you to define reward terms by simply implementing methods with the naming convention _reward_<name>().
How It Works#
Configuration: Define reward scales in your config:
class MyRobotCfg(LeggedRobotCfg):
class rewards:
class scales:
tracking_lin_vel = 1.0
lin_vel_z = -0.5
orientation = -0.5
termination = -0.0
Implementation: Implement corresponding methods:
class MyRobot(LeggedRobot):
def _reward_tracking_lin_vel(self) -> Reward:
"""Reward for tracking linear velocity commands."""
lin_vel_error = torch.sum(torch.square(
self.commands[:, :2] - self.simulator.base_lin_vel[:, :2]), dim=1)
return torch.exp(-lin_vel_error / self.cfg.rewards.tracking_sigma)
def _reward_lin_vel_z(self) -> Reward:
"""Penalize z-axis linear velocity."""
return torch.square(self.simulator.base_lin_vel[:, 2])
Auto-Discovery: During initialization,
_prepare_reward_function()automatically:Finds all reward scales with non-zero values
Looks for corresponding
_reward_<name>()methodsValidates that methods exist
Creates a list of callable reward functions
Multiplies scales by
dtfor proper integration
Built-in Reward Functions#
The base class provides many reward functions ready to use:
Method |
Description |
Typical Use |
|---|---|---|
|
Track linear velocity commands |
Velocity tracking tasks |
|
Track angular velocity commands |
Turning behavior |
|
Penalize vertical velocity |
Stable locomotion |
|
Penalize roll/pitch rates |
Stable locomotion |
|
Penalize non-flat base |
Stability |
|
Maintain target height |
Posture control |
|
Penalize large torques |
Energy efficiency |
|
Penalize joint velocities |
Smooth motion |
|
Penalize joint accelerations |
Smooth motion |
|
Penalize action changes |
Smooth control |
|
Penalize body collisions |
Safety |
|
Reward proper swing phase |
Gait quality |
|
Penalize joint limits |
Safety |
|
Penalize torque limits |
Hardware safety |
Custom Rewards#
To add custom rewards:
class MyRobot(LeggedRobot):
def _reward_custom_balance(self) -> Reward:
"""Custom reward for balance."""
# Implement your reward logic
balance_error = torch.abs(self.simulator.projected_gravity[:, 2] - 1.0)
return torch.exp(-balance_error)
Then add to config:
class rewards:
class scales:
custom_balance = 0.5
Validation#
The system includes validation that raises AssertionError if:
No reward scales are defined
A reward scale is defined but no corresponding method exists
Method doesn’t return correct shape
(num_envs,)
Callback Methods (Protected)#
These methods can be overridden to customize behavior:
_pre_sim_step()#
def _pre_sim_step(self, actions: Action) -> Action
Called at the beginning of step(), before simulation. Default implementation clips actions and updates action history.
_post_physics_step_callback()#
def _post_physics_step_callback(self) -> None
Called after physics step, before termination checks. Default implementation:
Resamples commands periodically
Handles heading commands
Applies random pushes to robots
Override Example:
class MyRobot(LeggedRobot):
def _post_physics_step_callback(self):
# Call parent first
super()._post_physics_step_callback()
# Add custom logic
self._update_custom_curriculum()
_resample_commands()#
def _resample_commands(self, env_ids: EnvIds) -> None
Resample velocity commands for specified environments. Called during reset and periodically during episodes.
_reset_dofs()#
def _reset_dofs(self, env_ids: EnvIds) -> None
Reset DOF states for specified environments. Default adds small random perturbations to default positions.
_reset_root_states()#
def _reset_root_states(self, env_ids: EnvIds) -> None
Reset root states (base position, orientation, velocities) for specified environments.
_update_terrain_curriculum()#
def _update_terrain_curriculum(self, env_ids: EnvIds) -> None
Update terrain difficulty based on robot performance. Implements game-inspired curriculum where robots that travel far advance to harder terrains.
_update_command_curriculum()#
def _update_command_curriculum(self, env_ids: EnvIds) -> None
Increase command difficulty based on tracking performance.
Common Override Patterns#
Custom Observations#
class MyRobot(LeggedRobot):
def compute_observations(self):
# Get base observations
super().compute_observations()
# Add custom sensor data
sensor_data = self._get_sensor_readings() # Shape: (num_envs, sensor_dim)
self.obs_buf = torch.cat([self.obs_buf, sensor_data], dim=-1)
# Update noise vector if needed
if self.add_noise:
self.noise_scale_vec = torch.cat([
self.noise_scale_vec,
torch.zeros(self.num_envs, sensor_dim, device=self.device)
], dim=-1)
Important: When overriding compute_observations(), you must also update cfg.env.num_observations to match the new observation dimension.
Custom Termination#
class MyRobot(LeggedRobot):
def check_termination(self):
# Call parent for default termination conditions
super().check_termination()
# Add custom termination
custom_failure = self._check_custom_failure_condition()
self.fail_buf += custom_failure
# Update reset buffer
self.reset_buf = (
(self.fail_buf > self.cfg.env.fail_to_terminal_time_s / self.dt)
| self.time_out_buf
)
Custom Reward#
class MyRobot(LeggedRobot):
def _reward_gait_quality(self) -> Reward:
"""Reward smooth, natural gait patterns."""
# Compute phase-based gait metric
gait_error = self._compute_gait_phase_error()
return torch.exp(-gait_error / 0.1)
Domain Randomization Customization#
class MyRobot(LeggedRobot):
def _post_physics_step_callback(self):
super()._post_physics_step_callback()
# Add custom randomization
if self.common_step_counter % 1000 == 0:
self._randomize_custom_parameters()
Best Practices#
Configuration Validation#
The base class includes extensive validation. Always ensure:
Config has required sections:
env,normalization,sim,controlnum_observationsmatches actual observation sizenum_actionsmatches DOF countReward scales have corresponding
_reward_*()methods
Observation Modifications#
When modifying observations:
Update
cfg.env.num_observationsUpdate
_get_noise_scale_vec()if using noiseTest with assertions enabled to catch size mismatches early
Reward Function Implementation#
Always return shape
(num_envs,)Use exponential rewards for tracking tasks:
torch.exp(-error/sigma)Use squared penalties for regularization:
torch.square(quantity)Document reward purpose and parameters
Performance Considerations#
Use vectorized operations instead of loops
Avoid Python control flow in reward functions
Cache frequently accessed attributes
Use
torch.no_grad()for non-training computations
See Also#
legged_robot_config.py - Configuration parameters
Adding a New Robot to LeggedGym-Ex - Guide on adding new robots
Developing Custom Reward Functions - Guide on custom reward functions