"""
Modified implementation of PPO to support ADAP
"""
import warnings
from typing import Any, Dict, Optional, Type, Union
import numpy as np
import torch
from gymnasium import spaces
from torch.nn import functional as F
from stable_baselines3.common.on_policy_algorithm import OnPolicyAlgorithm
from stable_baselines3.common.type_aliases import (
GymEnv,
MaybeCallback,
Schedule,
)
from stable_baselines3.common.utils import explained_variance, get_schedule_fn
from stable_baselines3.common.vec_env import VecEnv
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.buffers import RolloutBuffer
from stable_baselines3.common.utils import obs_as_tensor
from .util import SAMPLERS, get_context_kl_loss
from .policies import AdapPolicy
class ADAP(OnPolicyAlgorithm):
"""
ADAP
Borrows from Proximal Policy Optimization algorithm (PPO) (clip version)
"""
def __init__(
self,
policy: Union[str, Type[AdapPolicy]],
env: Union[GymEnv, str],
learning_rate: Union[float, Schedule] = 3e-4,
n_steps: int = 2048,
batch_size: int = 64,
n_epochs: int = 10,
gamma: float = 0.99,
gae_lambda: float = 0.95,
clip_range: Union[float, Schedule] = 0.2,
clip_range_vf: Union[None, float, Schedule] = None,
normalize_advantage: bool = True,
ent_coef: float = 0.0,
vf_coef: float = 0.5,
max_grad_norm: float = 0.5,
use_sde: bool = False,
sde_sample_freq: int = -1,
target_kl: Optional[float] = None,
stats_window_size: int = 100,
tensorboard_log: Optional[str] = None,
policy_kwargs: Optional[Dict[str, Any]] = None,
verbose: int = 0,
seed: Optional[int] = None,
device: Union[torch.device, str] = "auto",
_init_setup_model: bool = True,
# New ADAP
context_loss_coeff: float = 0.1,
context_size: int = 3,
num_context_samples: int = 5,
context_sampler: str = "l2",
num_state_samples: int = 32,
):
if policy_kwargs is None:
policy_kwargs = {}
policy_kwargs["context_size"] = context_size
super().__init__(
policy,
env,
learning_rate=learning_rate,
n_steps=n_steps,
gamma=gamma,
gae_lambda=gae_lambda,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm,
use_sde=use_sde,
sde_sample_freq=sde_sample_freq,
stats_window_size=stats_window_size,
tensorboard_log=tensorboard_log,
policy_kwargs=policy_kwargs,
verbose=verbose,
device=device,
seed=seed,
_init_setup_model=False,
supported_action_spaces=(
spaces.Box,
spaces.Discrete,
spaces.MultiDiscrete,
spaces.MultiBinary,
),
)
# Sanity check, otherwise it will lead to noisy gradient and NaN
# because of the advantage normalization
if normalize_advantage:
            assert batch_size > 1, (
                "`batch_size` must be greater than 1. "
                "See https://github.com/DLR-RM/stable-baselines3/issues/440"
            )
if self.env is not None:
# Check that `n_steps * n_envs > 1` to avoid NaN
# when doing advantage normalization
            if isinstance(self.env.action_space, spaces.Box):
                self.action_dist = "gaussian"
            else:
                self.action_dist = "categorical"
buffer_size = self.env.num_envs * self.n_steps
            assert buffer_size > 1 or (not normalize_advantage), (
                "`n_steps * n_envs` must be greater than 1. "
                f"Currently n_steps={self.n_steps} and n_envs={self.env.num_envs}"
            )
# Check that the rollout buffer size is
# a multiple of the mini-batch size
            if buffer_size % batch_size > 0:
                warnings.warn(
                    f"You have specified a mini-batch size of {batch_size}, "
                    "but the `RolloutBuffer` is of size "
                    f"`n_steps * n_envs = {buffer_size}`."
                )
self.batch_size = batch_size
self.n_epochs = n_epochs
self.clip_range = clip_range
self.clip_range_vf = clip_range_vf
self.normalize_advantage = normalize_advantage
self.target_kl = target_kl
self.context_loss_coeff = context_loss_coeff
self.num_state_samples = num_state_samples
self.num_context_samples = num_context_samples
self.context_sampler = context_sampler
self.context_size = context_size
self.full_obs_shape = None
if _init_setup_model:
self._setup_model()
    def set_env(self, env: GymEnv, force_reset: bool = True) -> None:
        """Set the environment to use and update the action distribution type."""
        super().set_env(env, force_reset=force_reset)
        if isinstance(self.env.action_space, spaces.Box):
            self.action_dist = "gaussian"
        else:
            self.action_dist = "categorical"
def _setup_model(self) -> None:
super()._setup_model()
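        # ADAP: sample an initial latent context and attach it to the policy
        # before any rollouts are collected.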
sampled_context = SAMPLERS[self.context_sampler](
ctx_size=self.context_size, num=1, use_torch=True
)
self.policy.set_context(sampled_context)
# Initialize schedules for policy/value clipping
self.clip_range = get_schedule_fn(self.clip_range)
if self.clip_range_vf is not None:
if isinstance(self.clip_range_vf, (float, int)):
assert self.clip_range_vf > 0, (
"`clip_range_vf` must be positive, "
"pass `None` to deactivate vf clipping"
)
self.clip_range_vf = get_schedule_fn(self.clip_range_vf)
def collect_rollouts(
self,
env: VecEnv,
callback: BaseCallback,
rollout_buffer: RolloutBuffer,
n_rollout_steps: int,
) -> bool:
"""
Collect rollouts using the current policy and fill a `RolloutBuffer`.
The term rollout here refers to the model-free notion and should not
be used with the concept of rollout used in model-based RL or planning.
:param env: The training environment
:param callback: Callback that will be called at each step
(and at the beginning and end of the rollout)
:param rollout_buffer: Buffer to fill with rollouts
:param n_rollout_steps: Number of experiences to collect per env
:return: True if function returned with at least `n_rollout_steps`
collected, False if callback terminated rollout prematurely.
"""
assert (
self._last_obs is not None
), "No previous observation was provided"
# Switch to eval mode (this affects batch norm / dropout)
self.policy.set_training_mode(False)
n_steps = 0
# ADAP ADDITION
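        # The buffer stores observations concatenated with the latent context,
        # so widen its observation shape by `context_size` on the first call.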
if self.full_obs_shape is None:
self.full_obs_shape = (
rollout_buffer.obs_shape[0] + self.context_size,
)
rollout_buffer.obs_shape = tuple(self.full_obs_shape)
# ADAP END
rollout_buffer.reset()
# Sample new weights for the state dependent exploration
if self.use_sde:
self.policy.reset_noise(env.num_envs)
callback.on_rollout_start()
while n_steps < n_rollout_steps:
if (
self.use_sde
and self.sde_sample_freq > 0
and n_steps % self.sde_sample_freq == 0
):
# Sample a new noise matrix
self.policy.reset_noise(env.num_envs)
with torch.no_grad():
# Convert to pytorch tensor or to TensorDict
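                # ADAP: append the current latent context to the observation
                # before querying the policy (assumes a single environment
                # with a flat observation vector, hence the (1, -1) reshape).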
obs_tensor = torch.cat(
(
obs_as_tensor(self._last_obs, self.device).reshape(
(1, -1)
),
self.policy.get_context(),
),
dim=1,
)
actions, values, log_probs = self.policy(obs_tensor)
actions = actions.cpu().numpy()
# Rescale and perform action
clipped_actions = actions
if isinstance(self.action_space, spaces.Box):
if self.policy.squash_output:
# Unscale the actions to match env bounds
# if they were previously squashed (scaled in [-1, 1])
clipped_actions = self.policy.unscale_action(
clipped_actions
)
else:
# Otherwise, clip the actions to avoid out of bound error
# as we are sampling from an unbounded Gaussian
clipped_actions = np.clip(
actions, self.action_space.low, self.action_space.high
)
new_obs, rewards, dones, infos = env.step(clipped_actions)
self.num_timesteps += env.num_envs
# Give access to local variables
callback.update_locals(locals())
if not callback.on_step():
return False
self._update_info_buffer(infos)
n_steps += 1
if isinstance(self.action_space, spaces.Discrete):
# Reshape in case of discrete action
actions = actions.reshape(-1, 1)
            # Handle timeout by bootstrapping with the value function
# see GitHub issue #633
for idx, done in enumerate(dones):
if (
done
and infos[idx].get("terminal_observation") is not None
and infos[idx].get("TimeLimit.truncated", False)
):
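                    # ADAP: the terminal observation must also carry the
                    # current context before its value is bootstrapped.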
terminal_obs = self.policy.obs_to_tensor(
infos[idx]["terminal_observation"]
)[0].reshape((1, -1))
terminal_obs = torch.cat(
(terminal_obs, self.policy.get_context()), dim=1
)
with torch.no_grad():
terminal_value = self.policy.predict_values(
terminal_obs
)[0]
rewards[idx] += self.gamma * terminal_value
rollout_buffer.add(
np.concatenate(
(self._last_obs, self.policy.get_context()), axis=None
),
# self._last_obs, # type: ignore[arg-type]
actions,
rewards,
self._last_episode_starts, # type: ignore[arg-type]
values,
log_probs,
)
self._last_obs = new_obs # type: ignore[assignment]
self._last_episode_starts = dones
            # ADAP CHANGE: resample the latent context at the end of each
            # episode (assumes a single environment, hence `dones[0]`)
if dones[0]:
sampled_context = SAMPLERS[self.context_sampler](
ctx_size=self.context_size, num=1, use_torch=True
)
self.policy.set_context(sampled_context)
with torch.no_grad():
# Compute value for the last timestep
values = self.policy.predict_values(torch.cat((
obs_as_tensor(self._last_obs, self.device).reshape((1, -1)),
self.policy.get_context()), dim=1)
)
rollout_buffer.compute_returns_and_advantage(
last_values=values, dones=dones
)
callback.update_locals(locals())
callback.on_rollout_end()
return True
def train(self) -> None:
"""
Update policy using the currently gathered rollout buffer.
"""
# Switch to train mode (this affects batch norm / dropout)
self.policy.set_training_mode(True)
# Update optimizer learning rate
self._update_learning_rate(self.policy.optimizer)
# Compute current clip range
clip_range = self.clip_range(self._current_progress_remaining)
# Optional: clip range for the value function
if self.clip_range_vf is not None:
clip_range_vf = self.clip_range_vf(
self._current_progress_remaining
)
entropy_losses = []
pg_losses, value_losses = [], []
clip_fractions = []
continue_training = True
# train for n_epochs epochs
for epoch in range(self.n_epochs):
approx_kl_divs = []
context_kl_divs = []
# Do a complete pass on the rollout buffer
for rollout_data in self.rollout_buffer.get(self.batch_size):
actions = rollout_data.actions
if isinstance(self.action_space, spaces.Discrete):
# Convert discrete action from float to long
actions = rollout_data.actions.long().flatten()
# Re-sample the noise matrix because the log_std has changed
if self.use_sde:
self.policy.reset_noise(self.batch_size)
values, log_prob, entropy = self.policy.evaluate_actions(
rollout_data.observations, actions
)
values = values.flatten()
# Normalize advantage
advantages = rollout_data.advantages
                # Normalization does not make sense if mini-batch size == 1
if self.normalize_advantage and len(advantages) > 1:
advantages = (advantages - advantages.mean()) / (
advantages.std() + 1e-8
)
# ratio between old and new policy
ratio = torch.exp(log_prob - rollout_data.old_log_prob)
# clipped surrogate loss
policy_loss_1 = advantages * ratio
policy_loss_2 = advantages * torch.clamp(
ratio, 1 - clip_range, 1 + clip_range
)
policy_loss = -torch.min(policy_loss_1, policy_loss_2).mean()
# Logging
pg_losses.append(policy_loss.item())
clip_fraction = torch.mean(
(torch.abs(ratio - 1) > clip_range).float()
).item()
clip_fractions.append(clip_fraction)
if self.clip_range_vf is None:
# No clipping
values_pred = values
else:
# Clip the difference between old and new value
# NOTE: this depends on the reward scaling
values_pred = rollout_data.old_values + torch.clamp(
values - rollout_data.old_values,
-clip_range_vf,
clip_range_vf,
)
# Value loss using the TD(gae_lambda) target
value_loss = F.mse_loss(rollout_data.returns, values_pred)
value_losses.append(value_loss.item())
                # Entropy loss favors exploration
if entropy is None:
# Approximate entropy when no analytical form
entropy_loss = -torch.mean(-log_prob)
else:
entropy_loss = -torch.mean(entropy)
entropy_losses.append(entropy_loss.item())
# Context loss for ADAP
context_loss = get_context_kl_loss(
self, self.policy, rollout_data
)
context_kl_divs.append(context_loss.item())
loss = (
policy_loss
+ self.ent_coef * entropy_loss
+ self.vf_coef * value_loss
+ self.context_loss_coeff * context_loss
)
with torch.no_grad():
log_ratio = log_prob - rollout_data.old_log_prob
approx_kl_div = (
torch.mean((torch.exp(log_ratio) - 1) - log_ratio)
.cpu()
.numpy()
)
approx_kl_divs.append(approx_kl_div)
if (
self.target_kl is not None
and approx_kl_div > 1.5 * self.target_kl
):
continue_training = False
                    if self.verbose >= 1:
                        print(
                            f"Early stopping at step {epoch} due to"
                            f" reaching max kl: {approx_kl_div:.2f}"
                        )
break
# Optimization step
self.policy.optimizer.zero_grad()
loss.backward()
# Clip grad norm
torch.nn.utils.clip_grad_norm_(
self.policy.parameters(), self.max_grad_norm
)
self.policy.optimizer.step()
self._n_updates += 1
if not continue_training:
break
explained_var = explained_variance(
self.rollout_buffer.values.flatten(),
self.rollout_buffer.returns.flatten(),
)
# Logs
self.logger.record("train/entropy_loss", np.mean(entropy_losses))
self.logger.record("train/policy_gradient_loss", np.mean(pg_losses))
self.logger.record("train/value_loss", np.mean(value_losses))
self.logger.record("train/approx_kl", np.mean(approx_kl_divs))
self.logger.record("train/clip_fraction", np.mean(clip_fractions))
self.logger.record("train/loss", loss.item())
self.logger.record("train/explained_variance", explained_var)
if hasattr(self.policy, "log_std"):
self.logger.record(
"train/std", torch.exp(self.policy.log_std).mean().item()
)
self.logger.record(
"train/n_updates", self._n_updates, exclude="tensorboard"
)
self.logger.record("train/clip_range", clip_range)
if self.clip_range_vf is not None:
self.logger.record("train/clip_range_vf", clip_range_vf)
def learn(
self,
total_timesteps: int,
callback: MaybeCallback = None,
log_interval: int = 1,
tb_log_name: str = "ADAP",
reset_num_timesteps: bool = True,
progress_bar: bool = False,
):
return super().learn(
total_timesteps=total_timesteps,
callback=callback,
log_interval=log_interval,
tb_log_name=tb_log_name,
reset_num_timesteps=reset_num_timesteps,
progress_bar=progress_bar,
)
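

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): train ADAP on a flat-observation Gym
# environment with a single vectorized env, matching the single-environment
# assumptions made in `collect_rollouts` above. "CartPole-v1" and the
# hyperparameters below are placeholders, not recommended settings. Because
# this module uses relative imports, run it as a module within its package.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from stable_baselines3.common.env_util import make_vec_env

    vec_env = make_vec_env("CartPole-v1", n_envs=1)
    model = ADAP(
        AdapPolicy,
        vec_env,
        n_steps=256,
        batch_size=64,
        context_size=3,
        context_loss_coeff=0.1,
        verbose=1,
    )
    model.learn(total_timesteps=10_000)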