"""
Implementation of the Modular Algorithm.
"""
import time
from typing import Any, Dict, Optional, Type, Union
import warnings
import gymnasium as gym
import numpy as np
import torch
from gymnasium import spaces
from torch.nn import functional as F
from stable_baselines3.common.on_policy_algorithm import OnPolicyAlgorithm
from stable_baselines3.common.buffers import RolloutBuffer
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.policies import ActorCriticPolicy
from stable_baselines3.common.type_aliases import (
GymEnv,
MaybeCallback,
Schedule,
)
from stable_baselines3.common.utils import get_schedule_fn, safe_mean
from stable_baselines3.common.vec_env import VecEnv
class ModularAlgorithm(OnPolicyAlgorithm):
"""
Modular PPO: a PPO-style on-policy algorithm that trains a policy against
several partners. It keeps one ``RolloutBuffer`` per partner, collects
rollouts and runs the clipped-surrogate update separately for each partner
index, and adds a marginal regularization term (weighted by
``marginal_reg_coef``) that pulls the averaged main-policy distribution
towards the averaged composed (main + partner) distribution.

:param marginal_reg_coef: Weight of the marginal regularization loss
    added to the PPO loss.
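
A minimal usage sketch (an illustration, not a verified recipe): the names
``ModularPolicy`` and ``MultiPartnerEnv`` are placeholders; the only
requirements taken from this file are that the env exposes
``set_partnerid`` and ``set_resample_policy``, and that the policy exposes
``num_partners``, ``forward(obs, partner_idx=...)``,
``evaluate_actions(obs, actions, partner_idx=...)`` and
``get_action_logits_from_obs``::

    model = ModularAlgorithm(
        ModularPolicy,            # hypothetical ActorCriticPolicy subclass
        MultiPartnerEnv(),        # hypothetical env with set_partnerid()
        marginal_reg_coef=0.1,    # weight of the marginal regularization
    )
    model.learn(total_timesteps=100_000)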
"""
def __init__(
self,
policy: Union[str, Type[ActorCriticPolicy]],
env: Union[GymEnv, str],
learning_rate: Union[float, Schedule] = 3e-4,
n_steps: int = 2048,
batch_size: Optional[int] = 64,
n_epochs: int = 10,
gamma: float = 0.99,
gae_lambda: float = 0.95,
clip_range: Union[float, Schedule] = 0.2,
clip_range_vf: Union[None, float, Schedule] = None,
ent_coef: float = 0.0,
vf_coef: float = 0.5,
max_grad_norm: float = 0.5,
use_sde: bool = False,
sde_sample_freq: int = -1,
target_kl: Optional[float] = None,
tensorboard_log: Optional[str] = None,
policy_kwargs: Optional[Dict[str, Any]] = None,
verbose: int = 0,
seed: Optional[int] = None,
device: Union[torch.device, str] = "auto",
_init_setup_model: bool = True,
# Additional argument: weight of the marginal regularization loss
marginal_reg_coef: float = 0.0,
):
super().__init__(
policy,
env,
learning_rate=learning_rate,
n_steps=n_steps,
gamma=gamma,
gae_lambda=gae_lambda,
ent_coef=ent_coef,
vf_coef=vf_coef,
max_grad_norm=max_grad_norm,
use_sde=use_sde,
sde_sample_freq=sde_sample_freq,
tensorboard_log=tensorboard_log,
policy_kwargs=policy_kwargs,
verbose=verbose,
device=device,
seed=seed,
_init_setup_model=False,
supported_action_spaces=(
spaces.Box,
spaces.Discrete,
spaces.MultiDiscrete,
spaces.MultiBinary,
),
)
self.marginal_reg_coef = marginal_reg_coef
# Sanity check, otherwise it will lead to noisy gradient and NaN
# because of the advantage normalization
assert batch_size > 1, (
    "`batch_size` must be greater than 1. "
    "See https://github.com/DLR-RM/stable-baselines3/issues/440"
)
if self.env is not None:
# Check that `n_steps * n_envs > 1` to avoid NaN
# when doing advantage normalization
buffer_size = self.env.num_envs * self.n_steps
assert buffer_size > 1, (
    "`n_steps * n_envs` must be greater than 1. "
    f"Currently n_steps={self.n_steps} and n_envs={self.env.num_envs}"
)
# Check that the rollout buffer size is
# a multiple of the mini-batch size
if buffer_size % batch_size > 0:
warnings.warn(
    f"You have specified a mini-batch size of {batch_size},"
    f" but the `RolloutBuffer` is of size"
    f" `n_steps * n_envs = {buffer_size}`."
)
self.batch_size = batch_size
self.n_epochs = n_epochs
self.clip_range = clip_range
self.clip_range_vf = clip_range_vf
self.target_kl = target_kl
self._last_dones = None
if _init_setup_model:
self._setup_model()
def _setup_model(self) -> None:
# OnPolicyAlgorithm's _setup_model
self._setup_lr_schedule()
self.set_random_seed(self.seed)
self.policy = self.policy_class( # pytype:disable=not-instantiable
self.observation_space,
self.action_space,
self.lr_schedule,
use_sde=self.use_sde,
**self.policy_kwargs,
)
self.policy = self.policy.to(self.device)
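# One rollout buffer per partner: rollouts are collected and the update is
# run separately for each partner index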
self.rollout_buffer = [
RolloutBuffer(
self.n_steps,
self.observation_space,
self.action_space,
self.device,
gamma=self.gamma,
gae_lambda=self.gae_lambda,
n_envs=self.n_envs,
)
for _ in range(self.policy.num_partners)
]
# PPO's _setup_model
# Initialize schedules for policy/value clipping
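# `get_schedule_fn` turns a constant into a schedule that is a function of
# the remaining training progress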
self.clip_range = get_schedule_fn(self.clip_range)
if self.clip_range_vf is not None:
if isinstance(self.clip_range_vf, (float, int)):
assert self.clip_range_vf > 0, (
"`clip_range_vf` must be positive, "
"pass `None` to deactivate vf clipping"
)
self.clip_range_vf = get_schedule_fn(self.clip_range_vf)
def collect_rollouts(
self,
env: VecEnv,
callback: BaseCallback,
rollout_buffer: RolloutBuffer,
n_rollout_steps: int,
partner_idx: int = 0,
) -> bool:
"""
Collect rollouts using the current policy and fill a `RolloutBuffer`.
:param env: (VecEnv) The training environment
:param callback: (BaseCallback) Callback that will be called at each
    step (and at the beginning and end of the rollout)
:param rollout_buffer: (RolloutBuffer) Buffer to fill with rollouts
:param n_rollout_steps: (int) Number of experiences to collect per environment
:param partner_idx: (int) Index of the partner to condition the policy
    and the environment on while collecting
:return: (bool) True if the function returned with at least
    `n_rollout_steps` collected, False if the callback terminated the
    rollout prematurely.
"""
assert (
self._last_obs is not None
), "No previous observation was provided"
n_steps = 0
rollout_buffer.reset()
# Sample new weights for the state dependent exploration
if self.use_sde:
self.policy.reset_noise(env.num_envs)
callback.on_rollout_start()
# Start the rollout with cleared episode-start flags (avoids passing
# ``None`` to the buffer on the first ``add`` call)
self._last_dones = np.zeros((env.num_envs,), dtype=bool)
while n_steps < n_rollout_steps:
self.env.envs[0].set_partnerid(partner_idx)
if (
self.use_sde
and self.sde_sample_freq > 0
and n_steps % self.sde_sample_freq == 0
):
# Sample a new noise matrix
self.policy.reset_noise(env.num_envs)
with torch.no_grad():
# Convert to pytorch tensor
obs_tensor = torch.as_tensor(self._last_obs).to(self.device)
actions, values, log_probs = self.policy.forward(
obs_tensor, partner_idx=partner_idx
)
actions = actions.cpu().numpy()
# Rescale and perform action
clipped_actions = actions
# Clip the actions to avoid out of bound error
if isinstance(self.action_space, gym.spaces.Box):
clipped_actions = np.clip(
actions, self.action_space.low, self.action_space.high
)
env.envs[0].set_partnerid(partner_idx)
new_obs, rewards, dones, infos = env.step(clipped_actions)
if callback.on_step() is False:
return False
self._update_info_buffer(infos)
n_steps += 1
self.num_timesteps += env.num_envs
if isinstance(self.action_space, gym.spaces.Discrete):
# Reshape in case of discrete action
actions = actions.reshape(-1, 1)
rollout_buffer.add(
self._last_obs,
actions,
rewards,
self._last_dones,
values,
log_probs,
)
self._last_obs = new_obs
self._last_dones = dones
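# Compute the returns and GAE advantages for this partner's rollout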
rollout_buffer.compute_returns_and_advantage(values, dones=dones)
callback.on_rollout_end()
return True
def train(self) -> None:
"""
Update the policy for ``n_epochs`` epochs on each partner's
rollout buffer.
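
For each partner, the minibatch loss combines the usual PPO terms with the
marginal regularization term::

    loss = policy_loss
           + ent_coef * entropy_loss
           + vf_coef * value_loss
           + marginal_reg_coef * marginal_regularization_loss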
"""
# Update optimizer learning rate
self._update_learning_rate(self.policy.optimizer)
# Compute current clip range
clip_range = self.clip_range(self._current_progress_remaining)
# Optional: clip range for the value function
if self.clip_range_vf is not None:
clip_range_vf = self.clip_range_vf(
self._current_progress_remaining
)
entropy_losses, all_kl_divs = [], []
pg_losses, value_losses = [], []
clip_fractions = []
for partner_idx in range(self.policy.num_partners):
# train for gradient_steps epochs
for epoch in range(self.n_epochs):
approx_kl_divs = []
# Do a complete pass on the rollout buffer
for rollout_data in self.rollout_buffer[partner_idx].get(
self.batch_size
):
actions = rollout_data.actions
if isinstance(self.action_space, spaces.Discrete):
# Convert discrete action from float to long
actions = rollout_data.actions.long().flatten()
# Re-sample the noise matrix because the log_std has changed
# TODO: investigate why there is no issue with the gradient
# if that line is commented out (as in SAC)
if self.use_sde:
self.policy.reset_noise(self.batch_size)
values, log_prob, entropy = self.policy.evaluate_actions(
rollout_data.observations,
actions,
partner_idx=partner_idx,
)
values = values.flatten()
# Normalize advantage
advantages = rollout_data.advantages
advantages = (advantages - advantages.mean()) / (
advantages.std() + 1e-8
)
# ratio between old and new policy, should be
# one at the first iteration
ratio = torch.exp(log_prob - rollout_data.old_log_prob)
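# PPO clipped surrogate objective (minimized here as its negative):
# L_clip = E[min(ratio * A, clip(ratio, 1 - clip_range, 1 + clip_range) * A)]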
# clipped surrogate loss
policy_loss_1 = advantages * ratio
policy_loss_2 = advantages * torch.clamp(
ratio, 1 - clip_range, 1 + clip_range
)
policy_loss = -torch.min(policy_loss_1, policy_loss_2).mean()
# Logging
pg_losses.append(policy_loss.item())
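# Fraction of samples whose ratio left the clipping interval (diagnostic)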
clip_fraction = torch.mean(
(torch.abs(ratio - 1) > clip_range).float()
).item()
clip_fractions.append(clip_fraction)
if self.clip_range_vf is None:
# No clipping
values_pred = values
else:
# Clip the difference between the old and new value
# NOTE: this depends on the reward scaling
values_pred = rollout_data.old_values + torch.clamp(
values - rollout_data.old_values,
-clip_range_vf,
clip_range_vf,
)
# Value loss using the TD(gae_lambda) target
value_loss = F.mse_loss(rollout_data.returns, values_pred)
value_losses.append(value_loss.item())
# Entropy loss favors exploration
if entropy is None:
# Approximate entropy when no analytical form
entropy_loss = log_prob.mean()
else:
entropy_loss = -torch.mean(entropy)
entropy_losses.append(entropy_loss.item())
###########
# Marginal Regularization
###########
# Each action distribution is an SB3 Distribution object built from
# `self.batch_size` observations: `dist.distribution.probs` has shape
# (self.batch_size, n_actions) and `dist.sample()` returns a tensor of
# shape (self.batch_size,).
# Careful: the torch distribution must be extracted from the
# stable-baselines3 Distribution wrapper, otherwise old references
# get overwritten.
main_logits, partner_logits = zip(
*[
self.policy.get_action_logits_from_obs(
rollout_data.observations, partner_idx=idx
)
for idx in range(self.policy.num_partners)
]
)
main_logits = torch.stack(
list(main_logits)
) # (num_partners, self.batch_size, self.action_space)
partner_logits = torch.stack(
list(partner_logits)
) # (num_partners, self.batch_size, self.action_space)
composed_logits = main_logits + partner_logits
# Regularize main prob to be the marginals
# Wasserstein metric with unitary distances
# (for categorical actions)
main_probs = torch.mean(
torch.exp(
main_logits
- main_logits.logsumexp(dim=-1, keepdim=True)
),
dim=0,
)
composed_probs = torch.mean(
torch.exp(
composed_logits
- composed_logits.logsumexp(dim=-1, keepdim=True)
),
dim=0,
)
marginal_regularization_loss = torch.mean(
torch.sum(torch.abs(main_probs - composed_probs), dim=1)
)
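# i.e. the batch-averaged L1 distance between the main-policy marginal
# (averaged over partners) and the composed-policy marginal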
###########
loss = (
policy_loss
+ self.ent_coef * entropy_loss
+ self.vf_coef * value_loss
+ self.marginal_reg_coef * marginal_regularization_loss
)
# Optimization step
self.policy.optimizer.zero_grad()
loss.backward()
# Clip grad norm
torch.nn.utils.clip_grad_norm_(
self.policy.parameters(), self.max_grad_norm
)
self.policy.optimizer.step()
approx_kl_divs.append(
torch.mean(rollout_data.old_log_prob - log_prob)
.detach()
.cpu()
.numpy()
)
all_kl_divs.append(np.mean(approx_kl_divs))
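# Early stopping heuristic: stop this partner's epochs once the approximate
# KL divergence exceeds 1.5 * target_kl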
if (
self.target_kl is not None
and np.mean(approx_kl_divs) > 1.5 * self.target_kl
):
print(
    f"Early stopping at step {epoch} due to reaching"
    f" max kl: {np.mean(approx_kl_divs):.2f}"
)
break
self._n_updates += self.n_epochs
# Logs
self.logger.record("train/entropy_loss", np.mean(entropy_losses))
self.logger.record("train/policy_gradient_loss", np.mean(pg_losses))
self.logger.record("train/value_loss", np.mean(value_losses))
def learn(
self,
total_timesteps: int,
callback: MaybeCallback = None,
log_interval: int = 1,
tb_log_name: str = "OnPolicyAlgorithm",
reset_num_timesteps: bool = True,
progress_bar: bool = False
) -> "OnPolicyAlgorithm":
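"""
Alternate between collecting one rollout per partner (switching the
environment's partner id each time) and updating the policy with
``train()`` until ``total_timesteps`` environment steps have been taken.
"""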
iteration = 0
self.env.envs[0].set_resample_policy("null")
total_timesteps, callback = self._setup_learn(
total_timesteps,
callback,
reset_num_timesteps,
tb_log_name,
progress_bar
)
callback.on_training_start(locals(), globals())
while self.num_timesteps < total_timesteps:
for partner_idx in range(self.policy.num_partners):
self.env.envs[0].set_partnerid(partner_idx)
continue_training = self.collect_rollouts(
self.env,
callback,
self.rollout_buffer[partner_idx],
n_rollout_steps=self.n_steps,
partner_idx=partner_idx,
)
if continue_training is False:
break
iteration += 1
self._update_current_progress_remaining(
self.num_timesteps, total_timesteps
)
# Display training infos
if log_interval is not None and iteration % log_interval == 0:
fps = int(self.num_timesteps / (time.time() - self.start_time))
self.logger.record(
"time/iterations", iteration, exclude="tensorboard"
)
if (
len(self.ep_info_buffer) > 0
and len(self.ep_info_buffer[0]) > 0
):
self.logger.record(
"rollout/ep_rew_mean",
safe_mean(
[ep_info["r"] for ep_info in self.ep_info_buffer]
),
)
self.logger.record(
"rollout/ep_len_mean",
safe_mean(
[ep_info["l"] for ep_info in self.ep_info_buffer]
),
)
self.logger.record("time/fps", fps)
self.logger.record(
"time/time_elapsed",
int(time.time() - self.start_time),
exclude="tensorboard",
)
self.logger.record(
"time/total_timesteps",
self.num_timesteps,
exclude="tensorboard",
)
self.logger.dump(step=self.num_timesteps)
self.train()
callback.on_training_end()
return self