Solving reinforcement learning tasks using functional evolutionary algorithms

The functional implementations of evolutionary algorithms can interact with the object-oriented Problem API of EvoTorch. To demonstrate this, we instantiate a GymNE problem configured to work on the reinforcement learning task CartPole-v1, and we use the functional pgpe algorithm to solve it.

from evotorch.algorithms.functional import pgpe, pgpe_ask, pgpe_tell
from evotorch.neuroevolution import GymNE
from datetime import datetime
import torch

Below, we instantiate the reinforcement learning problem.

problem = GymNE(
    # The id of the gymnasium task:
    "CartPole-v1",

    # Policy architecture to use.
    # This can also be given as a subclass of `torch.nn.Module`, or an instantiated
    # `torch.nn.Module` object (an equivalent hand-written module is sketched below).
    # For simplicity, we use a basic feed-forward neural network, expressed as a string.
    "Linear(obs_length, 16) >> Tanh() >> Linear(16, act_length)",

    # Setting `observation_normalization` as True means that stats regarding
    # observations will be collected during each population evaluation
    # process, and those stats will be used to normalize the future
    # observation data (that will be given as input to the policy).
    observation_normalization=True,

    # Number of actors to be used. Can be an integer.
    # The string "max" means that the number of actors will be equal to the
    # number of CPUs.
    num_actors="max",
)

problem
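
For reference, the policy string above is roughly equivalent to the hand-written `torch.nn.Module` sketched below. This is only an illustration: the sizes 4 and 2 are CartPole-v1's observation and action lengths, which GymNE normally substitutes for you via `obs_length` and `act_length` when the string form is used. Such a class, or an instance of it, could be passed to GymNE in place of the string.

# Illustrative, hand-written equivalent of the policy string
# "Linear(obs_length, 16) >> Tanh() >> Linear(16, act_length)".
# The notebook itself uses the string form above.
class CartPolePolicy(torch.nn.Module):
    def __init__(self, obs_length: int = 4, act_length: int = 2):
        super().__init__()
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(obs_length, 16),
            torch.nn.Tanh(),
            torch.nn.Linear(16, act_length),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.layers(x)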

Now that we have instantiated our problem, we make a callable evaluator object from it. This callable evaluator, named f, behaves like a function f(x), where x can be a single solution (a 1-dimensional tensor), a population (a 2-dimensional tensor in which each row is a solution), or a batch of populations (a tensor with at least 3 dimensions). Upon receiving its argument x, f uses the problem object to evaluate the solution(s) and returns the evaluation result(s).

f = problem.make_callable_evaluator()
f
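
To illustrate the shape behavior described above, we can evaluate a random population as a quick sanity check. This cell is not required for the search; `problem.solution_length` is the number of parameters of the policy.

# Evaluate a random population of 10 candidate solutions.
# Each row of `random_population` is one flattened parameter vector.
random_population = torch.randn(10, problem.solution_length)
random_fitnesses = f(random_population)
random_fitnesses.shape  # one fitness (cumulative reward) per row: torch.Size([10])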

Hyperparameters for pgpe:

popsize = 100
center_init = problem.make_zeros(num_solutions=1)[0]
max_speed = 0.15
center_learning_rate = max_speed * 0.75
radius = max_speed * 15
stdev_learning_rate = 0.1
stdev_max_change = 0.2

We prepare the pgpe algorithm and get its initial state:

pgpe_state = pgpe(
    # Center of the initial search distribution
    center_init=center_init,

    # Radius for the initial search distribution
    radius_init=radius,

    # Learning rates for updating the center and the standard deviation
    # of the search distribution
    center_learning_rate=center_learning_rate,
    stdev_learning_rate=stdev_learning_rate,

    # Maximum relative amount of change for the standard deviation.
    # Setting this to 0.2 means that an item of the standard deviation vector
    # will not be allowed to change by more than 20% of its original value.
    stdev_max_change=stdev_max_change,

    # The ranking method to be used.
    # "centered" is a ranking method which assigns the rank -0.5 to the worst
    # solution and +0.5 to the best solution (a standalone sketch of this
    # ranking is shown after this cell).
    ranking_method="centered",

    # The optimizer to be used. Can be "clipup", "adam", or "sgd".
    optimizer="clipup",

    # Optimizer-specific hyperparameters:
    optimizer_config={"max_speed": max_speed},

    # Whether or not symmetric sampling will be used.
    symmetric=True,

    # We want to maximize the evaluation results.
    # In the case of reinforcement learning tasks declared via `GymNE`,
    # evaluation results represent the cumulative rewards.
    objective_sense="max",
)
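
To make the "centered" ranking method above concrete, here is a small standalone sketch (illustrative only, not EvoTorch's internal implementation) of how fitnesses are mapped to ranks spread evenly between -0.5 and +0.5:

# Standalone illustration of centered ranking.
# The worst solution receives -0.5, the best receives +0.5, and the
# remaining solutions are spaced evenly in between.
def centered_ranks(fitnesses: torch.Tensor) -> torch.Tensor:
    n = fitnesses.shape[-1]
    ranks = torch.empty_like(fitnesses)
    ranks[fitnesses.argsort()] = torch.arange(n, dtype=fitnesses.dtype)
    return ranks / (n - 1) - 0.5

centered_ranks(torch.tensor([3.0, -1.0, 10.0]))  # tensor([ 0.0000, -0.5000,  0.5000])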

Below is the main loop of the evolutionary search.

# We run the evolutionary search for this many generations:
num_generations = 40

last_report_time = datetime.now()

# This is the interval (in seconds) for reporting the status:
reporting_interval = 1

for generation in range(1, 1 + num_generations):
    # Get a population from the pgpe algorithm
    population = pgpe_ask(pgpe_state, popsize=popsize)

    # Evaluate the fitnesses
    fitnesses = f(population)

    # Inform pgpe of the fitnesses and get its next state
    pgpe_state = pgpe_tell(pgpe_state, population, fitnesses)

    # If it is time to report, print the status
    tnow = datetime.now()
    if (tnow - last_report_time).total_seconds() > reporting_interval:
        print("generation:", generation, "median eval:", torch.median(fitnesses))
        last_report_time = tnow

Here is the center point of the most recent search distribution:

x = pgpe_state.optimizer_state.center
x
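
As an optional step, the parameter vector x can also be turned into a standalone torch.nn.Module via GymNE's to_policy helper, which attaches the observation normalization statistics collected during the search:

# Instantiate the evolved policy as a torch module.
# The returned module includes observation normalization, so it can be run
# directly on raw observations from the environment.
policy = problem.to_policy(x)
policy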

Now, we visualize the agent evolved by our functional pgpe:

problem.visualize(x)

See this notebook on GitHub