Evolving Objects
This notebook shows how one can express solutions that are not entirely numeric, and that differ in structure and/or length. To express such solutions, one can set the dtype of the problem at hand as object, which is demonstrated in this notebook.
Task
In this example notebook, we aim to evolve a neural network policy for the MuJoCo Ant environment provided by the gymnasium library.
Implementation details
Genetic Algorithm.
We use a simple genetic algorithm, inspired by the neuroevolution studies [1] and [2]. This simple genetic algorithm has neither cross-over nor tournament selection; it relies only on a mutation operator. The default population size in this notebook is 4. At each generation, 16 mutated candidate solutions are generated, giving us an extended population of size 20. The best 4 solutions within this extended population are then selected for the next generation.
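The selection scheme described above can be sketched in plain Python as follows. This is only an illustration under simplifying assumptions, not the notebook's implementation: solutions are stand-in integers, and `next_generation`, `mutate_fn`, and `fitness_fn` are hypothetical names introduced for this sketch.

```python
import random
from typing import Callable, List


def next_generation(
    parents: List[int],
    mutate_fn: Callable[[int], int],
    fitness_fn: Callable[[int], float],
    num_mutated: int = 16,
) -> List[int]:
    """One generation of a mutation-only, elitist genetic algorithm (illustrative)."""
    # Each child is derived from a parent; parents are visited in round-robin order.
    children = [mutate_fn(parents[i % len(parents)]) for i in range(num_mutated)]
    # Parents and children together form the extended population.
    extended = parents + children
    # Keep the best `len(parents)` solutions (the fitness is maximized).
    extended.sort(key=fitness_fn, reverse=True)
    return extended[: len(parents)]


rng = random.Random(0)
population = [0, 1, 2, 3]
new_population = next_generation(
    population,
    mutate_fn=lambda x: x + rng.randint(-5, 5),
    fitness_fn=lambda x: float(x),
)
```

Because the parents stay in the extended population, the best fitness in the new population can never be worse than the best fitness among the parents.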
How a solution is encoded and decoded.
In [1], a solution is expressed via a list of random seeds. These random seeds are decoded into a parameter vector to fill an entire neural network, by generating pseudo-random Gaussian noise out of each seed, and then applying a final summation upon these generated noise vectors.
In [2], parameters of three modules are evolved: vision, memory, and policy. A solution includes a separate list of seeds for each module (where each list of seeds is decoded to the associated module's parameters like in [1]).
In this notebook, like in [1], we evolve a single module, which is a policy based on a feed-forward neural network. Additionally, like how it was done in [2] for multiple modules, we keep a separate list of seeds for each parameter tensor of our policy neural network.
The structure of a solution can be summarized like this:
[
    [ ... random seeds for parameter tensor0 ... ],
    [ ... random seeds for parameter tensor1 ... ],
    ...
]
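To make the decoding step concrete, here is a small sketch that turns such a nested list of seeds into one flat parameter list per sublist. It uses only the Python standard library: `decode_sublist` and `decode_solution` are hypothetical helper names, `random.Random(seed).gauss` stands in for the NumPy generator used by the notebook's own code, and the noise scale is kept fixed for simplicity.

```python
import random
from typing import List


def decode_sublist(seeds: List[int], num_params: int, power: float = 0.5) -> List[float]:
    """Sum the Gaussian noise vectors generated from each seed."""
    params = [0.0] * num_params
    for seed in seeds:
        rng = random.Random(seed)  # the same seed always yields the same noise vector
        for i in range(num_params):
            params[i] += power * rng.gauss(0.0, 1.0)
    return params


def decode_solution(solution: List[List[int]], sizes: List[int], power: float = 0.5) -> List[List[float]]:
    """Decode each sublist of seeds into the parameters of its associated tensor."""
    return [decode_sublist(seeds, n, power) for seeds, n in zip(solution, sizes)]


# A solution for a toy network with two parameter tensors of 6 and 2 elements.
solution = [[11, 42], [7]]
tensor_sizes = [6, 2]
decoded = decode_solution(solution, tensor_sizes)
```

Since the noise is regenerated from the seeds on demand, a solution only needs to store a handful of integers, no matter how large the parameter tensors are.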
Action binning. In this example, by default, we apply binning on the actions generated by the feed-forward network, the two bins being -0.2 and +0.2.
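As a rough picture of what binning does, the sketch below snaps each action value to whichever of the two bins is closer. It is a plain-Python stand-in, not the layer used by the notebook; `bin_actions` is a hypothetical name, and mapping an exact 0.0 to the upper bin is an assumption of this sketch.

```python
from typing import List


def bin_actions(actions: List[float], r: float = 0.2) -> List[float]:
    """Snap each action to whichever of -r and +r is closer."""
    # The midpoint between -r and +r is 0, so the sign of the value decides the bin.
    # Assumption of this sketch: a value of exactly 0.0 goes to +r.
    return [r if a >= 0.0 else -r for a in actions]


binned = bin_actions([-1.3, -0.05, 0.01, 2.4])
```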
How a solution is mutated. To mutate a solution, we pick one of the contained sublists, and add another random seed into the picked sublist. By this, we end up adding further noise onto the picked sublist's associated parameter tensor.
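The mutation step can be illustrated on plain Python lists as follows. This is a minimal sketch rather than the notebook's operator; `mutate_solution` is a hypothetical name, and the standard-library `random` module is used in place of NumPy.

```python
import copy
import random
from typing import List


def mutate_solution(solution: List[List[int]], rng: random.Random) -> List[List[int]]:
    """Return a copy of `solution` with one new seed appended to one sublist."""
    mutated = copy.deepcopy(solution)  # leave the parent untouched
    chosen = rng.randrange(len(mutated))  # pick one parameter tensor's sublist
    mutated[chosen].append(rng.randrange(2 ** 32))  # add one more random seed
    return mutated


rng = random.Random(0)
parent = [[5], [9], [3], [8]]
child = mutate_solution(parent, rng)
```

Each mutation lengthens exactly one sublist by one seed, which is why solutions in this notebook grow over time and do not have a fixed length.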
Please note
Although this example is inspired by the studies [1] and [2], the implementation here does not strictly follow what is proposed in those studies. Also, we do not claim that the particular implementation we show in this notebook is a competitive and efficient way of doing neuro-evolution. Still, you might find it interesting that this approach can evolve primitive gaits for the Ant-v4 environment, using a small population (popsize=4 and num_mutated_solutions=16, 10 episodes when evaluating each solution), over a small number of generations (30).
Most importantly, we hope this notebook will serve as a tutorial showing how one can define problems whose solutions are not necessarily numeric, and/or whose lengths are not fixed.
[1] Felipe Petroski Such, Vashisht Madhavan, Edoardo Conti, Joel Lehman, Kenneth O. Stanley, Jeff Clune (2018). Deep Neuroevolution: Genetic Algorithms Are a Competitive Alternative for Training Deep Neural Networks for Reinforcement Learning.
[2] Sebastian Risi, Kenneth O. Stanley (2019). Deep Neuroevolution of Recurrent and Discrete World Models.
We begin by importing the necessary libraries:
from evotorch import Problem, Solution, SolutionBatch
from evotorch.algorithms import GeneticAlgorithm
from evotorch.tools import ObjectArray, as_tensor
from evotorch.logging import StdOutLogger, PicklingLogger
import numpy as np
import gymnasium as gym
import pickle
import os
from typing import Iterable, Optional, Union
from datetime import datetime
import torch
from torch import nn
from copy import deepcopy
Below are utility functions for generating and loading observation normalization data.
def env_name_to_file_name(env_name: str) -> str:
    """
    Convert the gymnasium environment ID to a more file-name-friendly counterpart.

    The character ':' in the input string will be replaced with '__colon__'.
    Similarly, the character '/' in the input string will be replaced with '__slash__'.

    Args:
        env_name: gymnasium environment ID
    Returns:
        File-name-friendly counterpart of the input string.
    """
    result = env_name
    result = result.replace(":", "__colon__")
    result = result.replace("/", "__slash__")
    return result

def create_obs_data(
    *,
    env_name: str,
    num_timesteps: int,
    report_interval: Union[int, float] = 5,
    seed: int = 0,
) -> tuple:
    """
    Create observation normalization data with the help of random actions.

    This function creates a gymnasium environment from the given `env_name`.
    Then, it keeps sending random actions to this environment, and collects stats from the observations.

    Args:
        env_name: ID of the gymnasium environment
        num_timesteps: For how many timesteps will the function operate on the environment
        report_interval: Time interval, in seconds, for reporting the status
        seed: A seed that will be used for regulating the randomness of both the environment
            and of the random actions.
    Returns:
        A tuple of the form `(mean, stdev)`, where `mean` is the elementwise mean of the observation vectors,
        and `stdev` is the elementwise standard deviation of the observation vectors.
    """
    print("Creating observation data for", env_name)

    # A simple namespace that holds the running sums from which the stats are computed
    class accumulated:
        sum: Optional[np.ndarray] = None
        sum_of_squares: Optional[np.ndarray] = None
        count: int = 0

    def accumulate(obs: np.ndarray):
        if accumulated.sum is None:
            accumulated.sum = obs.copy()
        else:
            accumulated.sum += obs

        squared = obs ** 2
        if accumulated.sum_of_squares is None:
            accumulated.sum_of_squares = squared
        else:
            accumulated.sum_of_squares += squared

        accumulated.count += 1

    rndgen = np.random.RandomState(seed)

    env = gym.make(env_name)
    assert isinstance(env.action_space, gym.spaces.Box), "Can only work with Box action spaces"

    def reset_env() -> tuple:
        return env.reset(seed=rndgen.randint(2 ** 32))

    action_gap = env.action_space.high - env.action_space.low

    def sample_action() -> np.ndarray:
        # Sample uniformly between the lower and upper bounds of the action space
        return (rndgen.rand(*(env.action_space.shape)) * action_gap) + env.action_space.low

    observation, _ = reset_env()
    accumulate(observation)

    last_report_time = datetime.now()

    for t in range(num_timesteps):
        action = sample_action()
        observation, _, terminated, truncated, _ = env.step(action)
        accumulate(observation)

        done = terminated | truncated
        if done:
            observation, info = reset_env()
            accumulate(observation)

        tnow = datetime.now()
        if (tnow - last_report_time).total_seconds() > report_interval:
            print("Number of timesteps:", t, "/", num_timesteps)
            last_report_time = tnow

    # Var[x] = E[x^2] - (E[x])^2, with a lower bound to avoid division by tiny numbers later
    E_x = accumulated.sum / accumulated.count
    E_x2 = accumulated.sum_of_squares / accumulated.count

    mean = E_x
    variance = np.maximum(E_x2 - ((E_x) ** 2), 1e-2)
    stdev = np.sqrt(variance)

    print("Done.")

    return mean, stdev

def get_obs_data(env_name: str, num_timesteps: int = 50000, seed: int = 0) -> tuple:
    """
    Generate observation normalization data for the gymnasium environment whose name is given.

    If such normalization data was already generated and saved into a pickle file, that pickle file will be loaded.
    Otherwise, new normalization data will be generated and saved into a new pickle file.

    Args:
        env_name: ID of the gymnasium environment
        num_timesteps: For how many timesteps will the observation collector operate on the environment
        seed: A seed that will be used for regulating the randomness of both the environment
            and of the random actions.
    Returns:
        A tuple of the form `(mean, stdev)`, where `mean` is the elementwise mean of the observation vectors,
        and `stdev` is the elementwise standard deviation of the observation vectors.
    """
    num_timesteps = int(num_timesteps)
    envfname = env_name_to_file_name(env_name)
    fname = f"obs_seed{seed}_t{num_timesteps}_{envfname}.pickle"
    if os.path.isfile(fname):
        with open(fname, "rb") as f:
            return pickle.load(f)
    else:
        obsdata = create_obs_data(env_name=env_name, num_timesteps=num_timesteps, seed=seed)
        with open(fname, "wb") as f:
            pickle.dump(obsdata, f)
        return obsdata
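The statistics gathered by `create_obs_data` above rest on the identity Var[x] = E[x²] − (E[x])², combined with a lower bound on the variance. The scalar sketch below illustrates the same computation with the standard library only; `running_mean_stdev` is a hypothetical name introduced for this illustration.

```python
import math
from typing import Iterable, Tuple


def running_mean_stdev(values: Iterable[float], min_variance: float = 1e-2) -> Tuple[float, float]:
    """Compute mean and standard deviation from running sums, in a single pass."""
    total = 0.0
    total_of_squares = 0.0
    count = 0
    for x in values:
        total += x
        total_of_squares += x * x
        count += 1
    mean = total / count
    # Var[x] = E[x^2] - (E[x])^2, clipped from below so that the stdev never gets too small
    variance = max(total_of_squares / count - mean ** 2, min_variance)
    return mean, math.sqrt(variance)


mean, stdev = running_mean_stdev([1.0, 2.0, 3.0, 4.0])
flat_mean, flat_stdev = running_mean_stdev([5.0, 5.0, 5.0])
```

The lower bound matters for observation components that barely change under random actions: without it, dividing by a near-zero standard deviation during normalization would blow such components up.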
Below is the environment ID of the MuJoCo Ant environment that we are targeting.
To try this notebook on a different environment, one can change the value of ENV_NAME below.
Please note, however, that different environments will most probably require different hyperparameter settings.
ENV_NAME = "Ant-v4"
Below, we generate (or load) observation normalization data for the environment:
obs_mean, obs_stdev = get_obs_data(ENV_NAME)
class PolicySearchProblem(Problem):
    """
    A policy search problem where solutions are expressed as lists of sublists of seeds.

    This problem class can be seen as a simplified re-implementation of
    `evotorch.neuroevolution.GymNE`, the main difference being that a policy (a solution)
    is represented via a list of sublists of seeds, instead of vectors of real numbers.

    Among the simplifications compared to `evotorch.neuroevolution.GymNE` are:

    - There is no online observation normalization. Instead, a static normalization data is
      given at the moment of initialization.
    - Discrete-actioned environments are not considered here. Both observations and actions
      are assumed to be in the form of 1-dimensional numeric arrays.
    """

    def __init__(
        self,
        *,
        env_name: str,
        obs_mean: Optional[np.ndarray] = None,
        obs_stdev: Optional[np.ndarray] = None,
        mutation_power: float = 0.5,
        mutation_decay: float = 0.9,
        min_mutation_power: float = 0.05,
        hidden_sizes: tuple = (64,),
        bin: Optional[float] = 0.2,
        num_episodes: int = 5,
        decrease_rewards_by: Optional[float] = 1.0,
        num_actors: Optional[Union[int, str]] = 4,  # "max"
    ):
        """
        `__init__(...)`: Initialize the `PolicySearchProblem`.

        Args:
            env_name: ID of the gymnasium environment, as a string.
            obs_mean: Mean vector for the observations of the environment.
            obs_stdev: Standard deviation vector for the observations of the environment.
            mutation_power: Multiplier that affects the magnitude of the pseudo-random mutation noise.
            mutation_decay: As more seeds are added to a sublist of a solution, the magnitude of the pseudo-random
                mutation can be decayed with the help of this argument. If `mutation_decay` is 1.0, there will be no
                decaying. If `mutation_decay` is 0.9, the magnitude of the pseudo-random noise is decayed via
                multiplication by 0.9 with each new seed.
            min_mutation_power: Lower bound for the mutation power, to prevent it from decaying too much.
            hidden_sizes: A tuple containing the sizes for the hidden layers of the network.
                The integers within this tuple determine the layer sizes, and the number of elements within this
                tuple determines the number of hidden layers of the feedforward policy network.
            bin: When given as a real number `r`, the action vector generated by the policy network will be
                subject to binning, the two action bins being `-r` and `+r`. Can be left as None if action binning
                is not desired.
            num_episodes: For how many episodes will a solution be tested.
            decrease_rewards_by: When given as a real number, each reward will be decreased by this amount for each
                timestep. Can be used to nullify the alive bonuses of the environment.
                Can be left as None if the rewards are to be used as they are.
            num_actors: Number of actors to be created for parallelized evaluation.
                If given as "max", all available CPUs will be used.
        """
        self._env_name = str(env_name)
        self._env: Optional[gym.Env] = None
        self._policy: Optional[nn.Module] = None
        self._policy_dtype: Optional[torch.dtype] = None

        self._mutation_power: float = float(mutation_power)
        self._mutation_decay: float = float(mutation_decay)
        self._min_mutation_power: float = float(min_mutation_power)
        self._hidden_sizes: tuple = tuple(int(item) for item in hidden_sizes)
        self._num_episodes: int = int(num_episodes)

        if bin is None:
            self._bin_ub = None
            self._bin_lb = None
        else:
            self._bin_ub = abs(float(bin))
            self._bin_lb = -(self._bin_ub)

        # Number of parameter tensors of the policy network (one sublist of seeds is kept for each of them)
        self._num_layers: int = sum(1 for _ in self._make_neural_network(1, 1).parameters())

        self._obs_mean: Optional[np.ndarray] = obs_mean
        self._obs_stdev: Optional[np.ndarray] = obs_stdev

        self._decrease_rewards_by = None if decrease_rewards_by is None else float(decrease_rewards_by)

        # Please notice how we set the `dtype` of the problem as `object`.
        # Having `dtype=object` causes this Problem instance to work with `ObjectArray` instances, rather than
        # PyTorch tensors.
        # An ObjectArray can store `None`s, lists, dictionaries, strings, floats, ints, numpy arrays, and PyTorch
        # tensors within itself.
        # In our example, each solution will be a list of lists, where the sublists contain integers representing
        # random seeds.
        super().__init__(
            objective_sense="max",
            dtype=object,
            num_actors=num_actors,
        )

    def _fill(self, values: ObjectArray):
        # Fill the given `values` with newly created solutions.
        # Each item within `values` represents the decision values for another solution.
        n = len(values)
        all_seeds = np.random.randint(2 ** 32, size=(n, self._num_layers))
        values[:] = [
            [[seed] for seed in seeds_for_layers]
            for seeds_for_layers in all_seeds
        ]

    def _get_env(self) -> gym.Env:
        # Get the gymnasium environment instance.
        # If the environment is not created yet, create and return it.
        if self._env is None:
            self._env = gym.make(self._env_name)
            assert isinstance(self._env.observation_space, gym.spaces.Box)
            assert isinstance(self._env.action_space, gym.spaces.Box)
        return self._env

    def _make_neural_network(self, input_size: int, output_size: int) -> nn.Module:
        from evotorch.neuroevolution.net.layers import Bin

        # Make the neural network policy in the form of an `nn.Module` instance.
        sizes = (input_size,) + self._hidden_sizes + (output_size,)
        submodules = []
        last_i = len(sizes) - 2
        for i in range(1 + last_i):
            if i > 0:
                submodules.append(nn.Tanh())
            submodules.append(nn.Linear(sizes[i], sizes[i + 1]))
        if (self._bin_lb is not None) and (self._bin_ub is not None):
            submodules.append(Bin(self._bin_lb, self._bin_ub))
        return nn.Sequential(*submodules)

    def _get_policy(self, solution: Optional[Iterable] = None) -> nn.Module:
        # Get the policy neural network.
        # If the policy neural network is not created yet, create it with the help of the internal method
        # `_make_neural_network(...)`
        # If `solution` is given, decode the `solution` into parameter tensors, and parameterize the neural network
        # policy with these tensors.
        if self._policy is None:
            env = self._get_env()
            [obs_length] = env.observation_space.shape
            [act_length] = env.action_space.shape
            self._policy = self._make_neural_network(obs_length, act_length)
            for p in self._policy.parameters():
                self._policy_dtype = p.dtype
                break

        if solution is not None:
            if isinstance(solution, Solution):
                # Get the decision values stored by the `solution`.
                # In the case of this example, given how we initially `_fill` our solutions,
                # we should get a list of lists of random seeds.
                # Note that the `.values` property gives us the decision values in a read-only form.
                # Therefore, the "list"s we get here are actually `ImmutableList` instances.
                # Within this method, having these lists in read-only form should not break anything, because
                # we just want to read and decode them.
                solution = solution.values
            elif isinstance(solution, SolutionBatch):
                raise TypeError(
                    "Cannot fill the policy network from a `SolutionBatch` (which could contain multiple solutions)."
                    " Please try again with a single `Solution` object."
                )

            seeds_for_all_layers = solution
            parameters_for_all_layers = list(self._policy.parameters())
            assert len(seeds_for_all_layers) == len(parameters_for_all_layers)

            with torch.no_grad():
                for seeds, parameter_tensor in zip(seeds_for_all_layers, parameters_for_all_layers):
                    # Start from zeros, then add the noise generated from each seed of this sublist
                    parameter_tensor[:] = 0
                    mutation_power = self._mutation_power
                    for seed in seeds:
                        parameter_tensor += torch.as_tensor(
                            (np.random.RandomState(seed).randn(*(parameter_tensor.shape)) * mutation_power),
                            dtype=self._policy_dtype,
                        )
                        mutation_power = max(mutation_power * self._mutation_decay, self._min_mutation_power)

        return self._policy

    @torch.no_grad()
    def to_torch_module(self, solution: Iterable) -> nn.Module:
        """
        Get an independent (cloned) policy neural network, parameterized via decoding the given `solution`.

        Args:
            solution: The solution which expresses the parameters of the policy by storing random seeds.
                Can be a `Solution` instance or a list-like object.
        Returns:
            The policy, as a `nn.Module` instance. Note that, this `to_torch_module(...)` method implementation
            does not contain observation normalization and action clipping layers in this returned module.
        """
        if solution is None:
            raise TypeError("The argument `solution` cannot be None")
        return deepcopy(self._get_policy(solution))

    def run_solution(
        self,
        solution: Iterable,
        *,
        visualize: bool = False,
        print_scores: bool = False,
    ) -> float:
        """
        Run the policy expressed by the given `solution`. Return the fitness.

        Args:
            solution: The solution which expresses the parameters of the policy by storing random seeds.
                Can be a `Solution` instance or a list-like object.
            visualize: Whether or not to render the environment onto the screen while running the solution.
            print_scores: Whether or not to print the scores while running the solution.
        Returns:
            The fitness.
        """
        if visualize:
            env = gym.make(self._env_name, render_mode="human")
        else:
            env = self._get_env()

        policy = self._get_policy(solution)

        def normalized(obs: np.ndarray) -> np.ndarray:
            # Apply the static observation normalization, if the normalization data is available
            if (self._obs_mean is not None) and (self._obs_stdev is not None):
                obs = obs - self._obs_mean
                obs = obs / self._obs_stdev
            return obs

        def use_policy(obs: np.ndarray) -> np.ndarray:
            obs = torch.as_tensor(normalized(obs), dtype=self._policy_dtype)
            with torch.no_grad():
                action = policy(obs)
            action = action.numpy()
            return action

        scores = []
        for _ in range(self._num_episodes):
            cumulative_reward = 0.0
            unmodified_cumulative_reward = 0.0
            observation, _ = env.reset()
            if visualize:
                env.render()
            while True:
                action = use_policy(observation)
                observation, reward, terminated, truncated, info = env.step(action)
                if visualize:
                    env.render()
                unmodified_cumulative_reward += reward
                if self._decrease_rewards_by is not None:
                    reward -= self._decrease_rewards_by
                cumulative_reward += reward
                if terminated or truncated:
                    break
            episode_score = cumulative_reward
            if print_scores:
                print("Episode score:", episode_score, " Cumulative reward:", unmodified_cumulative_reward)
            scores.append(episode_score)

        return float(np.mean(scores))

    def visualize(
        self,
        solution: Iterable,
        *,
        print_scores: bool = True,
    ) -> float:
        """
        Visualize the policy expressed by the given `solution`. Return the fitness.

        Args:
            solution: The solution which expresses the parameters of the policy by storing random seeds.
                Can be a `Solution` instance or a list-like object.
            print_scores: Whether or not to print the scores while running the solution.
        Returns:
            The fitness.
        """
        return self.run_solution(
            solution,
            visualize=True,
            print_scores=print_scores,
        )

    def _evaluate(self, solution: Solution):
        solution.set_evaluation(self.run_solution(solution))

# The `mutate` function we implement below uses this constant, which determines how many mutated decision values
# will be generated from the parent decision values.
num_mutated_solutions = 16
def mutate(all_values: ObjectArray) -> ObjectArray:
    """
    Generate mutated decision values, taking the given decision values as a base.

    Args:
        all_values: An `ObjectArray` containing the source decision values. New mutated decision values will be
            generated based on these values. Each item within this `ObjectArray` represents the decision values of a
            different solution.
    Returns:
        Mutated decision values.
    """
    # Generate an empty list that will contain the mutated decision values
    mutated_values = [None for _ in range(num_mutated_solutions)]

    num_original_solutions = len(all_values)

    for i in range(num_mutated_solutions):
        original_solution_index = i % num_original_solutions

        # When we take an element from the `ObjectArray`, containers such as lists are stored in a read-only manner
        # (i.e. they are actually `ImmutableList` instances).
        # By using the clone method of such an immutable container, we get the native Python counterpart of the
        # container, as an independent clone. Now, we can modify this clone.
        solution_values = all_values[original_solution_index].clone()

        num_layers = len(solution_values)
        chosen_layer_index = int(np.random.randint(num_layers, size=tuple()))
        solution_values[chosen_layer_index].append(int(np.random.randint(2 ** 32, size=tuple())))

        mutated_values[i] = solution_values

    # Note that the `as_tensor(...)` function we use below is from the namespace `evotorch.tools`.
    # This `as_tensor(...)` function, upon receiving the keyword argument `dtype=object`, will convert the list it
    # receives to `ObjectArray`, which is the expected return type for mutation functions such as this one
    # when the problem's dtype is `object`.
    return as_tensor(mutated_values, dtype=object)
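The decoding loop in `_get_policy` scales the noise of each successive seed by a decayed mutation power. With the default arguments of `PolicySearchProblem` (0.5, 0.9, and 0.05), the resulting schedule can be tabulated with a small sketch like the one below; `mutation_power_schedule` is a hypothetical helper introduced here for illustration and is not part of the notebook.

```python
from typing import List


def mutation_power_schedule(
    num_seeds: int,
    mutation_power: float = 0.5,
    mutation_decay: float = 0.9,
    min_mutation_power: float = 0.05,
) -> List[float]:
    """List the noise multiplier applied to each seed of a sublist, in order."""
    powers = []
    power = mutation_power
    for _ in range(num_seeds):
        powers.append(power)
        # Decay the power for the next seed, but never go below the lower bound
        power = max(power * mutation_decay, min_mutation_power)
    return powers


schedule = mutation_power_schedule(30)
```

With these defaults the multiplier starts at 0.5 and settles at the lower bound of 0.05 after a little more than twenty seeds, so later mutations of a long sublist act as small refinements.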
Now that we have the implementation of our Problem subclass, we instantiate it:
problem = PolicySearchProblem(
    env_name=ENV_NAME,
    obs_mean=obs_mean,
    obs_stdev=obs_stdev,
    num_actors="max",  # <- "max" means 'use all CPUs'
)
Next, we instantiate the GeneticAlgorithm:
searcher = GeneticAlgorithm(
    # This is the `Problem` instance that we will work on:
    problem,
    # We only have one mutation operator, which is implemented within the `mutate` function:
    operators=[mutate],
    # Population size:
    popsize=4,
    # If `elitist` is given as True:
    # - Mutated solutions generated with the help of `mutate(...)` will be added to the parent population, creating
    #   an extended population.
    # - The top-`popsize` of the solutions from within this extended population will be picked as the next
    #   generation's solutions.
    #
    # If `elitist` is given as False:
    # - If the number of mutated solutions is less than `popsize`, the worst solutions of the main population
    #   will be replaced by the mutated solutions
    # - If the number of mutated solutions is equal or greater than `popsize`, the main population will be replaced
    #   entirely by the top-`popsize` mutated solutions.
    # - With `num_mutated_solutions>=popsize` (which is the default case within this notebook, given that `popsize=4`
    #   and `num_mutated_solutions=16`), it is NOT recommended to set `elitist=False`, because the mutation
    #   operator we implement here does not have any guarantee of improving the parent solutions. So, randomly made
    #   mutated solutions would entirely replace the parents, turning this genetic algorithm to a blind search.
    elitist=True,
)
searcher
_ = StdOutLogger(searcher)
pickling_logger = PicklingLogger(searcher, items_to_save=["best", "pop_best"], interval=5)
t0 = datetime.now()
searcher.run(30)
t1 = datetime.now()
print("Elapsed:", (t1 - t0).total_seconds(), "seconds.")
Visualization of the solution
# "pop_best": solution which got the best score in the last population
# "best": solution which got the best score during the entire evolutionary run
# Please note that, in the case of Ant-v4, the simulator (and therefore the problem
# we are solving in this example) is stochastic. Therefore, even when the "best" solution
# has a better recorded score, it does not mean that it is definitely better than the
# "pop_best" solution.
# If our optimization problem were purely numeric and if we were using a distribution-based algorithm
# such as PGPE, CEM, etc., the "center" solution would also be available (and would usually be a
# reliable candidate for being picked as the final solution for a stochastic problem such as this).
problem.visualize(searcher.status["pop_best"])
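Because the recorded scores are noisy, one way to choose between candidates such as "best" and "pop_best" is to re-evaluate each of them several times and compare the averages. The sketch below illustrates the idea with a hypothetical `evaluate` callable and a toy noisy objective. In this notebook, something like `problem.run_solution` could play the role of `evaluate`, but that is an assumption of this sketch and not something the notebook itself does.

```python
import random
from statistics import mean
from typing import Callable, Dict


def pick_by_reevaluation(
    candidates: Dict[str, object],
    evaluate: Callable[[object], float],
    repeats: int = 10,
) -> str:
    """Return the name of the candidate with the highest average score over `repeats` evaluations."""
    averages = {
        name: mean(evaluate(candidate) for _ in range(repeats))
        for name, candidate in candidates.items()
    }
    return max(averages, key=averages.get)


# Toy stand-in for a stochastic fitness function: the true score plus Gaussian noise.
rng = random.Random(0)


def noisy_score(x: float) -> float:
    return x + rng.gauss(0.0, 0.1)


chosen = pick_by_reevaluation({"best": 1.0, "pop_best": 2.0}, noisy_score, repeats=20)
```

Averaging over repeated runs reduces the influence of a single lucky or unlucky evaluation, at the cost of extra simulation time.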