# Source code for pantheonrl.algos.adap.adap_learn

"""
Modified implementation of PPO to support ADAP
"""
import warnings
from typing import Any, Dict, Optional, Type, Union

import numpy as np
import torch
from gymnasium import spaces
from torch.nn import functional as F

from stable_baselines3.common.on_policy_algorithm import OnPolicyAlgorithm
from stable_baselines3.common.type_aliases import (
    GymEnv,
    MaybeCallback,
    Schedule,
)
from stable_baselines3.common.utils import explained_variance, get_schedule_fn
from stable_baselines3.common.vec_env import VecEnv
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.buffers import RolloutBuffer

from stable_baselines3.common.utils import obs_as_tensor

from .util import SAMPLERS, get_context_kl_loss
from .policies import AdapPolicy


class ADAP(OnPolicyAlgorithm):
    """
    ADAP

    Borrows from Proximal Policy Optimization algorithm (PPO) (clip version).

    On top of the PPO objective, a context vector is sampled from
    ``SAMPLERS[context_sampler]``, appended to every observation that is fed
    to the policy, and resampled whenever the (first) environment reports
    ``done``. The training loss additionally contains the term
    ``context_loss_coeff * get_context_kl_loss(...)``.

    :param policy: Policy class (or registered name) to instantiate
    :param env: Environment to learn from (or its registered name)
    :param learning_rate: Forwarded to ``OnPolicyAlgorithm``
    :param n_steps: Forwarded to ``OnPolicyAlgorithm``; number of steps
        collected per environment per rollout
    :param batch_size: Mini-batch size used when iterating the rollout buffer
    :param n_epochs: Number of passes over the rollout buffer per ``train()``
    :param gamma: Discount factor (also used for the timeout bootstrap)
    :param gae_lambda: Forwarded to ``OnPolicyAlgorithm``
    :param clip_range: Clipping parameter for the probability ratio; a float
        or a schedule of the remaining progress
    :param clip_range_vf: Clipping parameter for the value function, or
        ``None`` to disable value clipping
    :param normalize_advantage: Whether to normalize advantages per mini-batch
    :param ent_coef: Weight of the entropy term in the loss
    :param vf_coef: Weight of the value-function term in the loss
    :param max_grad_norm: Maximum norm used for gradient clipping
    :param use_sde: Forwarded to ``OnPolicyAlgorithm``
    :param sde_sample_freq: Forwarded to ``OnPolicyAlgorithm``
    :param target_kl: If set, training stops early once the approximate KL
        between old and new policy exceeds ``1.5 * target_kl``
    :param stats_window_size: Forwarded to ``OnPolicyAlgorithm``
    :param tensorboard_log: Forwarded to ``OnPolicyAlgorithm``
    :param policy_kwargs: Extra keyword arguments for the policy. A copy is
        taken and ``context_size`` is added to it.
    :param verbose: Verbosity level; ``>= 1`` prints early-stopping messages
    :param seed: Forwarded to ``OnPolicyAlgorithm``
    :param device: Forwarded to ``OnPolicyAlgorithm``
    :param _init_setup_model: Whether to call ``_setup_model()`` at the end
        of construction
    :param context_loss_coeff: Weight of the ADAP context loss in the total
        loss
    :param context_size: Length of the context vector appended to each
        observation
    :param num_context_samples: Stored on the instance; presumably read by
        ``get_context_kl_loss`` (defined in ``.util``, not visible here)
    :param context_sampler: Key into ``SAMPLERS`` selecting the context
        sampler
    :param num_state_samples: Stored on the instance; presumably read by
        ``get_context_kl_loss`` (defined in ``.util``, not visible here)
    """

    def __init__(
        self,
        policy: Union[str, Type[AdapPolicy]],
        env: Union[GymEnv, str],
        learning_rate: Union[float, Schedule] = 3e-4,
        n_steps: int = 2048,
        batch_size: int = 64,
        n_epochs: int = 10,
        gamma: float = 0.99,
        gae_lambda: float = 0.95,
        clip_range: Union[float, Schedule] = 0.2,
        clip_range_vf: Union[None, float, Schedule] = None,
        normalize_advantage: bool = True,
        ent_coef: float = 0.0,
        vf_coef: float = 0.5,
        max_grad_norm: float = 0.5,
        use_sde: bool = False,
        sde_sample_freq: int = -1,
        target_kl: Optional[float] = None,
        stats_window_size: int = 100,
        tensorboard_log: Optional[str] = None,
        policy_kwargs: Optional[Dict[str, Any]] = None,
        verbose: int = 0,
        seed: Optional[int] = None,
        device: Union[torch.device, str] = "auto",
        _init_setup_model: bool = True,
        # New ADAP
        context_loss_coeff: float = 0.1,
        context_size: int = 3,
        num_context_samples: int = 5,
        context_sampler: str = "l2",
        num_state_samples: int = 32,
    ):
        # Work on a copy so the dict passed in by the caller is not mutated.
        policy_kwargs = dict(policy_kwargs) if policy_kwargs is not None else {}
        policy_kwargs["context_size"] = context_size

        super().__init__(
            policy,
            env,
            learning_rate=learning_rate,
            n_steps=n_steps,
            gamma=gamma,
            gae_lambda=gae_lambda,
            ent_coef=ent_coef,
            vf_coef=vf_coef,
            max_grad_norm=max_grad_norm,
            use_sde=use_sde,
            sde_sample_freq=sde_sample_freq,
            stats_window_size=stats_window_size,
            tensorboard_log=tensorboard_log,
            policy_kwargs=policy_kwargs,
            verbose=verbose,
            device=device,
            seed=seed,
            _init_setup_model=False,
            supported_action_spaces=(
                spaces.Box,
                spaces.Discrete,
                spaces.MultiDiscrete,
                spaces.MultiBinary,
            ),
        )

        # Sanity check, otherwise it will lead to noisy gradient and NaN
        # because of the advantage normalization
        if normalize_advantage:
            assert batch_size > 1, (
                "`batch_size` must be greater than 1. "
                "See https://github.com/DLR-RM/stable-baselines3/issues/440"
            )

        if self.env is not None:
            self.action_dist = self._action_dist_name(self.env.action_space)

            # Check that `n_steps * n_envs > 1` to avoid NaN
            # when doing advantage normalization
            buffer_size = self.env.num_envs * self.n_steps
            assert buffer_size > 1 or (not normalize_advantage), (
                "`n_steps * n_envs` must be greater than 1. "
                f"Currently n_steps={self.n_steps} "
                f"and n_envs={self.env.num_envs}"
            )
            # Check that the rollout buffer size is
            # a multiple of the mini-batch size
            if buffer_size % batch_size > 0:
                warnings.warn(
                    f"You have specified a mini-batch size of {batch_size},"
                    " but the `RolloutBuffer` is of size"
                    f" `n_steps * n_envs = {buffer_size}`."
                )

        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.clip_range = clip_range
        self.clip_range_vf = clip_range_vf
        self.normalize_advantage = normalize_advantage
        self.target_kl = target_kl

        self.context_loss_coeff = context_loss_coeff
        self.num_state_samples = num_state_samples
        self.num_context_samples = num_context_samples
        self.context_sampler = context_sampler
        self.context_size = context_size

        # Observation shape including the context; filled in lazily by
        # the first call to collect_rollouts().
        self.full_obs_shape = None

        if _init_setup_model:
            self._setup_model()

    @staticmethod
    def _action_dist_name(action_space) -> str:
        """Return ``"gaussian"`` for ``Box`` spaces, else ``"categorical"``."""
        # isinstance is required here: comparing a space instance with the
        # `spaces.Box` class using `==` is never true.
        if isinstance(action_space, spaces.Box):
            return "gaussian"
        return "categorical"

    def set_env(self, env, force_reset=True):
        """
        Set the env to use and refresh ``action_dist`` accordingly.

        :param env: The new environment
        :param force_reset: Forwarded to the parent ``set_env``
        """
        super().set_env(env, force_reset=force_reset)
        self.action_dist = self._action_dist_name(self.env.action_space)

    def _setup_model(self) -> None:
        """
        Build the model via the parent class, give the policy an initial
        sampled context and turn the clip ranges into schedules.
        """
        super()._setup_model()

        sampled_context = SAMPLERS[self.context_sampler](
            ctx_size=self.context_size, num=1, use_torch=True
        )
        self.policy.set_context(sampled_context)

        # Initialize schedules for policy/value clipping
        self.clip_range = get_schedule_fn(self.clip_range)
        if self.clip_range_vf is not None:
            if isinstance(self.clip_range_vf, (float, int)):
                assert self.clip_range_vf > 0, (
                    "`clip_range_vf` must be positive, "
                    "pass `None` to deactivate vf clipping"
                )

            self.clip_range_vf = get_schedule_fn(self.clip_range_vf)

    def collect_rollouts(
        self,
        env: VecEnv,
        callback: BaseCallback,
        rollout_buffer: RolloutBuffer,
        n_rollout_steps: int,
    ) -> bool:
        """
        Collect rollouts using the current policy and fill a `RolloutBuffer`.

        The term rollout here refers to the model-free notion and should not
        be used with the concept of rollout used in model-based RL or
        planning.

        Every observation is concatenated with the policy's current context
        before being passed to the policy and before being stored in the
        buffer. The context is resampled whenever ``dones[0]`` is true.

        NOTE(review): observations are reshaped to a single row
        (``reshape((1, -1))``) and only ``dones[0]`` is inspected, so this
        appears to assume a single environment - confirm with callers.

        :param env: The training environment
        :param callback: Callback that will be called at each step
            (and at the beginning and end of the rollout)
        :param rollout_buffer: Buffer to fill with rollouts
        :param n_rollout_steps: Number of experiences to collect per env
        :return: True if function returned with at least `n_rollout_steps`
            collected, False if callback terminated rollout prematurely.
        """
        # NOTE: local variable names in this method are exposed to callbacks
        # through `callback.update_locals(locals())`; do not rename them.
        assert (
            self._last_obs is not None
        ), "No previous observation was provided"
        # Switch to eval mode (this affects batch norm / dropout)
        self.policy.set_training_mode(False)

        n_steps = 0

        # ADAP ADDITION
        # The buffer must store observation + context, so widen its
        # observation shape before it (re)allocates storage in reset().
        if self.full_obs_shape is None:
            self.full_obs_shape = (
                rollout_buffer.obs_shape[0] + self.context_size,
            )

        rollout_buffer.obs_shape = tuple(self.full_obs_shape)
        # ADAP END

        rollout_buffer.reset()
        # Sample new weights for the state dependent exploration
        if self.use_sde:
            self.policy.reset_noise(env.num_envs)

        callback.on_rollout_start()

        while n_steps < n_rollout_steps:
            if (
                self.use_sde
                and self.sde_sample_freq > 0
                and n_steps % self.sde_sample_freq == 0
            ):
                # Sample a new noise matrix
                self.policy.reset_noise(env.num_envs)

            with torch.no_grad():
                # Convert to pytorch tensor and append the current context
                obs_tensor = torch.cat(
                    (
                        obs_as_tensor(self._last_obs, self.device).reshape(
                            (1, -1)
                        ),
                        self.policy.get_context(),
                    ),
                    dim=1,
                )
                actions, values, log_probs = self.policy(obs_tensor)
            actions = actions.cpu().numpy()

            # Rescale and perform action
            clipped_actions = actions

            if isinstance(self.action_space, spaces.Box):
                if self.policy.squash_output:
                    # Unscale the actions to match env bounds
                    # if they were previously squashed (scaled in [-1, 1])
                    clipped_actions = self.policy.unscale_action(
                        clipped_actions
                    )
                else:
                    # Otherwise, clip the actions to avoid out of bound error
                    # as we are sampling from an unbounded Gaussian
                    clipped_actions = np.clip(
                        actions, self.action_space.low, self.action_space.high
                    )

            new_obs, rewards, dones, infos = env.step(clipped_actions)

            self.num_timesteps += env.num_envs

            # Give access to local variables
            callback.update_locals(locals())
            if not callback.on_step():
                return False

            self._update_info_buffer(infos)
            n_steps += 1

            if isinstance(self.action_space, spaces.Discrete):
                # Reshape in case of discrete action
                actions = actions.reshape(-1, 1)

            # Handle timeout by bootstraping with value function
            # see GitHub issue #633
            for idx, done in enumerate(dones):
                if (
                    done
                    and infos[idx].get("terminal_observation") is not None
                    and infos[idx].get("TimeLimit.truncated", False)
                ):
                    terminal_obs = self.policy.obs_to_tensor(
                        infos[idx]["terminal_observation"]
                    )[0].reshape((1, -1))
                    terminal_obs = torch.cat(
                        (terminal_obs, self.policy.get_context()), dim=1
                    )
                    with torch.no_grad():
                        terminal_value = self.policy.predict_values(
                            terminal_obs
                        )[0]
                    rewards[idx] += self.gamma * terminal_value

            # Store the flattened observation together with the context that
            # was active when the action was chosen.
            rollout_buffer.add(
                np.concatenate(
                    (self._last_obs, self.policy.get_context()), axis=None
                ),
                actions,
                rewards,
                self._last_episode_starts,  # type: ignore[arg-type]
                values,
                log_probs,
            )
            self._last_obs = new_obs  # type: ignore[assignment]
            self._last_episode_starts = dones

            # ADAP CHANGE: resample context
            if dones[0]:
                sampled_context = SAMPLERS[self.context_sampler](
                    ctx_size=self.context_size, num=1, use_torch=True
                )
                self.policy.set_context(sampled_context)

        with torch.no_grad():
            # Compute value for the last timestep
            values = self.policy.predict_values(
                torch.cat(
                    (
                        obs_as_tensor(self._last_obs, self.device).reshape(
                            (1, -1)
                        ),
                        self.policy.get_context(),
                    ),
                    dim=1,
                )
            )

        rollout_buffer.compute_returns_and_advantage(
            last_values=values, dones=dones
        )

        callback.update_locals(locals())

        callback.on_rollout_end()

        return True

    def train(self) -> None:
        """
        Update policy using the currently gathered rollout buffer.

        The loss is the PPO clipped surrogate loss plus the entropy and
        value terms, plus ``context_loss_coeff`` times the ADAP context loss
        returned by ``get_context_kl_loss``.
        """
        # Switch to train mode (this affects batch norm / dropout)
        self.policy.set_training_mode(True)
        # Update optimizer learning rate
        self._update_learning_rate(self.policy.optimizer)
        # Compute current clip range
        clip_range = self.clip_range(self._current_progress_remaining)
        # Optional: clip range for the value function
        if self.clip_range_vf is not None:
            clip_range_vf = self.clip_range_vf(
                self._current_progress_remaining
            )

        entropy_losses = []
        pg_losses, value_losses = [], []
        clip_fractions = []

        continue_training = True

        # train for n_epochs epochs
        for epoch in range(self.n_epochs):
            # Both lists are reset every epoch, so the logged means below
            # describe the last epoch that ran.
            approx_kl_divs = []
            context_kl_divs = []
            # Do a complete pass on the rollout buffer
            for rollout_data in self.rollout_buffer.get(self.batch_size):
                actions = rollout_data.actions
                if isinstance(self.action_space, spaces.Discrete):
                    # Convert discrete action from float to long
                    actions = rollout_data.actions.long().flatten()

                # Re-sample the noise matrix because the log_std has changed
                if self.use_sde:
                    self.policy.reset_noise(self.batch_size)

                values, log_prob, entropy = self.policy.evaluate_actions(
                    rollout_data.observations, actions
                )
                values = values.flatten()
                # Normalize advantage
                advantages = rollout_data.advantages
                # Normalization does not make sense if mini batchsize == 1
                if self.normalize_advantage and len(advantages) > 1:
                    advantages = (advantages - advantages.mean()) / (
                        advantages.std() + 1e-8
                    )

                # ratio between old and new policy
                ratio = torch.exp(log_prob - rollout_data.old_log_prob)

                # clipped surrogate loss
                policy_loss_1 = advantages * ratio
                policy_loss_2 = advantages * torch.clamp(
                    ratio, 1 - clip_range, 1 + clip_range
                )
                policy_loss = -torch.min(policy_loss_1, policy_loss_2).mean()

                # Logging
                pg_losses.append(policy_loss.item())
                clip_fraction = torch.mean(
                    (torch.abs(ratio - 1) > clip_range).float()
                ).item()
                clip_fractions.append(clip_fraction)

                if self.clip_range_vf is None:
                    # No clipping
                    values_pred = values
                else:
                    # Clip the difference between old and new value
                    # NOTE: this depends on the reward scaling
                    values_pred = rollout_data.old_values + torch.clamp(
                        values - rollout_data.old_values,
                        -clip_range_vf,
                        clip_range_vf,
                    )
                # Value loss using the TD(gae_lambda) target
                value_loss = F.mse_loss(rollout_data.returns, values_pred)
                value_losses.append(value_loss.item())

                # Entropy loss favor exploration
                if entropy is None:
                    # Approximate entropy when no analytical form
                    entropy_loss = -torch.mean(-log_prob)
                else:
                    entropy_loss = -torch.mean(entropy)

                entropy_losses.append(entropy_loss.item())

                # Context loss for ADAP
                context_loss = get_context_kl_loss(
                    self, self.policy, rollout_data
                )
                context_kl_divs.append(context_loss.item())

                loss = (
                    policy_loss
                    + self.ent_coef * entropy_loss
                    + self.vf_coef * value_loss
                    + self.context_loss_coeff * context_loss
                )

                # Estimate the KL between old and new policy; used only for
                # logging and for the optional early stop.
                with torch.no_grad():
                    log_ratio = log_prob - rollout_data.old_log_prob
                    approx_kl_div = (
                        torch.mean((torch.exp(log_ratio) - 1) - log_ratio)
                        .cpu()
                        .numpy()
                    )
                    approx_kl_divs.append(approx_kl_div)

                if (
                    self.target_kl is not None
                    and approx_kl_div > 1.5 * self.target_kl
                ):
                    continue_training = False
                    if self.verbose >= 1:
                        print(
                            f"Early stopping at step {epoch} due to "
                            f"reaching max kl: {approx_kl_div:.2f}"
                        )
                    break

                # Optimization step
                self.policy.optimizer.zero_grad()
                loss.backward()
                # Clip grad norm
                torch.nn.utils.clip_grad_norm_(
                    self.policy.parameters(), self.max_grad_norm
                )
                self.policy.optimizer.step()

            self._n_updates += 1
            if not continue_training:
                break

        explained_var = explained_variance(
            self.rollout_buffer.values.flatten(),
            self.rollout_buffer.returns.flatten(),
        )

        # Logs
        self.logger.record("train/entropy_loss", np.mean(entropy_losses))
        self.logger.record("train/policy_gradient_loss", np.mean(pg_losses))
        self.logger.record("train/value_loss", np.mean(value_losses))
        self.logger.record("train/approx_kl", np.mean(approx_kl_divs))
        self.logger.record("train/context_kl_loss", np.mean(context_kl_divs))
        self.logger.record("train/clip_fraction", np.mean(clip_fractions))
        self.logger.record("train/loss", loss.item())
        self.logger.record("train/explained_variance", explained_var)
        if hasattr(self.policy, "log_std"):
            self.logger.record(
                "train/std", torch.exp(self.policy.log_std).mean().item()
            )

        self.logger.record(
            "train/n_updates", self._n_updates, exclude="tensorboard"
        )
        self.logger.record("train/clip_range", clip_range)
        if self.clip_range_vf is not None:
            self.logger.record("train/clip_range_vf", clip_range_vf)

    def learn(
        self,
        total_timesteps: int,
        callback: MaybeCallback = None,
        log_interval: int = 1,
        tb_log_name: str = "ADAP",
        reset_num_timesteps: bool = True,
        progress_bar: bool = False,
    ):
        """
        Run the parent training loop, using ``"ADAP"`` as the default
        TensorBoard log name.

        All arguments are forwarded unchanged to ``OnPolicyAlgorithm.learn``
        and its return value is returned as-is.
        """
        return super().learn(
            total_timesteps=total_timesteps,
            callback=callback,
            log_interval=log_interval,
            tb_log_name=tb_log_name,
            reset_num_timesteps=reset_num_timesteps,
            progress_bar=progress_bar,
        )