# Source code for pantheonrl.envs.blockworldgym.simpleblockworld

"""
Simplified version of blockworld.
"""

# create a gym for a simplified version of blockworld
# which i can use as a starting point to create the normal
# blockworld gym environment

import gymnasium as gym
import numpy as np

from pantheonrl.common.agents import Agent
from pantheonrl.common.multiagentenv import TurnBasedEnv

GRIDLEN = 7  # block world in a 7 x 7 grid
NUM_BLOCKS = (
    5  # the number of blocks will be variable in the non-simplified version,
)
# but allows for a constant sized action space here
NUM_COLORS = 2
NO_COLOR = 0
BLUE = 1
RED = 2  # useful for if we add graphics later

NUM_TOKENS = 16  # number of tokens the planner has

PLANNER_ACTION_SPACE = gym.spaces.Discrete(
    NUM_TOKENS
)  # tokens that represent words
CONSTRUCTOR_ACTION_SPACE = gym.spaces.MultiDiscrete(
    [NUM_BLOCKS, NUM_COLORS + 1]
)
# in the simplified version, the constructor's action space is
# just coloring each block

# for each block, store h/v, coordinate, and color
blocklistformat = [2, GRIDLEN, GRIDLEN, NUM_COLORS + 1] * NUM_BLOCKS
# in the simplified version, constructor can see blocks
CONSTRUCTOR_OBS_SPACE = gym.spaces.MultiDiscrete(
    [NUM_TOKENS] + blocklistformat
)
# constructor's obs space and true colorings
PLANNER_OBS_SPACE = gym.spaces.MultiDiscrete(blocklistformat + blocklistformat)


[docs] def generate_grid_world(np_random): """ Generates a random GRIDLEN x GRIDLEN world with NUM_BLOCKS blocks Will be replaced in the true version with their generate gridworld function, which has gravity/var blocks/etc """ world = np.zeros((GRIDLEN, GRIDLEN)) blocks_so_far = 0 grid_world = [] while blocks_so_far < NUM_BLOCKS: new_block = random_block(np_random) y = new_block[1] x = new_block[2] if new_block[0] == 0: # horizontal if world[y][x] == 1 or world[y][x + 1] == 1: continue world[y][x] = 1 world[y][x + 1] = 1 else: # vertical if world[y][x] == 1 or world[y + 1][x] == 1: continue world[y][x] = 1 world[y + 1][x] = 1 grid_world.append(new_block) blocks_so_far += 1 return grid_world
[docs] def random_block(np_random): """ Places random block in the grid """ block = [] if np_random.integers(0, 2) == 0: # horizontal block.append(0) x = np_random.integers(0, GRIDLEN - 1) y = np_random.integers(0, GRIDLEN) else: block.append(1) x = np_random.integers(0, GRIDLEN) y = np_random.integers(0, GRIDLEN - 1) block.append(y) block.append(x) block.append(np_random.integers(0, NUM_COLORS) + 1) return block
[docs] class SimpleBlockEnv(TurnBasedEnv): """ Simple blockworld environment. """ def __init__(self): super().__init__( [PLANNER_OBS_SPACE, CONSTRUCTOR_OBS_SPACE], [PLANNER_ACTION_SPACE, CONSTRUCTOR_ACTION_SPACE], probegostart=1, ) self.viewer = None self.gridworld = None self.constructor_obs = None self.last_token = None
[docs] def multi_reset(self, egofirst): self.gridworld = generate_grid_world(self.np_random) self.constructor_obs = [ [block[0], block[1], block[2], 0] for block in self.gridworld ] self.last_token = 0 self.viewer = None return self._get_obs(egofirst)
def _get_obs(self, isego): if isego: return np.array([self.gridworld, self.constructor_obs]).flatten() observations = [ elem for block in self.constructor_obs for elem in block ] output = np.array(([self.last_token] + observations)) return output
[docs] def ego_step(self, action): self.last_token = action # the planner decides when done by taking action NUM_TOKENS - 1 done = action == NUM_TOKENS - 1 reward = [0, 0] if done: reward = self._get_reward() return self._get_obs(False), reward, done, {}
[docs] def alt_step(self, action): self.constructor_obs[action[0]][3] = action[1] return self._get_obs(True), [0, 0], False, {}
def _get_reward(self): # for simplified version, 100 * # colored correctly / total blocks # (in the actual one, use F1 score) correct_blocks = 0 for i in range(NUM_BLOCKS): if self.gridworld[i][3] == self.constructor_obs[i][3]: correct_blocks += 1 reward = 100 * correct_blocks / NUM_BLOCKS return [reward, reward] # since they both get the same reward
[docs] class SBWEasyPartner(Agent): """ Easy partner in the simple blockworld """
[docs] def get_action(self, obs): obs = obs.obs token = obs[0] if token > 10: token = token // 2 # tokens 1 - 5 mean color the block at that index red if 1 <= token <= 5: return [token - 1, RED] # tokens 6 - 10 mean color the block at that index blue if 6 <= token <= 10: return [token - 8, BLUE] return [0, obs[4]]
[docs] def update(self, reward, done): pass
[docs] class SBWDefaultAgent(Agent): """ Default partner in the simple blockworld """
[docs] def get_action(self, obs): obs = obs.obs token = obs[0] if token == 0: # do nothing return [0, obs[4]] blocks = np.reshape(obs[1:], (NUM_BLOCKS, 4)) grid = self._gridfromobs(blocks) # tokens 1 - 7 mean find the first uncolored one in that row # and color it red if token <= 7: index = self._findfirstuncolored(grid, token - 1, blocks) if index != -1: return [index, RED] # tokens 8 - 14 mean find the first uncolored one in that row # and color it blue if token <= 14: index = self._findfirstuncolored(grid, token - 8, blocks) if index != -1: return [index, BLUE] # otherwise do nothing return [0, obs[4]]
def _findfirstuncolored(self, grid, row, blocks): for space in grid[row]: if space != -1: if blocks[space][3] == 0: return space return -1 def _gridfromobs(self, blocks): grid = np.full((GRIDLEN, GRIDLEN), -1) for i, block in enumerate(blocks): y = block[1] x = block[2] grid[y][x] = i if block[0] == 0: # horizontal grid[y][x + 1] = i else: grid[y + 1][x] = i return grid
[docs] def update(self, reward, done): pass