Solving a Brax environment using EvoTorch
This notebook demonstrates how the Brax environment named humanoid can be solved using EvoTorch. The hyperparameters here are tuned for brax version 0.10.5.
EvoTorch provides VecGymNE, a neuroevolution problem type that focuses on solving vectorized environments. If a GPU is available, VecGymNE can utilize it to boost performance. In this notebook, we use VecGymNE to solve the humanoid task.
For this notebook to work, the libraries JAX and Brax are required. To install JAX, see its official installation instructions. After a successful installation of JAX, Brax can be installed via pip, for example with the command pip install brax==0.10.5 (the version for which the hyperparameters here were tuned).
Below, we import the necessary libraries.
from evotorch.algorithms import PGPE
from evotorch.neuroevolution import VecGymNE
from evotorch.logging import StdOutLogger, PicklingLogger
import os
import torch
from torch import nn
from datetime import datetime
from glob import glob
import shutil
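We now check if CUDA is available. If exactly one GPU is found, we prepare a configuration which tells VecGymNE to use that single GPU both for the population and for the fitness evaluation operations. If several GPUs are found, we instead assign one actor to each GPU and run PGPE in distributed mode. If CUDA is not available, we fall back to the CPU, where actor-based parallelization (the num_actors setting) can be used to boost the performance.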
def how_many_cuda_devices():
    import sys
    import subprocess as sp

    # The script below is run in a separate Python process.
    # It tries cuda:0, cuda:1, ... until moving a tensor fails.
    instr = r"""
import torch
x = torch.as_tensor(1.0)
device_count = 0
while True:
    try:
        x.to(f"cuda:{device_count}")
        device_count += 1
    except Exception:
        break
print(device_count)"""

    # Use the interpreter that runs this notebook, so that the same torch installation is probed
    proc = sp.Popen([sys.executable], stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.PIPE, text=True)
    outstr, errstr = proc.communicate(instr)
    rcode = proc.wait()
    if rcode == 0:
        return int(outstr.strip())
    else:
        print(errstr)
        raise RuntimeError(f"Cannot determine number of cuda devices:\n\n{errstr}")
if torch.cuda.is_available():
    # Count the GPUs using the helper function defined above
    NUM_CUDA_DEVICES = how_many_cuda_devices()
    assert NUM_CUDA_DEVICES >= 1

    # CUDA is available. Here, we prepare GPU-specific settings.
    if NUM_CUDA_DEVICES == 1:
        # We make only one GPU visible.
        os.environ["CUDA_VISIBLE_DEVICES"] = "0"

        # os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = ".5"  # Tell JAX to pre-allocate half of a GPU
        os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false"  # Tell JAX to allocate on demand

        # This is the device on which the population will be stored
        device = "cuda:0"

        # We do not want multi-actor parallelization when we have only 1 GPU.
        num_actors = 0

        # In the case of 1 CUDA device, there will be no distributed training
        num_gpus_per_actor = None
        distributed_algorithm = False
    else:
        # In the case of more than one CUDA device, we enable distributed training

        # os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = str((1 / NUM_CUDA_DEVICES) / 2)
        os.environ["XLA_PYTHON_CLIENT_PREALLOCATE"] = "false"  # Tell JAX to allocate on demand

        device = "cpu"  # Main device of the population is cpu
        num_actors = NUM_CUDA_DEVICES  # Allocate an actor per GPU
        num_gpus_per_actor = 1  # Each actor gets assigned a GPU
        distributed_algorithm = True  # PGPE is to work in distributed mode
else:
    # Since CUDA is not available, the device of the population will be cpu.
    device = "cpu"

    # No actor per GPU, since GPU is not available
    num_gpus_per_actor = None
    distributed_algorithm = False

    # num_actors = "max"  # Alternative: use all the CPUs to speed up the evaluations.
    num_actors = 1

    # So that actor-based parallelization does not compete with XLA for the CPUs,
    # we tell XLA not to use multiple threads for its operations.
    # (Following the suggestions at https://github.com/google/jax/issues/743)
    os.environ["XLA_FLAGS"] = "--xla_cpu_multi_thread_eigen=false intra_op_parallelism_threads=1"

    # We also tell OpenBLAS and MKL to use only 1 thread for their operations.
    os.environ["OPENBLAS_NUM_THREADS"] = "1"
    os.environ["MKL_NUM_THREADS"] = "1"
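The three cases handled above can be condensed into a small pure function, which makes them easy to compare side by side. This is only a sketch: the helper name choose_settings and the layout of the returned dictionary are hypothetical and not part of EvoTorch; the values mirror the ones assigned in the cell above.

```python
def choose_settings(num_cuda_devices: int) -> dict:
    """Return the parallelization settings of this notebook (hypothetical helper)."""
    if num_cuda_devices < 0:
        raise ValueError("num_cuda_devices must be non-negative")

    if num_cuda_devices == 0:
        # No GPU: the population lives on the CPU, with a single actor
        return {
            "device": "cpu",
            "num_actors": 1,
            "num_gpus_per_actor": None,
            "distributed": False,
        }

    if num_cuda_devices == 1:
        # One GPU: population and evaluations share cuda:0, no actors
        return {
            "device": "cuda:0",
            "num_actors": 0,
            "num_gpus_per_actor": None,
            "distributed": False,
        }

    # Several GPUs: one actor per GPU, PGPE in distributed mode
    return {
        "device": "cpu",
        "num_actors": num_cuda_devices,
        "num_gpus_per_actor": 1,
        "distributed": True,
    }


for n in (0, 1, 4):
    print(n, choose_settings(n))
```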
We now define our policy. The policy can be expressed as a string, as an instance of torch.nn.Module, or as a subclass of torch.nn.Module.
# --- A simple linear policy ---
# policy = "Linear(obs_length, act_length)"

# --- A feed-forward network ---
policy = "Linear(obs_length, 64) >> Tanh() >> Linear(64, act_length)"

# --- A feed-forward network with layer normalization ---
# policy = (
#     """
#     Linear(obs_length, 64)
#     >> Tanh()
#     >> LayerNorm(64, elementwise_affine=False)
#     >> Linear(64, act_length)
#     """
# )

# --- A recurrent network with layer normalization ---
# Note: in addition to RNN, LSTM is also supported
#
# policy = (
#     """
#     RNN(obs_length, 64)
#     >> LayerNorm(64, elementwise_affine=False)
#     >> Linear(64, act_length)
#     """
# )

# --- A manual feed-forward network ---
# class MyManualNetwork(nn.Module):
#     def __init__(self):
#         super().__init__()
#         ...
#
#     def forward(self, x: torch.Tensor) -> torch.Tensor:
#         ...
#
# policy = MyManualNetwork

# --- A manual recurrent network ---
# class MyManualRecurrentNetwork(nn.Module):
#     def __init__(self):
#         super().__init__()
#         ...
#
#     def forward(self, x: torch.Tensor, hidden_state = None) -> tuple:
#         ...
#         output_tensor = ...
#         new_hidden_state = ...  # hidden state could be a tensor, or a tuple or dict of tensors
#         return output_tensor, new_hidden_state
#
# policy = MyManualRecurrentNetwork
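To get a feel for the size of the search space, the number of trainable parameters of the default feed-forward policy can be worked out by hand. The sketch below assumes that each Linear layer carries a bias vector (the PyTorch default) and that Tanh has no parameters. The function names are hypothetical, and the observation and action lengths in the example are made-up values, not those of the humanoid task.

```python
def linear_param_count(in_features: int, out_features: int, bias: bool = True) -> int:
    """Number of parameters of a fully connected layer: weight matrix plus optional bias."""
    return in_features * out_features + (out_features if bias else 0)


def feed_forward_param_count(obs_length: int, act_length: int, hidden_size: int = 64) -> int:
    """Parameter count of Linear(obs_length, hidden) >> Tanh() >> Linear(hidden, act_length)."""
    first = linear_param_count(obs_length, hidden_size)
    second = linear_param_count(hidden_size, act_length)
    return first + second


# Made-up sizes, for illustration only: (10 * 64 + 64) + (64 * 3 + 3)
print(feed_forward_param_count(obs_length=10, act_length=3))  # 899
```

Each candidate solution evaluated by the evolutionary algorithm is a flat vector holding this many values, so the count should match the solution length reported by the problem object.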
Below, we instantiate our VecGymNE problem.
Note.
At the time of writing this (27 May 2024), the arXiv paper of EvoTorch reports results based on the old implementations of the brax tasks (which were the default until brax v0.1.2). In brax version v0.9.0, these old task implementations moved into the namespace brax.v1. If you wish to reproduce the results reported in the arXiv paper of EvoTorch, you might want to specify the environment name as "brax::old::humanoid" (where the substring "old::" causes VecGymNE to instantiate the environment using the namespace brax.v1), so that you will observe scores and execution times compatible with the ones reported in that arXiv paper. Please also see the mentioned arXiv paper for the hyperparameters used for the old brax environments.
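The environment name is a string whose parts are separated by "::". As an illustration of the naming convention described in the note, the hypothetical helper below splits such a name into its prefix, the optional "old" marker, and the task name. It is not part of EvoTorch, whose own parsing may differ, and the two-part form "brax::humanoid" is assumed here to be the name of the current implementation.

```python
from typing import NamedTuple


class EnvName(NamedTuple):
    prefix: str  # e.g. "brax"
    old: bool  # True if the "old::" marker is present
    task: str  # e.g. "humanoid"


def parse_env_name(name: str) -> EnvName:
    """Split an environment name of the form prefix::task or prefix::old::task."""
    parts = name.split("::")
    if len(parts) == 2:
        return EnvName(prefix=parts[0], old=False, task=parts[1])
    if len(parts) == 3 and parts[1] == "old":
        return EnvName(prefix=parts[0], old=True, task=parts[2])
    raise ValueError(f"Unrecognized environment name: {name!r}")


print(parse_env_name("brax::old::humanoid"))
print(parse_env_name("brax::humanoid"))
```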
# Environment to solve (assumed here to be the current implementation of humanoid;
# see the note above for the "old::" variant)
TASK_NAME = "brax::humanoid"

problem = VecGymNE(
    env=TASK_NAME,
    network=policy,
    #
    # Collect observation stats, and use those stats to normalize incoming observations
    observation_normalization=True,
    #
    # In the case of the "humanoid" task, the agent receives an "alive bonus" of 5.0 for each
    # non-terminal state it observes. In this example, we cancel out this fixed amount of
    # alive bonus using the keyword argument `decrease_rewards_by`.
    # The amount of alive bonus changes from task to task (some of them don't have this bonus
    # at all).
    decrease_rewards_by=5.0,
    #
    # As an alternative to giving a fixed amount of alive bonus, we now enable a scheduled
    # alive bonus.
    # From timestep 0 to 400, the agents will receive no alive bonus.
    # From timestep 400 to 700, the agents will receive partial alive bonus.
    # Beginning with timestep 700, the agents will receive full (10.0) alive bonus.
    alive_bonus_schedule=(400, 700, 10.0),
    device=device,
    num_actors=num_actors,
    num_gpus_per_actor=num_gpus_per_actor,
)

problem, problem.solution_length
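The two reward adjustments configured above can be pictured with a few lines of plain Python. This is a minimal sketch, not EvoTorch's implementation: it assumes that the partial bonus grows linearly between the two timesteps of the schedule, and its treatment of the exact boundary timesteps is a guess. The function names are hypothetical.

```python
def scheduled_alive_bonus(timestep: int, schedule=(400, 700, 10.0)) -> float:
    """Alive bonus at a given timestep for a schedule of the form (t0, t1, full_bonus)."""
    t0, t1, full_bonus = schedule
    if timestep < t0:
        # Early phase: no bonus at all
        return 0.0
    if timestep >= t1:
        # Late phase: the full bonus
        return float(full_bonus)
    # In between: assumed to increase linearly from 0 towards the full bonus
    return full_bonus * (timestep - t0) / (t1 - t0)


def shaped_reward(raw_reward: float, timestep: int, decrease_rewards_by: float = 5.0) -> float:
    """Remove the fixed alive bonus of the task, then add the scheduled one."""
    return raw_reward - decrease_rewards_by + scheduled_alive_bonus(timestep)


for t in (0, 400, 550, 700, 999):
    print(t, scheduled_alive_bonus(t))
```

The intent of such a schedule is that merely surviving the first few hundred timesteps earns nothing, while staying alive late into the episode is rewarded.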
Next, we initialize a PGPE instance to work on the problem.
Note: If you receive a memory allocation error from the GPU driver, you might want to try again with:
- a decreased popsize
- a policy with decreased hidden size and/or number of layers (in case the policy is a neural network)
RADIUS = 2.25
MAX_SPEED = RADIUS / 15
CENTER_LR = MAX_SPEED * 0.75

POPSIZE = 4000
NUM_GENERATIONS = 1000
SAVE_INTERVAL = 20

# Instantiate a PGPE using the hyperparameters prepared above
searcher = PGPE(
    problem,
    popsize=POPSIZE,
    # Number of simulator interactions, expressed as a whole number
    num_interactions=int(POPSIZE * 1000 * 0.75),
    radius_init=RADIUS,
    center_learning_rate=CENTER_LR,
    optimizer="clipup",
    optimizer_config={"max_speed": MAX_SPEED},
    stdev_learning_rate=0.1,
    distributed=distributed_algorithm,
)

searcher
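The derived hyperparameters and the role of max_speed can be checked with plain Python. The sketch below first repeats the arithmetic from the cell above, then performs ClipUp-style updates following the published description of that optimizer: normalize the gradient, accumulate it into a velocity, and clip the norm of the velocity. The momentum value of 0.9 and the helper names are assumptions made for illustration; this is not EvoTorch's code.

```python
import math

RADIUS = 2.25
MAX_SPEED = RADIUS / 15  # roughly 0.15
CENTER_LR = MAX_SPEED * 0.75  # roughly 0.1125
POPSIZE = 4000
NUM_INTERACTIONS = int(POPSIZE * 1000 * 0.75)  # 3 million interactions


def norm(vector):
    """Euclidean norm of a list of floats."""
    return math.sqrt(sum(x * x for x in vector))


def clipup_step(center, velocity, gradient, lr=CENTER_LR, max_speed=MAX_SPEED, momentum=0.9):
    """One ClipUp-style ascent step (hypothetical helper); returns (new_center, new_velocity)."""
    g_norm = norm(gradient)
    # Only the direction of the gradient is used, not its magnitude
    unit = [g / g_norm for g in gradient] if g_norm > 0 else [0.0] * len(gradient)
    new_velocity = [momentum * v + lr * u for v, u in zip(velocity, unit)]
    speed = norm(new_velocity)
    if speed > max_speed:
        # Clip the velocity so that no step is longer than max_speed
        new_velocity = [max_speed * v / speed for v in new_velocity]
    new_center = [c + v for c, v in zip(center, new_velocity)]
    return new_center, new_velocity


center, velocity = [0.0, 0.0], [0.0, 0.0]
for _ in range(5):
    center, velocity = clipup_step(center, velocity, gradient=[3.0, 4.0])
    print(norm(velocity))
```

With these settings, the center of the search distribution can move by at most MAX_SPEED per generation, which is one fifteenth of the initial radius.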
task_name_for_saving = TASK_NAME.split("::")[-1]
now_as_str = datetime.now().strftime("%Y-%m-%d-%H.%M.%S")
OUTPUT_DIR = f"{task_name_for_saving}_{now_as_str}_{os.getpid()}"
print("PicklingLogger will save into", OUTPUT_DIR)
We register two loggers for our PGPE instance.
- StdOutLogger: A logger which will print out the status of the optimization.
- PicklingLogger: A logger which will periodically save the latest result into a pickle file.
_ = StdOutLogger(searcher)
pickler = PicklingLogger(searcher, interval=SAVE_INTERVAL, directory=OUTPUT_DIR)
We are now ready to start the evolutionary search.
for generation in range(1, 1 + NUM_GENERATIONS):
    t_before_step = datetime.now()
    searcher.step()
    t_after_step = datetime.now()
    print("Elapsed:", (t_after_step - t_before_step).total_seconds())
print("The run is finished.")
print("The pickle file that contains the latest result is:")
print(pickler.last_file_name)
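Once the run has produced some files, the most recent one can also be located and opened from a separate session. The sketch below assumes that the saved files end with .pickle and that the most recently modified file is the one of interest; the helper name is hypothetical. Unpickling the actual files written by PicklingLogger most likely requires torch and evotorch to be importable, since the stored objects refer to classes of these libraries.

```python
import os
import pickle
from glob import glob
from typing import Any, Optional


def load_latest_pickle(directory: str, pattern: str = "*.pickle") -> Optional[Any]:
    """Load the most recently modified pickle file in a directory (hypothetical helper).

    Returns None if no file matches the pattern.
    Only unpickle files that you trust, as unpickling can execute arbitrary code.
    """
    paths = glob(os.path.join(directory, pattern))
    if not paths:
        return None
    # Newest modification time wins; the file name breaks ties
    latest = max(paths, key=lambda p: (os.path.getmtime(p), p))
    with open(latest, "rb") as f:
        return pickle.load(f)
```

For example, load_latest_pickle(OUTPUT_DIR) would return the content of the newest file in the directory used by the PicklingLogger above, or None if nothing has been saved yet.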
See the notebook Brax_Experiments_Visualization.ipynb for visualizing the pickle files generated by this notebook.