Index
This namespace contains the implementations of various evolutionary algorithms.
cmaes
This namespace contains the CMAES class.
CMAES (SearchAlgorithm, SinglePopulationAlgorithmMixin)
This is a GPU-accelerated and vectorized implementation, based on pycma (version r3.2.2) and the below references.
References:
Nikolaus Hansen, Youhei Akimoto, and Petr Baudis.
CMA-ES/pycma on Github. Zenodo, DOI:10.5281/zenodo.2559634,
February 2019.
<https://github.com/CMA-ES/pycma>
Nikolaus Hansen, Andreas Ostermeier (2001).
Completely Derandomized Self-Adaptation in Evolution Strategies.
Nikolaus Hansen (2016).
The CMA Evolution Strategy: A Tutorial.
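As a point of orientation before the full reference, here is a minimal usage sketch. The sphere objective, solution length, and bounds below are illustrative assumptions; only `Problem`, `CMAES`, and `StdOutLogger` are taken from the library.

```python
import torch
from evotorch import Problem
from evotorch.algorithms import CMAES
from evotorch.logging import StdOutLogger

# Hypothetical objective for the sketch: the classic sphere function.
def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2.0)

problem = Problem("min", sphere, solution_length=20, initial_bounds=(-5.0, 5.0))

searcher = CMAES(problem, stdev_init=1.0)  # popsize defaults to 4 + floor(3 log d)
_ = StdOutLogger(searcher)                 # print status after each generation
searcher.run(100)                          # run for 100 generations
```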
Source code in evotorch/algorithms/cmaes.py
class CMAES(SearchAlgorithm, SinglePopulationAlgorithmMixin):
"""
CMAES: Covariance Matrix Adaptation Evolution Strategy.
This is a GPU-accelerated and vectorized implementation, based on pycma (version r3.2.2)
and the below references.
References:
Nikolaus Hansen, Youhei Akimoto, and Petr Baudis.
CMA-ES/pycma on Github. Zenodo, DOI:10.5281/zenodo.2559634,
February 2019.
<https://github.com/CMA-ES/pycma>
Nikolaus Hansen, Andreas Ostermeier (2001).
Completely Derandomized Self-Adaptation in Evolution Strategies.
Nikolaus Hansen (2016).
The CMA Evolution Strategy: A Tutorial.
"""
def __init__(
self,
problem: Problem,
*,
stdev_init: Real,
popsize: Optional[int] = None,
center_init: Optional[Vector] = None,
c_m: Real = 1.0,
c_sigma: Optional[Real] = None,
c_sigma_ratio: Real = 1.0,
damp_sigma: Optional[Real] = None,
damp_sigma_ratio: Real = 1.0,
c_c: Optional[Real] = None,
c_c_ratio: Real = 1.0,
c_1: Optional[Real] = None,
c_1_ratio: Real = 1.0,
c_mu: Optional[Real] = None,
c_mu_ratio: Real = 1.0,
active: bool = True,
csa_squared: bool = False,
stdev_min: Optional[Real] = None,
stdev_max: Optional[Real] = None,
separable: bool = False,
limit_C_decomposition: bool = True,
obj_index: Optional[int] = None,
):
"""
`__init__(...)`: Initialize the CMAES solver.
Args:
problem (Problem): The problem object which is being worked on.
stdev_init (Real): Initial step-size
popsize: Population size. Can be specified as an int,
or can be left as None in which case the CMA-ES rule of thumb is applied:
popsize = 4 + floor(3 log d) where d is the dimension
center_init: Initial center point of the search distribution.
Can be given as a Solution or as a 1-D array.
If left as None, an initial center point is generated
with the help of the problem object's `generate_values(...)`
method.
c_m (Real): Learning rate for updating the mean
of the search distribution. By default the value is 1.
c_sigma (Optional[Real]): Learning rate for updating the step size. If None,
then the CMA-ES rules of thumb will be applied.
c_sigma_ratio (Real): Multiplier on the learning rate for the step size.
If c_sigma has been left as None, this can be used to rescale the default c_sigma value.
damp_sigma (Optional[Real]): Damping factor for updating the step size. If None,
then the CMA-ES rules of thumb will be applied.
damp_sigma_ratio (Real): Multiplier on the damping factor for the step size.
If damp_sigma has been left as None, this can be used to rescale the default damp_sigma value.
c_c (Optional[Real]): Learning rate for updating the rank-1 evolution path.
If None, then the CMA-ES rules of thumb will be applied.
c_c_ratio (Real): Multiplier on the learning rate for the rank-1 evolution path.
If c_c has been left as None, this can be used to rescale the default c_c value.
c_1 (Optional[Real]): Learning rate for the rank-1 update to the covariance matrix.
If None, then the CMA-ES rules of thumb will be applied.
c_1_ratio (Real): Multiplier on the learning rate for the rank-1 update to the covariance matrix.
If c_1 has been left as None, this can be used to rescale the default c_1 value.
c_mu (Optional[Real]): Learning rate for the rank-mu update to the covariance matrix.
If None, then the CMA-ES rules of thumb will be applied.
c_mu_ratio (Real): Multiplier on the learning rate for the rank-mu update to the covariance matrix.
If c_mu has been left as None, this can be used to rescale the default c_mu value.
active (bool): Whether to use Active CMA-ES. Defaults to True, consistent with the tutorial paper and pycma.
csa_squared (bool): Whether to use the squared rule ("CSA_squared" in pycma) for the step-size adaptation.
This effectively corresponds to taking the natural gradient for the evolution path on the step size,
rather than the default CMA-ES rule of thumb.
stdev_min (Optional[Real]): Minimum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None or as a scalar.
stdev_max (Optional[Real]): Maximum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None or as a scalar.
separable (bool): Provide this as True if you would like the problem
to be treated as a separable one. Treating a problem
as separable means to adapt only the diagonal parts
of the covariance matrix and to keep the non-diagonal
parts 0. High dimensional problems result in large
covariance matrices on which operating is computationally
expensive. Therefore, for such high dimensional problems,
setting `separable` as True might be useful.
limit_C_decomposition (bool): Whether to limit the frequency of decomposition of the shape matrix C.
Setting this to True (default) means that C will not be decomposed every generation.
This degrades the quality of the sampling and updates, but provides a guarantee of O(d^2) time complexity.
This option can be used with separable=True (e.g. for experimental reasons), but then performance will only degrade
without time-complexity benefits.
obj_index (Optional[int]): Objective index according to which evaluation
of the solution will be done.
"""
# Initialize the base class
SearchAlgorithm.__init__(self, problem, center=self._get_center, stepsize=self._get_sigma)
# Ensure that the problem is numeric
problem.ensure_numeric()
# Store the objective index
self._obj_index = problem.normalize_obj_index(obj_index)
# Track d = solution length for reference in initialization of hyperparameters
d = self._problem.solution_length
# === Initialize population ===
if not popsize:
# Default value used in CMA-ES literature: 4 + floor(3 log d)
popsize = 4 + int(np.floor(3 * np.log(d)))
self.popsize = int(popsize)
# Half popsize, referred to as mu in CMA-ES literature
self.mu = int(np.floor(popsize / 2))
self._population = problem.generate_batch(popsize=popsize)
# === Initialize search distribution ===
self.separable = separable
# If `center_init` is not given, generate an initial solution
# with the help of the problem object.
# If it is given as a Solution, then clone the solution's values
# as a PyTorch tensor.
# Otherwise, use the given initial solution as the starting
# point in the search space.
if center_init is None:
center_init = self._problem.generate_values(1)
elif isinstance(center_init, Solution):
center_init = center_init.values.clone()
# Store the center
self.m = self._problem.make_tensor(center_init)
# Store the initial step size
self.sigma = self._problem.make_tensor(stdev_init)
if separable:
# Initialize C as the diagonal vector. Note that when separable, the eigendecomposition is not needed
self.C = self._problem.make_ones(d)
# In this case A is simply the square root of elements of C
self.A = self._problem.make_ones(d)
else:
# Initialize C = AA^T all diagonal.
self.C = self._problem.make_I(d)
self.A = self.C.clone()
# === Initialize raw weights ===
# Conditioned on popsize
# w_i = log((lambda + 1) / 2) - log(i) for i = 1 ... lambda
raw_weights = self.problem.make_tensor(np.log((popsize + 1) / 2) - torch.log(torch.arange(popsize) + 1))
# positive valued weights are the first mu
positive_weights = raw_weights[: self.mu]
negative_weights = raw_weights[self.mu :]
# Variance effective selection mass of positive weights
# Not affected by future updates to raw_weights
self.mu_eff = torch.sum(positive_weights).pow(2.0) / torch.sum(positive_weights.pow(2.0))
# === Initialize search parameters ===
# Conditioned on weights
# Store fixed information
self.c_m = c_m
self.active = active
self.csa_squared = csa_squared
self.stdev_min = stdev_min
self.stdev_max = stdev_max
# Learning rate for step-size adaption
if c_sigma is None:
c_sigma = (self.mu_eff + 2.0) / (d + self.mu_eff + 3)
self.c_sigma = c_sigma_ratio * c_sigma
# Damping factor for step-size adaptation
if damp_sigma is None:
damp_sigma = 1 + 2 * max(0, torch.sqrt((self.mu_eff - 1) / (d + 1)) - 1) + self.c_sigma
self.damp_sigma = damp_sigma_ratio * damp_sigma
# Learning rate for evolution path for rank-1 update
if c_c is None:
# Branches on separability
if separable:
c_c = (1 + (1 / d) + (self.mu_eff / d)) / (d**0.5 + (1 / d) + 2 * (self.mu_eff / d))
else:
c_c = (4 + self.mu_eff / d) / (d + (4 + 2 * self.mu_eff / d))
self.c_c = c_c_ratio * c_c
# Learning rate for rank-1 update to covariance matrix
if c_1 is None:
# Branches on separability
if separable:
c_1 = 1.0 / (d + 2.0 * np.sqrt(d) + self.mu_eff / d)
else:
c_1 = min(1, popsize / 6) * 2 / ((d + 1.3) ** 2.0 + self.mu_eff)
self.c_1 = c_1_ratio * c_1
# Learning rate for rank-mu update to covariance matrix
if c_mu is None:
# Branches on separability
if separable:
c_mu = (0.25 + self.mu_eff + (1.0 / self.mu_eff) - 2) / (d + 4 * np.sqrt(d) + (self.mu_eff / 2.0))
else:
c_mu = min(
1 - self.c_1, 2 * ((0.25 + self.mu_eff - 2 + (1 / self.mu_eff)) / ((d + 2) ** 2.0 + self.mu_eff))
)
self.c_mu = c_mu_ratio * c_mu
# The 'variance aware' coefficient used for the additive component of the evolution path for sigma
self.variance_discount_sigma = torch.sqrt(self.c_sigma * (2 - self.c_sigma) * self.mu_eff)
# The 'variance aware' coefficient used for the additive component of the evolution path for rank-1 updates
self.variance_discount_c = torch.sqrt(self.c_c * (2 - self.c_c) * self.mu_eff)
# === Finalize weights ===
# Conditioned on search parameters and raw weights
# Positive weights always sum to 1
positive_weights = positive_weights / torch.sum(positive_weights)
if self.active:
# Active CMA-ES: negative weights sum to alpha
# Get the variance effective selection mass of negative weights
mu_eff_neg = torch.sum(negative_weights).pow(2.0) / torch.sum(negative_weights.pow(2.0))
# Alpha is the minimum of the following 3 terms
alpha_mu = 1 + self.c_1 / self.c_mu
alpha_mu_eff = 1 + 2 * mu_eff_neg / (self.mu_eff + 2)
alpha_pos_def = (1 - self.c_mu - self.c_1) / (d * self.c_mu)
alpha = min([alpha_mu, alpha_mu_eff, alpha_pos_def])
# Rescale negative weights
negative_weights = alpha * negative_weights / torch.sum(torch.abs(negative_weights))
else:
# Negative weights are simply zero
negative_weights = torch.zeros_like(negative_weights)
# Concatenate final weights
self.weights = torch.cat([positive_weights, negative_weights], dim=-1)
# === Some final setup ===
# Initialize the evolution paths
self.p_sigma = 0.0
self.p_c = 0.0
# Hansen's approximation to the expectation of ||x|| where x ~ N(0, I_d).
# Note that we could use the exact formulation with Gamma functions, but we'll retain this form for consistency
self.unbiased_expectation = np.sqrt(d) * (1 - (1 / (4 * d)) + 1 / (21 * d**2))
self.last_ex = None
# How often to decompose C
if limit_C_decomposition:
self.decompose_C_freq = max(1, int(np.floor(_safe_divide(1, 10 * d * (self.c_1.cpu() + self.c_mu.cpu())))))
else:
self.decompose_C_freq = 1
# Use the SinglePopulationAlgorithmMixin to enable additional status reports regarding the population.
SinglePopulationAlgorithmMixin.__init__(self)
@property
def population(self) -> SolutionBatch:
"""Population generated by the CMA-ES algorithm"""
return self._population
def _get_center(self) -> torch.Tensor:
"""Get the center of search distribution, m"""
return self.m
def _get_sigma(self) -> float:
"""Get the step-size of the search distribution, sigma"""
return float(self.sigma.cpu())
@property
def obj_index(self) -> int:
"""Index of the objective being focused on"""
return self._obj_index
def sample_distribution(self, num_samples: Optional[int] = None) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""Sample the population. All 3 representations of solutions are returned for easy calculations of updates.
Note that the computation time of this operation of O(d^2 num_samples) unless separable, in which case O(d num_samples)
Args:
num_samples (Optional[int]): The number of samples to draw. If None, then the population size is used
Returns:
zs (torch.Tensor): A tensor of shape [num_samples, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor): A tensor of shape [num_samples, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
xs (torch.Tensor): A tensor of shape [num_samples, d] of samples from the search space e.g. x_i ~ N(m, sigma^2 C)
"""
if num_samples is None:
num_samples = self.popsize
# Generate z values
zs = self._problem.make_gaussian(num_solutions=num_samples)
# Construct ys = A zs
if self.separable:
# In the separable case A is diagonal so is represented as a single vector
ys = self.A.unsqueeze(0) * zs
else:
ys = (self.A @ zs.T).T
# Construct xs = m + sigma ys
xs = self.m.unsqueeze(0) + self.sigma * ys
return zs, ys, xs
def get_population_weights(self, xs: torch.Tensor) -> torch.Tensor:
"""Get the assigned weights of the population (e.g. evaluate, rank and return)
Args:
xs (torch.Tensor): The population samples drawn from N(mu, sigma^2 C)
Returns:
assigned_weights (torch.Tensor): A [popsize, ] dimensional tensor of ordered weights
"""
# Computation is O(popsize * F_time) where F_time is the evaluation time per sample
# Fill the population
self._population.set_values(xs)
# Evaluate the current population
self.problem.evaluate(self._population)
# Sort the population
indices = self._population.argsort(obj_index=self.obj_index)
# Invert the sorting of the population to obtain the ranks
# Note that these ranks start at zero, but this is fine as we are just using them for indexing
ranks = torch.zeros_like(indices)
ranks[indices] = torch.arange(self.popsize, dtype=indices.dtype, device=indices.device)
# Get weights corresponding to each rank
assigned_weights = self.weights[ranks]
return assigned_weights
def update_m(self, zs: torch.Tensor, ys: torch.Tensor, assigned_weights: torch.Tensor) -> torch.Tensor:
"""Update the center of the search distribution m
With zs and ys retained from sampling, this operation is O(popsize d), as it involves summing across popsize d-dimensional vectors.
Args:
zs (torch.Tensor): A tensor of shape [popsize, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor): A tensor of shape [popsize, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
assigned_weights (torch.Tensor): A [popsize, ] dimensional tensor of ordered weights
Returns:
local_m_displacement (torch.Tensor): A tensor of shape [d], corresponding to the local transformation of m,
(1/sigma) (C^-1/2) (m' - m) where m' is the updated m
shaped_m_displacement (torch.Tensor): A tensor of shape [d], corresponding to the shaped transformation of m,
(1/sigma) (m' - m) where m' is the updated m
"""
# Get the top-mu weights
top_mu = torch.topk(assigned_weights, k=self.mu)
top_mu_weights = top_mu.values
top_mu_indices = top_mu.indices
# Compute the weighted recombination in local coordinate space
local_m_displacement = torch.sum(top_mu_weights.unsqueeze(-1) * zs[top_mu_indices], dim=0)
# Compute the weighted recombination in shaped coordinate space
shaped_m_displacement = torch.sum(top_mu_weights.unsqueeze(-1) * ys[top_mu_indices], dim=0)
# Update m
self.m = self.m + self.c_m * self.sigma * shaped_m_displacement
# Return the weighted recombinations
return local_m_displacement, shaped_m_displacement
def update_p_sigma(self, local_m_displacement: torch.Tensor) -> None:
"""Update the evolution path for sigma, p_sigma
This operation is bounded by O(d), as it is simply a sum of vectors
Args:
local_m_displacement (torch.Tensor): The weighted recombination of local samples zs, corresponding to
(1/sigma) (C^-1/2) (m' - m) where m' is the updated m
"""
self.p_sigma = (1 - self.c_sigma) * self.p_sigma + self.variance_discount_sigma * local_m_displacement
def update_sigma(self) -> None:
"""Update the step size sigma according to its evolution path p_sigma
This operation is bounded by O(d), with the most expensive component being the norm of the evolution path, a d-dimensional vector.
"""
d = self._problem.solution_length
# Compute the exponential update
if self.csa_squared:
# Exponential update based on natural gradient maximizing squared norm of p_sigma
exponential_update = (torch.norm(self.p_sigma).pow(2.0) / d - 1) / 2
else:
# Exponential update increasing the likelihood of p_sigma having its expected norm
exponential_update = torch.norm(self.p_sigma) / self.unbiased_expectation - 1
# Rescale exponential update based on learning rate + damping factor
exponential_update = (self.c_sigma / self.damp_sigma) * exponential_update
# Multiplicative update to sigma
self.sigma = self.sigma * torch.exp(exponential_update)
def update_p_c(self, shaped_m_displacement: torch.Tensor, h_sig: torch.Tensor) -> None:
"""Update the evolution path for rank-1 update, p_c
This operation is bounded by O(d), as it is simply a sum of vectors
Args:
shaped_m_displacement (torch.Tensor): The weighted recombination of shaped samples ys, corresponding to
(1/sigma) (m' - m) where m' is the updated m
h_sig (torch.Tensor): Whether to stall the update based on the evolution path on sigma, p_sigma, expressed as a torch float
"""
self.p_c = (1 - self.c_c) * self.p_c + h_sig * self.variance_discount_c * shaped_m_displacement
def update_C(self, zs: torch.Tensor, ys: torch.Tensor, assigned_weights: torch.Tensor, h_sig: torch.Tensor) -> None:
"""Update the covariance shape matrix C based on rank-1 and rank-mu updates
This operation is bounded by O(d^2 popsize), which is associated with computing the rank-mu update (summing across popsize d*d matrices)
Args:
zs (torch.Tensor): A tensor of shape [popsize, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor): A tensor of shape [popsize, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
assigned_weights (torch.Tensor): A [popsize, ] dimensional tensor of ordered weights
h_sig (torch.Tensor): Whether to stall the update based on the evolution path on sigma, p_sigma, expressed as a torch float
"""
d = self._problem.solution_length
# If using Active CMA-ES, reweight negative weights
if self.active:
assigned_weights = torch.where(
assigned_weights > 0, assigned_weights, d * assigned_weights / torch.norm(zs, dim=-1).pow(2.0)
)
c1a = self.c_1 * (1 - (1 - h_sig**2) * self.c_c * (2 - self.c_c)) # adjust for variance loss
weighted_pc = (self.c_1 / (c1a + 1e-23)) ** 0.5
if self.separable:
# Rank-1 update
r1_update = c1a * (self.p_c.pow(2.0) - self.C)
# Rank-mu update
rmu_update = self.c_mu * torch.sum(
assigned_weights.unsqueeze(-1) * (ys.pow(2.0) - self.C.unsqueeze(0)), dim=0
)
else:
# Rank-1 update
r1_update = c1a * (torch.outer(weighted_pc * self.p_c, weighted_pc * self.p_c) - self.C)
# Rank-mu update
rmu_update = self.c_mu * (
torch.sum(assigned_weights.unsqueeze(-1).unsqueeze(-1) * (ys.unsqueeze(1) * ys.unsqueeze(2)), dim=0)
- torch.sum(self.weights) * self.C
)
# Update C
self.C = self.C + r1_update + rmu_update
def decompose_C(self) -> None:
"""Perform the decomposition C = AA^T using a cholesky decomposition
Note that traditionally CMA-ES uses the eigendecomposition C = BDDB^-1. In our case,
we keep track of zs, ys and xs when sampling, so we never need C^-1/2.
Therefore, a Cholesky decomposition is all that is necessary. This generally requires
O(d^3/3) operations, rather than the more costly O(d^3) operations associated with the eigendecomposition.
"""
if self.separable:
self.A = self.C.pow(0.5)
else:
self.A = torch.linalg.cholesky(self.C)
def _step(self):
"""Perform a step of the CMA-ES solver"""
# === Sampling, evaluation and ranking ===
# Sample the search distribution
zs, ys, xs = self.sample_distribution()
# Get the weights assigned to each solution
assigned_weights = self.get_population_weights(xs)
# === Center adaption ===
local_m_displacement, shaped_m_displacement = self.update_m(zs, ys, assigned_weights)
# === Step size adaption ===
# Update evolution path p_sigma
self.update_p_sigma(local_m_displacement)
# Update sigma
self.update_sigma()
# Compute h_sig, a boolean flag for stalling the update to p_c
h_sig = _h_sig(self.p_sigma, self.c_sigma, self._steps_count)
# === Unscaled covariance adaptation ===
# Update evolution path p_c
self.update_p_c(shaped_m_displacement, h_sig)
# Update the covariance shape C
self.update_C(zs, ys, assigned_weights, h_sig)
# === Post-step corrections ===
# Limit element-wise standard deviation of sigma^2 C
if self.stdev_min is not None or self.stdev_max is not None:
self.C = _limit_stdev(self.sigma, self.C, self.stdev_min, self.stdev_max)
# Decompose C
if (self._steps_count + 1) % self.decompose_C_freq == 0:
self.decompose_C()
obj_index: int (property, readonly)
Index of the objective being focused on
population: SolutionBatch (property, readonly)
Population generated by the CMA-ES algorithm
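These properties, together with the `center` and `stepsize` status items registered in `__init__`, can be inspected between steps. A brief sketch, assuming a `problem` object as in the earlier example:

```python
from evotorch.algorithms import CMAES

searcher = CMAES(problem, stdev_init=1.0)
searcher.step()                      # run a single generation

batch = searcher.population          # the SolutionBatch documented above
center = searcher.status["center"]   # registered via _get_center in __init__
sigma = searcher.status["stepsize"]  # registered via _get_sigma in __init__
# SinglePopulationAlgorithmMixin adds further population statistics to status.
```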
__init__(self, problem, *, stdev_init, popsize=None, center_init=None, c_m=1.0, c_sigma=None, c_sigma_ratio=1.0, damp_sigma=None, damp_sigma_ratio=1.0, c_c=None, c_c_ratio=1.0, c_1=None, c_1_ratio=1.0, c_mu=None, c_mu_ratio=1.0, active=True, csa_squared=False, stdev_min=None, stdev_max=None, separable=False, limit_C_decomposition=True, obj_index=None)
`__init__(...)`: Initialize the CMAES solver.
Parameters:
Name | Type | Description | Default
---|---|---|---
problem | Problem | The problem object which is being worked on. | required
stdev_init | Real | Initial step-size. | required
popsize | Optional[int] | Population size. Can be specified as an int, or can be left as None, in which case the CMA-ES rule of thumb is applied: popsize = 4 + floor(3 log d), where d is the dimension. | None
center_init | Union[Iterable[float], torch.Tensor] | Initial center point of the search distribution. Can be given as a Solution or as a 1-D array. If left as None, an initial center point is generated with the help of the problem object's `generate_values(...)` method. | None
c_m | Real | Learning rate for updating the mean of the search distribution. By default the value is 1. | 1.0
c_sigma | Optional[Real] | Learning rate for updating the step size. If None, then the CMA-ES rules of thumb will be applied. | None
c_sigma_ratio | Real | Multiplier on the learning rate for the step size. If c_sigma has been left as None, can be used to rescale the default c_sigma value. | 1.0
damp_sigma | Optional[Real] | Damping factor for updating the step size. If None, then the CMA-ES rules of thumb will be applied. | None
damp_sigma_ratio | Real | Multiplier on the damping factor for the step size. If damp_sigma has been left as None, can be used to rescale the default damp_sigma value. | 1.0
c_c | Optional[Real] | Learning rate for updating the rank-1 evolution path. If None, then the CMA-ES rules of thumb will be applied. | None
c_c_ratio | Real | Multiplier on the learning rate for the rank-1 evolution path. If c_c has been left as None, can be used to rescale the default c_c value. | 1.0
c_1 | Optional[Real] | Learning rate for the rank-1 update to the covariance matrix. If None, then the CMA-ES rules of thumb will be applied. | None
c_1_ratio | Real | Multiplier on the learning rate for the rank-1 update to the covariance matrix. If c_1 has been left as None, can be used to rescale the default c_1 value. | 1.0
c_mu | Optional[Real] | Learning rate for the rank-mu update to the covariance matrix. If None, then the CMA-ES rules of thumb will be applied. | None
c_mu_ratio | Real | Multiplier on the learning rate for the rank-mu update to the covariance matrix. If c_mu has been left as None, can be used to rescale the default c_mu value. | 1.0
active | bool | Whether to use Active CMA-ES. Defaults to True, consistent with the tutorial paper and pycma. | True
csa_squared | bool | Whether to use the squared rule ("CSA_squared" in pycma) for the step-size adaptation. This effectively corresponds to taking the natural gradient for the evolution path on the step size, rather than the default CMA-ES rule of thumb. | False
stdev_min | Optional[Real] | Minimum allowed standard deviation of the search distribution. Leaving this as None means that no such boundary is to be used. Can be given as None or as a scalar. | None
stdev_max | Optional[Real] | Maximum allowed standard deviation of the search distribution. Leaving this as None means that no such boundary is to be used. Can be given as None or as a scalar. | None
separable | bool | Provide this as True if you would like the problem to be treated as a separable one. Treating a problem as separable means to adapt only the diagonal parts of the covariance matrix and to keep the non-diagonal parts 0. High-dimensional problems result in large covariance matrices on which operating is computationally expensive. Therefore, for such high-dimensional problems, setting `separable` as True might be useful. | False
limit_C_decomposition | bool | Whether to limit the frequency of decomposition of the shape matrix C. Setting this to True (default) means that C will not be decomposed every generation. This degrades the quality of the sampling and updates, but provides a guarantee of O(d^2) time complexity. This option can be used with separable=True (e.g. for experimental reasons), but then performance will only degrade without time-complexity benefits. | True
obj_index | Optional[int] | Objective index according to which evaluation of the solution will be done. | None
Source code in evotorch/algorithms/cmaes.py
def __init__(
self,
problem: Problem,
*,
stdev_init: Real,
popsize: Optional[int] = None,
center_init: Optional[Vector] = None,
c_m: Real = 1.0,
c_sigma: Optional[Real] = None,
c_sigma_ratio: Real = 1.0,
damp_sigma: Optional[Real] = None,
damp_sigma_ratio: Real = 1.0,
c_c: Optional[Real] = None,
c_c_ratio: Real = 1.0,
c_1: Optional[Real] = None,
c_1_ratio: Real = 1.0,
c_mu: Optional[Real] = None,
c_mu_ratio: Real = 1.0,
active: bool = True,
csa_squared: bool = False,
stdev_min: Optional[Real] = None,
stdev_max: Optional[Real] = None,
separable: bool = False,
limit_C_decomposition: bool = True,
obj_index: Optional[int] = None,
):
"""
`__init__(...)`: Initialize the CMAES solver.
Args:
problem (Problem): The problem object which is being worked on.
stdev_init (Real): Initial step-size
popsize: Population size. Can be specified as an int,
or can be left as None in which case the CMA-ES rule of thumb is applied:
popsize = 4 + floor(3 log d) where d is the dimension
center_init: Initial center point of the search distribution.
Can be given as a Solution or as a 1-D array.
If left as None, an initial center point is generated
with the help of the problem object's `generate_values(...)`
method.
c_m (Real): Learning rate for updating the mean
of the search distribution. By default the value is 1.
c_sigma (Optional[Real]): Learning rate for updating the step size. If None,
then the CMA-ES rules of thumb will be applied.
c_sigma_ratio (Real): Multiplier on the learning rate for the step size.
If c_sigma has been left as None, this can be used to rescale the default c_sigma value.
damp_sigma (Optional[Real]): Damping factor for updating the step size. If None,
then the CMA-ES rules of thumb will be applied.
damp_sigma_ratio (Real): Multiplier on the damping factor for the step size.
If damp_sigma has been left as None, this can be used to rescale the default damp_sigma value.
c_c (Optional[Real]): Learning rate for updating the rank-1 evolution path.
If None, then the CMA-ES rules of thumb will be applied.
c_c_ratio (Real): Multiplier on the learning rate for the rank-1 evolution path.
If c_c has been left as None, this can be used to rescale the default c_c value.
c_1 (Optional[Real]): Learning rate for the rank-1 update to the covariance matrix.
If None, then the CMA-ES rules of thumb will be applied.
c_1_ratio (Real): Multiplier on the learning rate for the rank-1 update to the covariance matrix.
If c_1 has been left as None, this can be used to rescale the default c_1 value.
c_mu (Optional[Real]): Learning rate for the rank-mu update to the covariance matrix.
If None, then the CMA-ES rules of thumb will be applied.
c_mu_ratio (Real): Multiplier on the learning rate for the rank-mu update to the covariance matrix.
If c_mu has been left as None, this can be used to rescale the default c_mu value.
active (bool): Whether to use Active CMA-ES. Defaults to True, consistent with the tutorial paper and pycma.
csa_squared (bool): Whether to use the squared rule ("CSA_squared" in pycma) for the step-size adaptation.
This effectively corresponds to taking the natural gradient for the evolution path on the step size,
rather than the default CMA-ES rule of thumb.
stdev_min (Optional[Real]): Minimum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None or as a scalar.
stdev_max (Optional[Real]): Maximum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None or as a scalar.
separable (bool): Provide this as True if you would like the problem
to be treated as a separable one. Treating a problem
as separable means to adapt only the diagonal parts
of the covariance matrix and to keep the non-diagonal
parts 0. High dimensional problems result in large
covariance matrices on which operating is computationally
expensive. Therefore, for such high dimensional problems,
setting `separable` as True might be useful.
limit_C_decomposition (bool): Whether to limit the frequency of decomposition of the shape matrix C.
Setting this to True (default) means that C will not be decomposed every generation.
This degrades the quality of the sampling and updates, but provides a guarantee of O(d^2) time complexity.
This option can be used with separable=True (e.g. for experimental reasons), but then performance will only degrade
without time-complexity benefits.
obj_index (Optional[int]): Objective index according to which evaluation
of the solution will be done.
"""
# Initialize the base class
SearchAlgorithm.__init__(self, problem, center=self._get_center, stepsize=self._get_sigma)
# Ensure that the problem is numeric
problem.ensure_numeric()
# Store the objective index
self._obj_index = problem.normalize_obj_index(obj_index)
# Track d = solution length for reference in initialization of hyperparameters
d = self._problem.solution_length
# === Initialize population ===
if not popsize:
# Default value used in CMA-ES literature: 4 + floor(3 log d)
popsize = 4 + int(np.floor(3 * np.log(d)))
self.popsize = int(popsize)
# Half popsize, referred to as mu in CMA-ES literature
self.mu = int(np.floor(popsize / 2))
self._population = problem.generate_batch(popsize=popsize)
# === Initialize search distribution ===
self.separable = separable
# If `center_init` is not given, generate an initial solution
# with the help of the problem object.
# If it is given as a Solution, then clone the solution's values
# as a PyTorch tensor.
# Otherwise, use the given initial solution as the starting
# point in the search space.
if center_init is None:
center_init = self._problem.generate_values(1)
elif isinstance(center_init, Solution):
center_init = center_init.values.clone()
# Store the center
self.m = self._problem.make_tensor(center_init)
# Store the initial step size
self.sigma = self._problem.make_tensor(stdev_init)
if separable:
# Initialize C as the diagonal vector. Note that when separable, the eigendecomposition is not needed
self.C = self._problem.make_ones(d)
# In this case A is simply the square root of elements of C
self.A = self._problem.make_ones(d)
else:
# Initialize C = AA^T all diagonal.
self.C = self._problem.make_I(d)
self.A = self.C.clone()
# === Initialize raw weights ===
# Conditioned on popsize
# w_i = log((lambda + 1) / 2) - log(i) for i = 1 ... lambda
raw_weights = self.problem.make_tensor(np.log((popsize + 1) / 2) - torch.log(torch.arange(popsize) + 1))
# positive valued weights are the first mu
positive_weights = raw_weights[: self.mu]
negative_weights = raw_weights[self.mu :]
# Variance effective selection mass of positive weights
# Not affected by future updates to raw_weights
self.mu_eff = torch.sum(positive_weights).pow(2.0) / torch.sum(positive_weights.pow(2.0))
# === Initialize search parameters ===
# Conditioned on weights
# Store fixed information
self.c_m = c_m
self.active = active
self.csa_squared = csa_squared
self.stdev_min = stdev_min
self.stdev_max = stdev_max
# Learning rate for step-size adaption
if c_sigma is None:
c_sigma = (self.mu_eff + 2.0) / (d + self.mu_eff + 3)
self.c_sigma = c_sigma_ratio * c_sigma
# Damping factor for step-size adaptation
if damp_sigma is None:
damp_sigma = 1 + 2 * max(0, torch.sqrt((self.mu_eff - 1) / (d + 1)) - 1) + self.c_sigma
self.damp_sigma = damp_sigma_ratio * damp_sigma
# Learning rate for evolution path for rank-1 update
if c_c is None:
# Branches on separability
if separable:
c_c = (1 + (1 / d) + (self.mu_eff / d)) / (d**0.5 + (1 / d) + 2 * (self.mu_eff / d))
else:
c_c = (4 + self.mu_eff / d) / (d + (4 + 2 * self.mu_eff / d))
self.c_c = c_c_ratio * c_c
# Learning rate for rank-1 update to covariance matrix
if c_1 is None:
# Branches on separability
if separable:
c_1 = 1.0 / (d + 2.0 * np.sqrt(d) + self.mu_eff / d)
else:
c_1 = min(1, popsize / 6) * 2 / ((d + 1.3) ** 2.0 + self.mu_eff)
self.c_1 = c_1_ratio * c_1
# Learning rate for rank-mu update to covariance matrix
if c_mu is None:
# Branches on separability
if separable:
c_mu = (0.25 + self.mu_eff + (1.0 / self.mu_eff) - 2) / (d + 4 * np.sqrt(d) + (self.mu_eff / 2.0))
else:
c_mu = min(
1 - self.c_1, 2 * ((0.25 + self.mu_eff - 2 + (1 / self.mu_eff)) / ((d + 2) ** 2.0 + self.mu_eff))
)
self.c_mu = c_mu_ratio * c_mu
# The 'variance aware' coefficient used for the additive component of the evolution path for sigma
self.variance_discount_sigma = torch.sqrt(self.c_sigma * (2 - self.c_sigma) * self.mu_eff)
# The 'variance aware' coefficient used for the additive component of the evolution path for rank-1 updates
self.variance_discount_c = torch.sqrt(self.c_c * (2 - self.c_c) * self.mu_eff)
# === Finalize weights ===
# Conditioned on search parameters and raw weights
# Positive weights always sum to 1
positive_weights = positive_weights / torch.sum(positive_weights)
if self.active:
# Active CMA-ES: negative weights sum to alpha
# Get the variance effective selection mass of negative weights
mu_eff_neg = torch.sum(negative_weights).pow(2.0) / torch.sum(negative_weights.pow(2.0))
# Alpha is the minimum of the following 3 terms
alpha_mu = 1 + self.c_1 / self.c_mu
alpha_mu_eff = 1 + 2 * mu_eff_neg / (self.mu_eff + 2)
alpha_pos_def = (1 - self.c_mu - self.c_1) / (d * self.c_mu)
alpha = min([alpha_mu, alpha_mu_eff, alpha_pos_def])
# Rescale negative weights
negative_weights = alpha * negative_weights / torch.sum(torch.abs(negative_weights))
else:
# Negative weights are simply zero
negative_weights = torch.zeros_like(negative_weights)
# Concatenate final weights
self.weights = torch.cat([positive_weights, negative_weights], dim=-1)
# === Some final setup ===
# Initialize the evolution paths
self.p_sigma = 0.0
self.p_c = 0.0
# Hansen's approximation to the expectation of ||x|| where x ~ N(0, I_d).
# Note that we could use the exact formulation with Gamma functions, but we'll retain this form for consistency
self.unbiased_expectation = np.sqrt(d) * (1 - (1 / (4 * d)) + 1 / (21 * d**2))
self.last_ex = None
# How often to decompose C
if limit_C_decomposition:
self.decompose_C_freq = max(1, int(np.floor(_safe_divide(1, 10 * d * (self.c_1.cpu() + self.c_mu.cpu())))))
else:
self.decompose_C_freq = 1
# Use the SinglePopulationAlgorithmMixin to enable additional status reports regarding the population.
SinglePopulationAlgorithmMixin.__init__(self)
decompose_C(self)
Perform the decomposition C = AA^T using a Cholesky decomposition. Note that traditionally CMA-ES uses the eigendecomposition C = BDDB^-1. In our case, we keep track of zs, ys and xs when sampling, so we never need C^-1/2. Therefore, a Cholesky decomposition is all that is necessary. This generally requires O(d^3/3) operations, rather than the more costly O(d^3) operations associated with the eigendecomposition.
Source code in evotorch/algorithms/cmaes.py
def decompose_C(self) -> None:
"""Perform the decomposition C = AA^T using a cholesky decomposition
Note that traditionally CMA-ES uses the eigendecomposition C = BDDB^-1. In our case,
we keep track of zs, ys and xs when sampling, so we never need C^-1/2.
Therefore, a cholesky decomposition is all that is necessary. This generally requires
O(d^3/3) operations, rather than the more costly O(d^3) operations associated with the eigendecomposition.
"""
if self.separable:
self.A = self.C.pow(0.5)
else:
self.A = torch.linalg.cholesky(self.C)
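As a standalone sanity check (not library code), the following sketch builds a random positive-definite C and confirms that the Cholesky factor reproduces it, which is the only property the sampling step relies on:

```python
import torch

d = 5
M = torch.randn(d, d)
C = M @ M.T + d * torch.eye(d)  # a random symmetric positive-definite matrix

A = torch.linalg.cholesky(C)    # lower-triangular factor with C = A A^T
assert torch.allclose(A @ A.T, C, atol=1e-5)
# Samples y = A z with z ~ N(0, I_d) then have covariance C,
# so no inverse square root of C is ever required.
```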
get_population_weights(self, xs)
Get the assigned weights of the population (e.g. evaluate, rank and return)
Parameters:
Name | Type | Description | Default
---|---|---|---
xs | torch.Tensor | The population samples drawn from N(mu, sigma^2 C) | required
Returns:
Type | Description
---|---
assigned_weights (torch.Tensor) | A [popsize, ] dimensional tensor of ordered weights
Source code in evotorch/algorithms/cmaes.py
def get_population_weights(self, xs: torch.Tensor) -> torch.Tensor:
"""Get the assigned weights of the population (e.g. evaluate, rank and return)
Args:
xs (torch.Tensor): The population samples drawn from N(mu, sigma^2 C)
Returns:
assigned_weights (torch.Tensor): A [popsize, ] dimensional tensor of ordered weights
"""
# Computation is O(popsize * F_time) where F_time is the evaluation time per sample
# Fill the population
self._population.set_values(xs)
# Evaluate the current population
self.problem.evaluate(self._population)
# Sort the population
indices = self._population.argsort(obj_index=self.obj_index)
# Invert the sorting of the population to obtain the ranks
# Note that these ranks start at zero, but this is fine as we are just using them for indexing
ranks = torch.zeros_like(indices)
ranks[indices] = torch.arange(self.popsize, dtype=indices.dtype, device=indices.device)
# Get weights corresponding to each rank
assigned_weights = self.weights[ranks]
return assigned_weights
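The rank-inversion trick above (scattering an arange through the argsort indices) can be seen in isolation in this standalone sketch with hypothetical fitness values:

```python
import torch

fitnesses = torch.tensor([0.5, 0.1, 0.9, 0.3])
indices = torch.argsort(fitnesses)             # tensor([1, 3, 0, 2]): best-to-worst for minimization
ranks = torch.zeros_like(indices)
ranks[indices] = torch.arange(len(fitnesses))  # tensor([2, 0, 3, 1]): the rank of each solution
# weights[ranks] then hands every solution the weight matching its rank.
```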
sample_distribution(self, num_samples=None)
Sample the population. All 3 representations of solutions are returned for easy calculation of updates. Note that the computation time of this operation is O(d^2 num_samples), unless separable, in which case it is O(d num_samples).
Parameters:
Name | Type | Description | Default
---|---|---|---
num_samples | Optional[int] | The number of samples to draw. If None, then the population size is used | None
Returns:
Type | Description
---|---
zs (torch.Tensor) | A tensor of shape [num_samples, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor) | A tensor of shape [num_samples, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
xs (torch.Tensor) | A tensor of shape [num_samples, d] of samples from the search space e.g. x_i ~ N(m, sigma^2 C)
Source code in evotorch/algorithms/cmaes.py
def sample_distribution(self, num_samples: Optional[int] = None) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
"""Sample the population. All 3 representations of solutions are returned for easy calculations of updates.
Note that the computation time of this operation of O(d^2 num_samples) unless separable, in which case O(d num_samples)
Args:
num_samples (Optional[int]): The number of samples to draw. If None, then the population size is used
Returns:
zs (torch.Tensor): A tensor of shape [num_samples, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor): A tensor of shape [num_samples, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
xs (torch.Tensor): A tensor of shape [num_samples, d] of samples from the search space e.g. x_i ~ N(m, sigma^2 C)
"""
if num_samples is None:
num_samples = self.popsize
# Generate z values
zs = self._problem.make_gaussian(num_solutions=num_samples)
# Construct ys = A zs
if self.separable:
# In the separable case A is diagonal so is represented as a single vector
ys = self.A.unsqueeze(0) * zs
else:
ys = (self.A @ zs.T).T
# Construct xs = m + sigma ys
xs = self.m.unsqueeze(0) + self.sigma * ys
return zs, ys, xs
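The identity xs = m + sigma (A zs) can be checked empirically outside the library; in this sketch, all names are local assumptions:

```python
import torch

d, n, sigma = 3, 200_000, 0.5
m = torch.tensor([1.0, -2.0, 0.0])
M = torch.randn(d, d)
C = M @ M.T + d * torch.eye(d)    # symmetric positive-definite shape matrix
A = torch.linalg.cholesky(C)

zs = torch.randn(n, d)            # z_i ~ N(0, I_d)
ys = (A @ zs.T).T                 # y_i ~ N(0, C)
xs = m.unsqueeze(0) + sigma * ys  # x_i ~ N(m, sigma^2 C)

print(xs.mean(dim=0))              # approximately m
print(torch.cov(xs.T) / sigma**2)  # approximately C
```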
update_C(self, zs, ys, assigned_weights, h_sig)
Update the covariance shape matrix C based on rank-1 and rank-mu updates. This operation is bounded by O(d^2 popsize), which is associated with computing the rank-mu update (summing across popsize d*d matrices).
Parameters:
Name | Type | Description | Default
---|---|---|---
zs | torch.Tensor | A tensor of shape [popsize, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d) | required
ys | torch.Tensor | A tensor of shape [popsize, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C) | required
assigned_weights | torch.Tensor | A [popsize, ] dimensional tensor of ordered weights | required
h_sig | torch.Tensor | Whether to stall the update based on the evolution path on sigma, p_sigma, expressed as a torch float | required
Source code in evotorch/algorithms/cmaes.py
def update_C(self, zs: torch.Tensor, ys: torch.Tensor, assigned_weights: torch.Tensor, h_sig: torch.Tensor) -> None:
"""Update the covariance shape matrix C based on rank-1 and rank-mu updates
This operation is bounded by O(d^2 popsize), which is associated with computing the rank-mu update (summing across popsize d*d matrices)
Args:
zs (torch.Tensor): A tensor of shape [popsize, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor): A tensor of shape [popsize, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
assigned_weights (torch.Tensor): A [popsize, ] dimensional tensor of ordered weights
h_sig (torch.Tensor): Whether to stall the update based on the evolution path on sigma, p_sigma, expressed as a torch float
"""
d = self._problem.solution_length
# If using Active CMA-ES, reweight negative weights
if self.active:
assigned_weights = torch.where(
assigned_weights > 0, assigned_weights, d * assigned_weights / torch.norm(zs, dim=-1).pow(2.0)
)
c1a = self.c_1 * (1 - (1 - h_sig**2) * self.c_c * (2 - self.c_c)) # adjust for variance loss
weighted_pc = (self.c_1 / (c1a + 1e-23)) ** 0.5
if self.separable:
# Rank-1 update
r1_update = c1a * (self.p_c.pow(2.0) - self.C)
# Rank-mu update
rmu_update = self.c_mu * torch.sum(
assigned_weights.unsqueeze(-1) * (ys.pow(2.0) - self.C.unsqueeze(0)), dim=0
)
else:
# Rank-1 update
r1_update = c1a * (torch.outer(weighted_pc * self.p_c, weighted_pc * self.p_c) - self.C)
# Rank-mu update
rmu_update = self.c_mu * (
torch.sum(assigned_weights.unsqueeze(-1).unsqueeze(-1) * (ys.unsqueeze(1) * ys.unsqueeze(2)), dim=0)
- torch.sum(self.weights) * self.C
)
# Update C
self.C = self.C + r1_update + rmu_update
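Collecting the non-separable branch into one expression: with c1a = c_1 (1 - (1 - h_sig^2) c_c (2 - c_c)) and, under Active CMA-ES, negative weights rescaled by d / ||z_i||^2, the code above computes C <- C + c_1 p_c p_c^T - c1a C + c_mu (sum_i w_i y_i y_i^T - (sum_i w_i) C), where the subtracted sum of weights uses the original (non-reweighted) weights. The separable branch applies the analogous update to the diagonal entries only.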
update_m(self, zs, ys, assigned_weights)
Update the center of the search distribution m. With zs and ys retained from sampling, this operation is O(popsize d), as it involves summing across popsize d-dimensional vectors.
Parameters:
Name | Type | Description | Default
---|---|---|---
zs | torch.Tensor | A tensor of shape [popsize, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d) | required
ys | torch.Tensor | A tensor of shape [popsize, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C) | required
assigned_weights | torch.Tensor | A [popsize, ] dimensional tensor of ordered weights | required
Returns:
Type | Description
---|---
local_m_displacement (torch.Tensor) | A tensor of shape [d], corresponding to the local transformation of m, (1/sigma) (C^-1/2) (m' - m), where m' is the updated m
shaped_m_displacement (torch.Tensor) | A tensor of shape [d], corresponding to the shaped transformation of m, (1/sigma) (m' - m), where m' is the updated m
Source code in evotorch/algorithms/cmaes.py
def update_m(self, zs: torch.Tensor, ys: torch.Tensor, assigned_weights: torch.Tensor) -> torch.Tensor:
"""Update the center of the search distribution m
With zs and ys retained from sampling, this operation is O(popsize d), as it involves summing across popsize d-dimensional vectors.
Args:
zs (torch.Tensor): A tensor of shape [popsize, d] of samples from the local coordinate space e.g. z_i ~ N(0, I_d)
ys (torch.Tensor): A tensor of shape [popsize, d] of samples from the shaped coordinate space e.g. y_i ~ N(0, C)
assigned_weights (torch.Tensor): A [popsize, ] dimensional tensor of ordered weights
Returns:
local_m_displacement (torch.Tensor): A tensor of shape [d], corresponding to the local transformation of m,
(1/sigma) (C^-1/2) (m' - m) where m' is the updated m
shaped_m_displacement (torch.Tensor): A tensor of shape [d], corresponding to the shaped transformation of m,
(1/sigma) (m' - m) where m' is the updated m
"""
# Get the top-mu weights
top_mu = torch.topk(assigned_weights, k=self.mu)
top_mu_weights = top_mu.values
top_mu_indices = top_mu.indices
# Compute the weighted recombination in local coordinate space
local_m_displacement = torch.sum(top_mu_weights.unsqueeze(-1) * zs[top_mu_indices], dim=0)
# Compute the weighted recombination in shaped coordinate space
shaped_m_displacement = torch.sum(top_mu_weights.unsqueeze(-1) * ys[top_mu_indices], dim=0)
# Update m
self.m = self.m + self.c_m * self.sigma * shaped_m_displacement
# Return the weighted recombinations
return local_m_displacement, shaped_m_displacement
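In symbols, the update above is m <- m + c_m sigma sum_{i=1..mu} w_i y_i, with the sum running over the top-mu weighted samples; the returned displacements are sum_i w_i z_i (local) and sum_i w_i y_i (shaped).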
update_p_c(self, shaped_m_displacement, h_sig)
Update the evolution path for the rank-1 update, p_c. This operation is bounded by O(d), as it is simply a sum of vectors.
Parameters:
Name | Type | Description | Default
---|---|---|---
shaped_m_displacement | torch.Tensor | The weighted recombination of shaped samples ys, corresponding to (1/sigma) (m' - m), where m' is the updated m | required
h_sig | torch.Tensor | Whether to stall the update based on the evolution path on sigma, p_sigma, expressed as a torch float | required
Source code in evotorch/algorithms/cmaes.py
def update_p_c(self, shaped_m_displacement: torch.Tensor, h_sig: torch.Tensor) -> None:
"""Update the evolution path for rank-1 update, p_c
This operation is bounded by O(d), as it is simply a sum of vectors
Args:
shaped_m_displacement (torch.Tensor): The weighted recombination of shaped samples ys, corresponding to
(1/sigma) (m' - m) where m' is the updated m
h_sig (torch.Tensor): Whether to stall the update based on the evolution path on sigma, p_sigma, expressed as a torch float
"""
self.p_c = (1 - self.c_c) * self.p_c + h_sig * self.variance_discount_c * shaped_m_displacement
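In symbols: p_c <- (1 - c_c) p_c + h_sig sqrt(c_c (2 - c_c) mu_eff) shaped_m_displacement, where the square-root factor is the variance_discount_c coefficient precomputed in `__init__`.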
update_p_sigma(self, local_m_displacement)
Update the evolution path for sigma, p_sigma. This operation is bounded by O(d), as it is simply a sum of vectors.
Parameters:
Name | Type | Description | Default
---|---|---|---
local_m_displacement | torch.Tensor | The weighted recombination of local samples zs, corresponding to (1/sigma) (C^-1/2) (m' - m), where m' is the updated m | required
Source code in evotorch/algorithms/cmaes.py
def update_p_sigma(self, local_m_displacement: torch.Tensor) -> None:
"""Update the evolution path for sigma, p_sigma
This operation is bounded by O(d), as it is simply a sum of vectors
Args:
local_m_displacement (torch.Tensor): The weighted recombination of local samples zs, corresponding to
(1/sigma) (C^-1/2) (m' - m) where m' is the updated m
"""
self.p_sigma = (1 - self.c_sigma) * self.p_sigma + self.variance_discount_sigma * local_m_displacement
update_sigma(self)
Update the step size sigma according to its evolution path p_sigma. This operation is bounded by O(d), with the most expensive component being the norm of the evolution path, a d-dimensional vector.
Source code in evotorch/algorithms/cmaes.py
def update_sigma(self) -> None:
"""Update the step size sigma according to its evolution path p_sigma
This operation is bounded by O(d), with the most expensive component being the norm of the evolution path, a d-dimensional vector.
"""
d = self._problem.solution_length
# Compute the exponential update
if self.csa_squared:
# Exponential update based on natural gradient maximizing squared norm of p_sigma
exponential_update = (torch.norm(self.p_sigma).pow(2.0) / d - 1) / 2
else:
# Exponential update increasing the likelihood of p_sigma having its expected norm
exponential_update = torch.norm(self.p_sigma) / self.unbiased_expectation - 1
# Rescale exponential update based on learning rate + damping factor
exponential_update = (self.c_sigma / self.damp_sigma) * exponential_update
# Multiplicative update to sigma
self.sigma = self.sigma * torch.exp(exponential_update)
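In symbols, the default rule is sigma <- sigma exp((c_sigma / damp_sigma) (||p_sigma|| / E||N(0, I_d)|| - 1)), where E||N(0, I_d)|| is the unbiased_expectation approximation from `__init__`; with csa_squared=True, the parenthesized term becomes (||p_sigma||^2 / d - 1) / 2.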
distributed
gaussian
CEM (GaussianSearchAlgorithm)
The cross-entropy method (CEM) (Rubinstein, 1999).
This CEM implementation is focused on continuous optimization, and follows the variant explained in Duan et al. (2016).
The adaptive population size mechanism explained in Toklu et al. (2020)
(and previously used in the accompanying source code of the study by
Salimans et al. (2017)) is supported, where the population size in an
iteration keeps increasing until a certain number of interactions with
the simulator of the reinforcement learning environment is made.
See the initialization arguments `num_interactions`, `popsize_max`.
References:
Rubinstein, R. (1999). The cross-entropy method for combinatorial
and continuous optimization.
Methodology and computing in applied probability, 1(2), 127-190.
Duan, Y., Chen, X., Houthooft, R., Schulman, J., Abbeel, P. (2016).
Benchmarking deep reinforcement learning for continuous control.
International conference on machine learning. PMLR, 2016.
Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
Evolution Strategies as a Scalable Alternative to
Reinforcement Learning.
Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
ClipUp: A Simple and Powerful Optimizer
for Distribution-based Policy Evolution.
Parallel Problem Solving from Nature (PPSN 2020).
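A minimal usage sketch may again help; the Rastrigin objective, its dimension, and the bounds are illustrative assumptions, while `Problem` and `CEM` come from the library:

```python
import torch
from evotorch import Problem
from evotorch.algorithms import CEM

# Hypothetical objective for the sketch: the Rastrigin function.
def rastrigin(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2 - 10 * torch.cos(2 * torch.pi * x) + 10)

problem = Problem("min", rastrigin, solution_length=30, initial_bounds=(-5.12, 5.12))

searcher = CEM(
    problem,
    popsize=100,
    parenthood_ratio=0.1,  # the top 10% of the population become the parents
    stdev_init=2.0,
)
searcher.run(200)  # run for 200 generations
```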
Source code in evotorch/algorithms/distributed/gaussian.py
class CEM(GaussianSearchAlgorithm):
"""
The cross-entropy method (CEM) (Rubinstein, 1999).
This CEM implementation is focused on continuous optimization,
and follows the variant explained in Duan et al. (2016).
The adaptive population size mechanism explained in Toklu et al. (2020)
(and previously used in the accompanying source code of the study by
Salimans et al. (2017)) is supported, where the population size in an
iteration keeps increasing until a certain number of interactions with
the simulator of the reinforcement learning environment is made.
See the initialization arguments `num_interactions`, `popsize_max`.
References:
Rubinstein, R. (1999). The cross-entropy method for combinatorial
and continuous optimization.
Methodology and computing in applied probability, 1(2), 127-190.
Duan, Y., Chen, X., Houthooft, R., Schulman, J., Abbeel, P. (2016).
Benchmarking deep reinforcement learning for continuous control.
International conference on machine learning. PMLR, 2016.
Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
Evolution Strategies as a Scalable Alternative to
Reinforcement Learning.
Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
ClipUp: A Simple and Powerful Optimizer
for Distribution-based Policy Evolution.
Parallel Problem Solving from Nature (PPSN 2020).
"""
DISTRIBUTION_TYPE = SeparableGaussian
DISTRIBUTION_PARAMS = NotImplemented # To be filled by the CEM instance
def __init__(
self,
problem: Problem,
*,
popsize: int,
parenthood_ratio: float,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[Union[float, RealOrVector]] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the search algorithm.
Args:
problem: The problem object to work on.
popsize: The population size.
parenthood_ratio: Expected as a float larger than 0 and smaller
than 1. For example, setting this value to 0.1 means that
the top 10% of the population will be declared as the parents,
and those parents will be used for updating the population.
The amount of parents is always computed according to the
specified `popsize`, not according to the adapted population
size, and not according to `popsize_max`.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n amount of interactions is reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
center_init: The initial center solution.
Can be left as None.
stdev_min: The minimum value for the standard deviation
values of the Gaussian search distribution.
Can be left as None (which is the default),
or can be given as a scalar or as a 1-dimensional array.
stdev_max: The maximum value for the standard deviation
values of the Gaussian search distribution.
Can be left as None (which is the default),
or can be given as a scalar or as a 1-dimensional array.
stdev_max_change: The maximum update ratio allowed on the
standard deviation. Expected as None if no such limiter
is needed, or as a real number within 0.0 and 1.0 otherwise.
In the PGPE implementation of Ha (2017, 2018), a value of
0.2 (20%) was used.
For this CEM implementation, the default is None.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode.
(where being in distributed mode means `distributed` is given
as True). In distributed mode, each actor remotely samples
its own solution batches and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then, there will not
be weighted averaging (or, each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradient weights will
be weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
self.DISTRIBUTION_PARAMS = {"parenthood_ratio": float(parenthood_ratio)}
super().__init__(
problem,
popsize=popsize,
center_learning_rate=1.0,
stdev_learning_rate=1.0,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=None,
optimizer_config=None,
ranking_method=None,
center_init=center_init,
stdev_min=stdev_min,
stdev_max=stdev_max,
stdev_max_change=stdev_max_change,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
)
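For orientation, here is a minimal usage sketch of the initializer above. It assumes (this is not stated in the listing itself) that the class is importable as `evotorch.algorithms.CEM`, and that `Problem`, `StdOutLogger`, and `run(...)` behave as in typical EvoTorch examples:
```python
import torch
from evotorch import Problem
from evotorch.algorithms import CEM  # assumed import location
from evotorch.logging import StdOutLogger

def sphere(x: torch.Tensor) -> torch.Tensor:
    # A simple fitness function to be minimized.
    return torch.sum(x**2)

problem = Problem("min", sphere, solution_length=10, initial_bounds=(-5.0, 5.0))

searcher = CEM(
    problem,
    popsize=100,
    parenthood_ratio=0.1,  # the top 10% of the population become the parents
    stdev_init=3.0,        # mutually exclusive with radius_init
)
_ = StdOutLogger(searcher)  # print the status dictionary at every generation
searcher.run(50)
print(searcher.status["mean_eval"])
```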
DISTRIBUTION_TYPE (Distribution)
¶
Separable Multivariate Gaussian, as used by PGPE
Source code in evotorch/algorithms/distributed/gaussian.py
class SeparableGaussian(Distribution):
"""Separable Multivariate Gaussian, as used by PGPE"""
MANDATORY_PARAMETERS = {"mu", "sigma"}
OPTIONAL_PARAMETERS = {"divide_mu_grad_by", "divide_sigma_grad_by", "parenthood_ratio"}
def __init__(
self,
parameters: dict,
*,
solution_length: Optional[int] = None,
device: Optional[Device] = None,
dtype: Optional[DType] = None,
):
[mu_length] = parameters["mu"].shape
[sigma_length] = parameters["sigma"].shape
if solution_length is None:
solution_length = mu_length
else:
if solution_length != mu_length:
raise ValueError(
f"The argument `solution_length` does not match the length of `mu` provided in `parameters`."
f" solution_length={solution_length},"
f' parameters["mu"]={mu_length}.'
)
if mu_length != sigma_length:
raise ValueError(
f"The tensors `mu` and `sigma` provided within `parameters` have mismatching lengths."
f' parameters["mu"]={mu_length},'
f' parameters["sigma"]={sigma_length}.'
)
super().__init__(
solution_length=solution_length,
parameters=parameters,
device=device,
dtype=dtype,
)
@property
def mu(self) -> torch.Tensor:
return self.parameters["mu"]
@mu.setter
def mu(self, new_mu: Iterable):
self.parameters["mu"] = torch.as_tensor(new_mu, dtype=self.dtype, device=self.device)
@property
def sigma(self) -> torch.Tensor:
return self.parameters["sigma"]
@sigma.setter
def sigma(self, new_sigma: Iterable):
self.parameters["sigma"] = torch.as_tensor(new_sigma, dtype=self.dtype, device=self.device)
def _fill(self, out: torch.Tensor, *, generator: Optional[torch.Generator] = None):
self.make_gaussian(out=out, center=self.mu, stdev=self.sigma, generator=generator)
def _divide_grad(self, param_name: str, grad: torch.Tensor, weights: torch.Tensor) -> torch.Tensor:
option = f"divide_{param_name}_grad_by"
if option in self.parameters:
div_by_what = self.parameters[option]
if div_by_what == "num_solutions":
[num_solutions] = weights.shape
grad = grad / num_solutions
elif div_by_what == "num_directions":
[num_solutions] = weights.shape
num_directions = num_solutions // 2
grad = grad / num_directions
elif div_by_what == "total_weight":
total_weight = torch.sum(torch.abs(weights))
grad = grad / total_weight
elif div_by_what == "weight_stdev":
weight_stdev = torch.std(weights)
grad = grad / weight_stdev
else:
raise ValueError(f"The parameter {option} has an unrecognized value: {div_by_what}")
return grad
def _compute_gradients_via_parenthood_ratio(self, samples: torch.Tensor, weights: torch.Tensor) -> dict:
[num_samples, _] = samples.shape
num_elites = math.floor(num_samples * self.parameters["parenthood_ratio"])
elite_indices = weights.argsort(descending=True)[:num_elites]
elites = samples[elite_indices, :]
return {
"mu": torch.mean(elites, dim=0) - self.parameters["mu"],
"sigma": torch.std(elites, dim=0) - self.parameters["sigma"],
}
def _compute_gradients(self, samples: torch.Tensor, weights: torch.Tensor, ranking_used: Optional[str]) -> dict:
if "parenthood_ratio" in self.parameters:
return self._compute_gradients_via_parenthood_ratio(samples, weights)
else:
mu = self.mu
sigma = self.sigma
# Compute the scaled noises, that is, the noise vectors which
# were used for generating the solutions
# (solution = scaled_noise + center)
scaled_noises = samples - mu
# Make sure that the weights (utilities) are 0-centered
# (Otherwise the formulations would have to consider a bias term)
if ranking_used not in ("centered", "normalized"):
weights = weights - torch.mean(weights)
mu_grad = self._divide_grad(
"mu",
total(dot(weights, scaled_noises)),
weights,
)
sigma_grad = self._divide_grad(
"sigma",
total(dot(weights, ((scaled_noises**2) - (sigma**2)) / sigma)),
weights,
)
return {
"mu": mu_grad,
"sigma": sigma_grad,
}
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "SeparableGaussian":
mu_grad = gradients["mu"]
sigma_grad = gradients["sigma"]
new_mu = self.mu + self._follow_gradient("mu", mu_grad, learning_rates=learning_rates, optimizers=optimizers)
new_sigma = self.sigma + self._follow_gradient(
"sigma", sigma_grad, learning_rates=learning_rates, optimizers=optimizers
)
return self.modified_copy(mu=new_mu, sigma=new_sigma)
def relative_entropy(dist_0: "SeparableGaussian", dist_1: "SeparableGaussian") -> float:
mu_0 = dist_0.parameters["mu"]
mu_1 = dist_1.parameters["mu"]
sigma_0 = dist_0.parameters["sigma"]
sigma_1 = dist_1.parameters["sigma"]
cov_0 = sigma_0.pow(2.0)
cov_1 = sigma_1.pow(2.0)
mu_delta = mu_1 - mu_0
trace_cov = torch.sum(cov_0 / cov_1)
k = dist_0.solution_length
scaled_mu = torch.sum(mu_delta.pow(2.0) / cov_1)
log_det = torch.sum(torch.log(cov_1)) - torch.sum(torch.log(cov_0))
return 0.5 * (trace_cov - k + scaled_mu + log_det)
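As a plain-torch illustration (a sketch, not library code), the quantity computed by `relative_entropy` above is the closed-form KL divergence between two diagonal Gaussians:
```python
import torch

# KL(d0 || d1) for diagonal Gaussians, mirroring the terms in the
# `relative_entropy` code above.
mu_0, sigma_0 = torch.zeros(4), torch.ones(4)
mu_1, sigma_1 = torch.full((4,), 0.5), torch.full((4,), 2.0)

cov_0, cov_1 = sigma_0**2, sigma_1**2
kl = 0.5 * (
    torch.sum(cov_0 / cov_1)                                      # trace term
    - 4                                                           # k: the solution length
    + torch.sum((mu_1 - mu_0) ** 2 / cov_1)                       # scaled mean difference
    + torch.sum(torch.log(cov_1)) - torch.sum(torch.log(cov_0))   # log-determinant ratio
)
print(float(kl))
```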
update_parameters(self, gradients, *, learning_rates=None, optimizers=None)
¶
Do an update on the distribution by following the given gradients.
It is expected that the inheriting class has its own implementation for this method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| gradients | dict | Gradients, as a dictionary, which will be used for computing the necessary updates. | required |
| learning_rates | Optional[dict] | A dictionary which contains learning rates for parameters that will be updated using a learning rate coefficient. | None |
| optimizers | Optional[dict] | A dictionary which contains optimizer objects for parameters that will be updated using an adaptive optimizer. | None |

Returns:

| Type | Description |
|---|---|
| SeparableGaussian | The updated copy of the distribution. |
Source code in evotorch/algorithms/distributed/gaussian.py
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "SeparableGaussian":
mu_grad = gradients["mu"]
sigma_grad = gradients["sigma"]
new_mu = self.mu + self._follow_gradient("mu", mu_grad, learning_rates=learning_rates, optimizers=optimizers)
new_sigma = self.sigma + self._follow_gradient(
"sigma", sigma_grad, learning_rates=learning_rates, optimizers=optimizers
)
return self.modified_copy(mu=new_mu, sigma=new_sigma)
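To make the sample-and-update cycle concrete, here is a hedged sketch of one manual step. It assumes the import path matches the source file named above, that `Distribution.sample(...)` accepts a solution count, and that `compute_gradients(...)` takes the arguments used by `GaussianSearchAlgorithm` further below:
```python
import torch
from evotorch.algorithms.distributed.gaussian import SeparableGaussian  # assumed path

# Build a 5-dimensional separable Gaussian search distribution.
dist = SeparableGaussian({"mu": torch.zeros(5), "sigma": torch.ones(5)})

samples = dist.sample(20)                  # assumed: draw a population of 20 solutions
fitnesses = torch.sum(samples**2, dim=-1)  # e.g. sphere fitnesses, to be minimized

# Estimate the search gradients and follow them with raw learning rates,
# mirroring what GaussianSearchAlgorithm._update_distribution does below.
grads = dist.compute_gradients(
    samples, fitnesses, objective_sense="min", ranking_method="centered"
)
dist = dist.update_parameters(grads, learning_rates={"mu": 0.5, "sigma": 0.1})
print(dist.mu, dist.sigma)
```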
__init__(self, problem, *, popsize, parenthood_ratio, stdev_init=None, radius_init=None, num_interactions=None, popsize_max=None, center_init=None, stdev_min=None, stdev_max=None, stdev_max_change=None, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the search algorithm.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| problem | Problem | The problem object to work on. | required |
| popsize | int | The population size. | required |
| parenthood_ratio | float | Expected as a float larger than 0 and smaller than 1. For example, setting this value to 0.1 means that the top 10% of the population will be declared as the parents, and those parents will be used for updating the population. The amount of parents is always computed according to the specified `popsize`, not according to the adapted population size, and not according to `popsize_max`. | required |
| stdev_init | Union[float, Iterable[float], torch.Tensor] | The initial standard deviation of the search distribution, expressed as a scalar or as an array. Determines the initial coverage area of the search distribution. If one wishes to configure the coverage area via the argument `radius_init` instead, then `stdev_init` is expected as None. | None |
| radius_init | Union[float, Iterable[float], torch.Tensor] | The initial radius of the search distribution, expressed as a scalar. Determines the initial coverage area of the search distribution. Here, "radius" is defined as the norm of the search distribution. If one wishes to configure the coverage area via the argument `stdev_init` instead, then `radius_init` is expected as None. | None |
| num_interactions | Optional[int] | When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n interactions are reached. When given as None, popsize is the only configuration affecting the size of a population. | None |
| popsize_max | Optional[int] | Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. | None |
| center_init | Union[float, Iterable[float], torch.Tensor] | The initial center solution. Can be left as None. | None |
| stdev_min | Union[float, Iterable[float], torch.Tensor] | The minimum value for the standard deviation values of the Gaussian search distribution. Can be left as None (which is the default), or can be given as a scalar or as a 1-dimensional array. | None |
| stdev_max | Union[float, Iterable[float], torch.Tensor] | The maximum value for the standard deviation values of the Gaussian search distribution. Can be left as None (which is the default), or can be given as a scalar or as a 1-dimensional array. | None |
| stdev_max_change | Union[float, Iterable[float], torch.Tensor] | The maximum update ratio allowed on the standard deviation. Expected as None if no such limiter is needed, or as a real number between 0.0 and 1.0 otherwise. In the PGPE implementation of Ha (2017, 2018), a value of 0.2 (20%) was used. For this CEM implementation, the default is None. | None |
| obj_index | Optional[int] | Index of the objective according to which the gradient estimations will be done. For single-objective problems, this can be left as None. | None |
| distributed | bool | Whether or not the gradient computation will be distributed. If False and the problem is not parallelized, everything happens in the main process. If False and the problem is parallelized, the population is created centrally and only the evaluations are delegated to remote workers. If True and the problem is parallelized, each remote actor samples its own sub-population (totaling `popsize` across actors) and computes its own gradient, and the main process averages these gradients to update the search distribution. | False |
| popsize_weighted_grad_avg | Optional[bool] | Only to be used in distributed mode (i.e. when `distributed` is True). If True, each actor's gradient is weighted by the number of solutions that actor sampled; if False, all gradients are averaged with equal weights; if None (the default), equal weights are used when `num_interactions` is given, and popsize-based weights are used otherwise. When `distributed` is False, this argument is expected as None. | None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
self,
problem: Problem,
*,
popsize: int,
parenthood_ratio: float,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[Union[float, RealOrVector]] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the search algorithm.
Args:
problem: The problem object to work on.
popsize: The population size.
parenthood_ratio: Expected as a float larger than 0 and smaller
than 1. For example, setting this value to 0.1 means that
the top 10% of the population will be declared as the parents,
and those parents will be used for updating the population.
The amount of parents is always computed according to the
specified `popsize`, not according to the adapted population
size, and not according to `popsize_max`.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n interactions are reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
center_init: The initial center solution.
Can be left as None.
stdev_min: The minimum value for the standard deviation
values of the Gaussian search distribution.
Can be left as None (which is the default),
or can be given as a scalar or as a 1-dimensional array.
stdev_max: The maximum value for the standard deviation
values of the Gaussian search distribution.
Can be left as None (which is the default),
or can be given as a scalar or as a 1-dimensional array.
stdev_max_change: The maximum update ratio allowed on the
standard deviation. Expected as None if no such limiter
is needed, or as a real number between 0.0 and 1.0 otherwise.
In the PGPE implementation of Ha (2017, 2018), a value of
0.2 (20%) was used.
For this CEM implementation, the default is None.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode.
(where being in distributed mode means `distributed` is given
as True). In distributed mode, each actor remotely samples
its own solution batches and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then, there will not
be weighted averaging (or, each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradient weights will
be weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
self.DISTRIBUTION_PARAMS = {"parenthood_ratio": float(parenthood_ratio)}
super().__init__(
problem,
popsize=popsize,
center_learning_rate=1.0,
stdev_learning_rate=1.0,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=None,
optimizer_config=None,
ranking_method=None,
center_init=center_init,
stdev_min=stdev_min,
stdev_max=stdev_max,
stdev_max_change=stdev_max_change,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
)
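The docstring above contrasts the centralized and distributed modes. Below is a hedged configuration sketch of the distributed mode; `my_fitness_function` is hypothetical, and treating `num_actors` as the Problem argument that spawns remote actors is an assumption of this sketch:
```python
from evotorch import Problem
from evotorch.algorithms import CEM  # assumed import location

problem = Problem(
    "min",
    my_fitness_function,        # hypothetical user-defined fitness function
    solution_length=100,
    initial_bounds=(-1.0, 1.0),
    num_actors=4,               # assumed: parallelize across 4 remote actors
)
searcher = CEM(
    problem,
    popsize=200,                     # total population across all actors
    parenthood_ratio=0.2,
    stdev_init=1.0,
    distributed=True,                # each actor samples and computes its own gradient
    popsize_weighted_grad_avg=True,  # weight each gradient by its sample count
)
searcher.run(100)
```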
GaussianSearchAlgorithm (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
Base class for search algorithms based on Gaussian distribution.
Source code in evotorch/algorithms/distributed/gaussian.py
class GaussianSearchAlgorithm(SearchAlgorithm, SinglePopulationAlgorithmMixin):
"""
Base class for search algorithms based on Gaussian distribution.
"""
DISTRIBUTION_TYPE = NotImplemented
DISTRIBUTION_PARAMS = NotImplemented
def __init__(
self,
problem: Problem,
*,
popsize: int,
center_learning_rate: float,
stdev_learning_rate: float,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer=None,
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = None,
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[RealOrVector] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
ensure_even_popsize: bool = False,
):
# Ensure that the problem is numeric
problem.ensure_numeric()
# The distribution-based algorithms we consider here cannot handle strict lower and upper bound constraints.
# Therefore, we ensure that the given problem is unbounded.
problem.ensure_unbounded()
# Initialize the SearchAlgorithm, which is the parent class
SearchAlgorithm.__init__(
self,
problem,
center=self._get_mu,
stdev=self._get_sigma,
mean_eval=self._get_mean_eval,
)
self._ensure_even_popsize = bool(ensure_even_popsize)
if not distributed:
# self.add_status_getters({"median_eval": self._get_median_eval})
if num_interactions is not None:
self.add_status_getters({"popsize": self._get_popsize})
if self._ensure_even_popsize:
if (popsize % 2) != 0:
raise ValueError(
f"`popsize` was expected as an even number. However, the received `popsize` is {popsize}."
)
if center_init is None:
# If a starting point for the search distribution is not given,
# then we use the problem object to generate us one.
mu = problem.generate_values(1).reshape(-1)
else:
# If a starting point for the search distribution is given,
# then we make sure that its length, dtype, and device
# are correct.
mu = problem.ensure_tensor_length_and_dtype(center_init, allow_scalar=False, about="center_init")
# Get the standard deviation or the radius configuration from the arguments
stdev_init = to_stdev_init(
solution_length=problem.solution_length, stdev_init=stdev_init, radius_init=radius_init
)
# Make sure that the provided initial standard deviation is
# of correct length, dtype, and device.
sigma = problem.ensure_tensor_length_and_dtype(stdev_init, about="stdev_init", allow_scalar=False)
# Create the distribution
dist_cls = self.DISTRIBUTION_TYPE
dist_params = deepcopy(self.DISTRIBUTION_PARAMS) if self.DISTRIBUTION_PARAMS is not None else {}
dist_params.update({"mu": mu, "sigma": sigma})
self._distribution: Distribution = dist_cls(dist_params, dtype=problem.dtype, device=problem.device)
# Store the following keyword arguments to use later
self._popsize = int(popsize)
self._popsize_max = None if popsize_max is None else int(popsize_max)
self._num_interactions = None if num_interactions is None else int(num_interactions)
self._center_learning_rate = float(center_learning_rate)
self._stdev_learning_rate = float(stdev_learning_rate)
self._optimizer = self._initialize_optimizer(self._center_learning_rate, optimizer, optimizer_config)
self._ranking_method = None if ranking_method is None else str(ranking_method)
self._stdev_min = (
None
if stdev_min is None
else problem.ensure_tensor_length_and_dtype(stdev_min, about="stdev_min", allow_scalar=True)
)
self._stdev_max = (
None
if stdev_max is None
else problem.ensure_tensor_length_and_dtype(stdev_max, about="stdev_max", allow_scalar=True)
)
self._stdev_max_change = (
None
if stdev_max_change is None
else problem.ensure_tensor_length_and_dtype(stdev_max_change, about="stdev_max_change", allow_scalar=True)
)
self._obj_index = problem.normalize_obj_index(obj_index)
if distributed and (problem.num_actors > 0):
# If the algorithm is initialized in distributed mode, and also if the problem is configured
# for parallelization, then the _step method becomes an alias for _step_distributed
self._step = self._step_distributed
else:
# Otherwise, the _step method becomes an alias for _step_non_distributed
self._step = self._step_non_distributed
if popsize_weighted_grad_avg is None:
self._popsize_weighted_grad_avg = num_interactions is None
else:
if not distributed:
raise ValueError(
"The argument `popsize_weighted_grad_avg` can only be used in distributed mode."
" (i.e. when the argument `distributed` is given as True)."
" When `distributed` is False, please leave `popsize_weighted_grad_avg` as None."
)
self._popsize_weighted_grad_avg = bool(popsize_weighted_grad_avg)
self._mean_eval: Optional[float] = None
self._population: Optional[SolutionBatch] = None
self._first_iter: bool = True
# We would like to add the reporting capabilities of the mixin class `SinglePopulationAlgorithmMixin`.
# However, we exclude "mean_eval" from the reporting services requested from `SinglePopulationAlgorithmMixin`
# because this class has its own reporting mechanism for `mean_eval`.
# Additionally, we enable the reporting services of `SinglePopulationAlgorithmMixin` only when we are
# in the non-distributed mode. This is because we do not have a centrally stored population at all in the
# distributed mode.
SinglePopulationAlgorithmMixin.__init__(self, exclude="mean_eval", enable=(not distributed))
def _initialize_optimizer(
self, learning_rate: float, optimizer=None, optimizer_config: Optional[dict] = None
) -> object:
if optimizer is None:
return None
elif isinstance(optimizer, str):
center_optim_cls = get_optimizer_class(optimizer, optimizer_config)
return center_optim_cls(
stepsize=float(learning_rate),
dtype=self._distribution.dtype,
solution_length=self._distribution.solution_length,
device=self._distribution.device,
)
else:
return optimizer
def _step(self):
raise NotImplementedError
def _step_distributed(self):
# Use the problem object's `sample_and_compute_gradients` method
# to do parallelized and distributed gradient computation
fetched = self.problem.sample_and_compute_gradients(
self._distribution,
self._popsize,
popsize_max=self._popsize_max,
obj_index=self._obj_index,
num_interactions=self._num_interactions,
ranking_method=self._ranking_method,
ensure_even_popsize=self._ensure_even_popsize,
)
# The method `sample_and_compute_gradients(...)` returns a list of dictionaries, each dictionary being
# the result of a different remote computation.
# For each remote computation, the list will contain a dictionary that looks like this:
# {"gradients": <gradients dictionary here>, "num_solutions": ..., "mean_eval": ...}
# We will now accumulate all the gradients, num_solutions, and mean_evals in their own lists.
# So, in the end, we will have a list of gradients, a list of num_solutions, and a list of
# mean_eval.
# These lists will be stored by the following temporary class:
class list_of:
gradients = []
num_solutions = []
mean_eval = []
# We are now filling the lists declared above
n = len(fetched)
for i in range(n):
list_of.gradients.append(fetched[i]["gradients"])
list_of.num_solutions.append(fetched[i]["num_solutions"])
list_of.mean_eval.append(fetched[i]["mean_eval"])
# Here, we get the keys of our gradient dictionaries.
# For most simple Gaussian distributions, grad_keys should be {"mu", "sigma"}.
grad_keys = set(list_of.gradients[0].keys())
# We now find the total number of solutions and the overall average mean_eval.
# The overall average mean will be reported to the user.
total_num_solutions = 0
total_weighted_eval = 0
for i in range(n):
total_num_solutions += list_of.num_solutions[i]
total_weighted_eval += float(list_of.num_solutions[i] * list_of.mean_eval[i])
avg_mean_eval = total_weighted_eval / total_num_solutions
# For each gradient (in most cases among 'mu' and 'sigma'), we allocate a new 0-filled tensor.
avg_gradients = {}
for key in grad_keys:
avg_gradients[key] = self._distribution.make_zeros(num_solutions=1).reshape(-1)
# Below, we iterate over all collected results and add their gradients, in a weighted manner, onto the
# `avg_gradients` we allocated above.
# At the end, `avg_gradients` will store the weighted-averaged gradients to be followed by the algorithm.
for i in range(n):
# For each collected result, we compute a weight for the gradient, which is the number of solutions
# sampled divided by the total number of sampled solutions.
num_solutions = list_of.num_solutions[i]
if self._popsize_weighted_grad_avg:
# If we are to weigh each gradient by its popsize (i.e. by its sample size)
# then its weight is computed as its number of solutions divided by the
# total number of solutions
weight = num_solutions / total_num_solutions
else:
# If we are NOT to weigh each gradient by its popsize (i.e. by its sample size)
# then the weight of this gradient simply becomes 1 divided by the number of gradients.
weight = 1 / n
for key in grad_keys:
grad = list_of.gradients[i][key]
avg_gradients[key] += weight * grad
self._update_distribution(avg_gradients)
self._mean_eval = avg_mean_eval
def _step_non_distributed(self):
# First, we define an inner function which fills the current population by sampling from the distribution.
def fill_and_eval_pop():
# This inner function is responsible for filling the main population with samples
# and evaluate them.
if self._num_interactions is None:
# If num_interactions is configured as None, this means that we are not going to adapt
# the population size according to the number of simulation interactions reported
# by the problem object.
# We first make sure that the population (which is to be of constant size, since we are
# not in the adaptive population size mode) is allocated.
if self._population is None:
self._population = SolutionBatch(
self.problem, popsize=self._popsize, device=self._distribution.device, empty=True
)
# Now, we do in-place sampling on the population.
self._distribution.sample(out=self._population.access_values(), generator=self.problem)
# Finally, here, the solutions are evaluated.
self.problem.evaluate(self._population)
else:
# If num_interactions is not None, then this means that we have a threshold for the number
# of simulator interactions to reach before declaring the phase of sampling complete.
# In other words, we have to adapt our population size according to the number of simulator
# interactions reported by the problem object.
# The 'total_interaction_count' status reported by the problem object shows the global interaction count.
# Therefore, to properly count the simulator interactions we made during this generation, we need
# to get the interaction count before starting our sampling and evaluation operations.
first_num_interactions = self.problem.status.get("total_interaction_count", 0)
# We will keep allocating and evaluating new populations until the interaction count threshold is reached.
# These newly allocated populations will eventually be concatenated into one.
# The not-yet-concatenated populations and the total allocated population size will be stored below:
populations = []
total_popsize = 0
# Below, we repeatedly allocate, sample, and evaluate, until our thresholds are reached.
while True:
# Allocate a new population
newpop = SolutionBatch(
self.problem,
popsize=self._popsize,
like=self._population,
empty=True,
)
# Update the total population size
total_popsize += len(newpop)
# Sample new solutions within the newly allocated population
self._distribution.sample(out=newpop.access_values(), generator=self.problem)
# Evaluate the new population
self.problem.evaluate(newpop)
# Add the newly allocated and evaluated population into the populations list
populations.append(newpop)
# In addition to the num_interactions threshold, we might also have a popsize_max threshold.
# We now check this threshold.
if (self._popsize_max is not None) and (total_popsize >= self._popsize_max):
# If the popsize_max threshold is reached, we leave the loop.
break
# We now compute the number of interactions we have made during this while loop.
interactions_made = self.problem.status["total_interaction_count"] - first_num_interactions
if interactions_made > self._num_interactions:
# If the number of interactions exceeds our threshold, we leave the loop.
break
# Finally, we concatenate all our populations into one.
self._population = SolutionBatch.cat(populations)
if self._first_iter:
# If we are computing the first generation, we just sample from our distribution and evaluate
# the solutions.
fill_and_eval_pop()
self._first_iter = False
else:
# If we are computing next generations, then we need to compute the gradients of the last
# generation, sample a new population, and evaluate the new population's solutions.
samples = self._population.access_values(keep_evals=True)
fitnesses = self._population.access_evals()[:, self._obj_index]
obj_sense = self.problem.senses[self._obj_index]
ranking_method = self._ranking_method
gradients = self._distribution.compute_gradients(
samples, fitnesses, objective_sense=obj_sense, ranking_method=ranking_method
)
self._update_distribution(gradients)
fill_and_eval_pop()
def _update_distribution(self, gradients: dict):
# This is where we follow the gradients with the help of the stored Distribution object.
# First, we check whether or not we will need to do a controlled update on the
# standard deviation (do we have imposed lower and upper bounds for the standard deviation,
# and do we have a maximum change limiter?)
controlled_stdev_update = (
(self._stdev_min is not None) or (self._stdev_max is not None) or (self._stdev_max_change is not None)
)
if controlled_stdev_update:
# If the standard deviation update needs to be controlled, we store the standard deviation just before
# the update. We will use this later.
old_sigma = self._distribution.sigma
# Here, we determine for which distribution parameter we have a learning rate and for which distribution
# parameter we have an optimizer.
learning_rates = {}
optimizers = {}
if self._optimizer is not None:
# If there is an optimizer, then we declare that "mu" has an optimizer
optimizers["mu"] = self._optimizer
else:
# If we do not have an optimizer, then we declare that "mu" has a raw learning rate coefficient
learning_rates["mu"] = self._center_learning_rate
# Here, we declare that "sigma" has a learning rate
learning_rates["sigma"] = self._stdev_learning_rate
# With the help of the Distribution object's `update_parameters(...)` method, we follow the gradients
updated_dist = self._distribution.update_parameters(
gradients, learning_rates=learning_rates, optimizers=optimizers
)
if controlled_stdev_update:
# If our standard deviation update needs to be controlled, then, considering the pre-update
# standard deviation, we ensure that the update constraints (lower and upper bounds and maximum change)
# are not violated.
updated_dist = updated_dist.modified_copy(
sigma=modify_tensor(
old_sigma,
updated_dist.sigma,
lb=self._stdev_min,
ub=self._stdev_max,
max_change=self._stdev_max_change,
)
)
# Now we can declare that our main distribution is the updated one
self._distribution = updated_dist
def _get_mu(self) -> torch.Tensor:
return self._distribution.parameters["mu"]
def _get_sigma(self) -> torch.Tensor:
return self._distribution.parameters["sigma"]
def _get_mean_eval(self) -> Optional[float]:
if self._population is None:
return self._mean_eval
else:
return float(torch.mean(self._population.evals[:, self._obj_index]))
# def _get_median_eval(self) -> Optional[float]:
# if self._population is None:
# return None
# else:
# return float(torch.median(self._population.evals[:, self._obj_index]))
def _get_popsize(self) -> int:
return 0 if self._population is None else len(self._population)
@property
def optimizer(self):
"""
Get the optimizer used by this search algorithm.
If an optimizer is not being used, the result will be `None`.
If a PyTorch optimizer is being used, the result will be an instance of
`torch.optim.Optimizer`.
If the returned optimizer is "clipup", then the returned object will be
an instance of `evotorch.optimizers.ClipUp`.
The returned optimizer object can be used for reading/writing the
hyperparameters. For example, to read the learning rate of the optimizer,
one can do:
```python
learning_rate = my_search_algorithm.optimizer.param_groups[0]["lr"]
```
One can also update the learning rate like this:
```python
my_search_algorithm.optimizer.param_groups[0]["lr"] = new_learning_rate
```
**Note for when updating the learning rate of ClipUp.**
At the moment of initialization, if one provides `center_learning_rate`
but the maximum speed is not specified (i.e. the search algorithm is not
given something like `optimizer_config={"max_speed": ...}`), then, the
maximum speed is initialized as `2*center_learning_rate`. However, when
this `center_learning_rate` is later modified (via
`my_search_algorithm.optimizer.param_groups[0]["lr"] = new_center_learning_rate`
the maximum speed will NOT be automatically adjusted.
Therefore, when updating the center learning rate of ClipUp, consider
also adjusting the maximum speed of ClipUp via:
`my_search_algorithm.optimizer.param_groups[0]["max_speed"] = ...`
"""
return None if self._optimizer is None else self._optimizer.contained_optimizer
@property
def population(self) -> Optional[SolutionBatch]:
"""
The population, represented by a SolutionBatch.
If the population is not initialized yet, the retrieved value will
be None.
Also note that, if this algorithm is in distributed mode, the
retrieved value will be None, since the distributed mode causes the
population to be generated in the remote actors, and not in the main
process.
"""
return self._population
@property
def obj_index(self) -> int:
"""
Index of the focused objective
"""
return self._obj_index
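The gradient-averaging rule applied in `_step_distributed` above can be illustrated in isolation (a standalone sketch, not library code): with popsize-weighting, each actor's gradient is weighted by its share of the sampled solutions; without it, every gradient gets the weight 1/n.
```python
import torch

# Two actors reported gradients computed from 30 and 10 solutions respectively.
gradients = [
    {"mu": torch.tensor([1.0, 0.0]), "sigma": torch.tensor([0.1, 0.1])},
    {"mu": torch.tensor([0.0, 2.0]), "sigma": torch.tensor([0.3, 0.1])},
]
num_solutions = [30, 10]
popsize_weighted = True

n = len(gradients)
total = sum(num_solutions)
avg = {key: torch.zeros(2) for key in gradients[0]}
for grad, num in zip(gradients, num_solutions):
    weight = (num / total) if popsize_weighted else (1 / n)
    for key in avg:
        avg[key] += weight * grad[key]

print(avg)  # {'mu': tensor([0.7500, 0.5000]), 'sigma': tensor([0.1500, 0.1000])}
```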
obj_index: int
property
readonly
¶
Index of the focused objective
optimizer
property
readonly
¶
Get the optimizer used by this search algorithm.
If an optimizer is not being used, the result will be `None`.
If a PyTorch optimizer is being used, the result will be an instance of
`torch.optim.Optimizer`.
If the returned optimizer is "clipup", then the returned object will be
an instance of `evotorch.optimizers.ClipUp`.
The returned optimizer object can be used for reading/writing the
hyperparameters. For example, to read the learning rate of the optimizer,
one can do `learning_rate = my_search_algorithm.optimizer.param_groups[0]["lr"]`.
One can also update the learning rate like this:
`my_search_algorithm.optimizer.param_groups[0]["lr"] = new_learning_rate`.
Note for when updating the learning rate of ClipUp.
At the moment of initialization, if one provides `center_learning_rate`
but the maximum speed is not specified (i.e. the search algorithm is not
given something like `optimizer_config={"max_speed": ...}`), then the
maximum speed is initialized as `2*center_learning_rate`. However, when
this `center_learning_rate` is later modified (via
`my_search_algorithm.optimizer.param_groups[0]["lr"] = new_center_learning_rate`),
the maximum speed will NOT be automatically adjusted.
Therefore, when updating the center learning rate of ClipUp, consider
also adjusting the maximum speed of ClipUp via
`my_search_algorithm.optimizer.param_groups[0]["max_speed"] = ...`.
population: Optional[evotorch.core.SolutionBatch]
property
readonly
¶
The population, represented by a SolutionBatch.
If the population is not initialized yet, the retrieved value will be None. Also note that, if this algorithm is in distributed mode, the retrieved value will be None, since the distributed mode causes the population to be generated in the remote actors, and not in the main process.
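Since this is a base class, a concrete algorithm mainly picks its `DISTRIBUTION_TYPE` and `DISTRIBUTION_PARAMS` and forwards its hyperparameters to the initializer above, as the `PGPE` class below does. Here is a minimal subclass sketch (an illustration, assuming the import path matches the source file named above):
```python
from evotorch import Problem
from evotorch.algorithms.distributed.gaussian import (  # assumed path
    GaussianSearchAlgorithm,
    SeparableGaussian,
)

class PlainGaussianES(GaussianSearchAlgorithm):
    # A plain evolution-gradient algorithm over a separable Gaussian,
    # with fixed learning rates, no optimizer, and no fitness ranking.
    DISTRIBUTION_TYPE = SeparableGaussian
    DISTRIBUTION_PARAMS = None  # no extra distribution parameters

    def __init__(self, problem: Problem, *, popsize: int, stdev_init: float):
        super().__init__(
            problem,
            popsize=popsize,
            center_learning_rate=0.5,
            stdev_learning_rate=0.1,
            stdev_init=stdev_init,
        )
```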
PGPE (GaussianSearchAlgorithm)
¶
PGPE: Policy gradient with parameter-based exploration.
This implementation is the symmetric-sampling variant proposed by Sehnke et al. (2010).
Inspired by the PGPE implementations used in the studies of Ha (2017, 2019), and by the evolution strategy variant of Salimans et al. (2017), this PGPE implementation uses 0-centered ranking by default. The default optimizer for this PGPE implementation is ClipUp (Toklu et al., 2020).
References:
Frank Sehnke, Christian Osendorfer, Thomas Ruckstiess,
Alex Graves, Jan Peters, Jurgen Schmidhuber (2010).
Parameter-exploring Policy Gradients.
Neural Networks 23(4), 551-559.
David Ha (2017). Evolving Stable Strategies.
<http://blog.otoro.net/2017/11/12/evolving-stable-strategies/>
Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
Evolution Strategies as a Scalable Alternative to
Reinforcement Learning.
David Ha (2019). Reinforcement Learning for Improving Agent Design.
Artificial life 25 (4), 352-365.
Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
ClipUp: A Simple and Powerful Optimizer
for Distribution-based Policy Evolution.
Parallel Problem Solving from Nature (PPSN 2020).
Source code in evotorch/algorithms/distributed/gaussian.py
class PGPE(GaussianSearchAlgorithm):
"""
PGPE: Policy gradient with parameter-based exploration.
This implementation is the symmetric-sampling variant proposed
in the paper Sehnke et al. (2010).
Inspired by the PGPE implementations used in the studies
of Ha (2017, 2019), and by the evolution strategy variant of
Salimans et al. (2017), this PGPE implementation uses 0-centered
ranking by default.
The default optimizer for this PGPE implementation is ClipUp
(Toklu et al., 2020).
References:
Frank Sehnke, Christian Osendorfer, Thomas Ruckstiess,
Alex Graves, Jan Peters, Jurgen Schmidhuber (2010).
Parameter-exploring Policy Gradients.
Neural Networks 23(4), 551-559.
David Ha (2017). Evolving Stable Strategies.
<http://blog.otoro.net/2017/11/12/evolving-stable-strategies/>
Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
Evolution Strategies as a Scalable Alternative to
Reinforcement Learning.
David Ha (2019). Reinforcement Learning for Improving Agent Design.
Artificial life 25 (4), 352-365.
Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
ClipUp: A Simple and Powerful Optimizer
for Distribution-based Policy Evolution.
Parallel Problem Solving from Nature (PPSN 2020).
"""
DISTRIBUTION_TYPE = NotImplemented # To be filled by the PGPE instance
DISTRIBUTION_PARAMS = NotImplemented # To be filled by the PGPE instance
def __init__(
self,
problem: Problem,
*,
popsize: int,
center_learning_rate: float,
stdev_learning_rate: float,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer="clipup",
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = "centered",
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[RealOrVector] = 0.2,
symmetric: bool = True,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the PGPE algorithm.
Args:
problem: The problem object which is being worked on.
The problem must have its dtype defined
(which means it works on Solution objects,
not with custom Solution objects).
Also, the problem must be single-objective.
popsize: The population size.
In the case of PGPE, `popsize` is expected as an even number
in non-distributed mode. In distributed mode, PGPE will
ensure that each sub-population size assigned to a remote
actor is an even number.
This behavior is because PGPE does symmetric sampling
(i.e. solutions are sampled in pairs).
center_learning_rate: The learning rate for the center
of the search distribution.
stdev_learning_rate: The learning rate for the standard
deviation values of the search distribution.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n interactions are reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
optimizer: The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
The default is 'clipup'.
Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`.
optimizer_config: Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments.
ranking_method: Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
As in the study of Salimans et al. (2017),
the default is 'centered'.
Can be given as None if no such ranking is required.
center_init: The initial center solution.
Can be left as None.
stdev_min: Lower bound for the standard deviation value/array.
Can be given as a real number, or as an array of real numbers.
stdev_max: Upper bound for the standard deviation value/array.
Can be given as a real number, or as an array of real numbers.
stdev_max_change: The maximum update ratio allowed on the
standard deviation. Expected as None if no such limiter
is needed, or as a real number between 0.0 and 1.0 otherwise.
Like in the implementation of Ha (2017, 2018),
the default value for this setting is 0.2, meaning that
the update on the standard deviation values can not be
more than 20% of their original values.
symmetric: Whether or not the solutions will be sampled
in a symmetric/mirrored/antithetic manner.
The default is True.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode.
(where being in distributed mode means `distributed` is given
as True). In distributed mode, each actor remotely samples
its own solution batches and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then, there will not
be weighted averaging (or, each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradient weights will
be weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
if symmetric:
self.DISTRIBUTION_TYPE = SymmetricSeparableGaussian
divide_by = "num_directions"
else:
self.DISTRIBUTION_TYPE = SeparableGaussian
divide_by = "num_solutions"
self.DISTRIBUTION_PARAMS = {"divide_mu_grad_by": divide_by, "divide_sigma_grad_by": divide_by}
super().__init__(
problem,
popsize=popsize,
center_learning_rate=center_learning_rate,
stdev_learning_rate=stdev_learning_rate,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=optimizer,
optimizer_config=optimizer_config,
ranking_method=ranking_method,
center_init=center_init,
stdev_min=stdev_min,
stdev_max=stdev_max,
stdev_max_change=stdev_max_change,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
ensure_even_popsize=symmetric,
)
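A hedged usage sketch of PGPE with its documented defaults (the ClipUp optimizer, 'centered' ranking, and symmetric sampling, which requires an even `popsize`); the Rastrigin function and the import location are illustrative assumptions:
```python
import torch
from evotorch import Problem
from evotorch.algorithms import PGPE  # assumed import location

def rastrigin(x: torch.Tensor) -> torch.Tensor:
    # A classic multimodal benchmark to be minimized.
    return 10 * x.shape[-1] + torch.sum(x**2 - 10 * torch.cos(2 * torch.pi * x), dim=-1)

problem = Problem("min", rastrigin, solution_length=20, initial_bounds=(-5.12, 5.12))

searcher = PGPE(
    problem,
    popsize=50,                # even, since symmetric=True samples solutions in pairs
    center_learning_rate=0.1,  # ClipUp's max_speed then defaults to 2 * 0.1
    stdev_learning_rate=0.01,
    stdev_init=1.0,
)
searcher.run(100)
print(searcher.status["mean_eval"])
```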
__init__(self, problem, *, popsize, center_learning_rate, stdev_learning_rate, stdev_init=None, radius_init=None, num_interactions=None, popsize_max=None, optimizer='clipup', optimizer_config=None, ranking_method='centered', center_init=None, stdev_min=None, stdev_max=None, stdev_max_change=0.2, symmetric=True, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the PGPE algorithm.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| problem | Problem | The problem object which is being worked on. The problem must have its dtype defined (which means it works on Solution objects, not with custom Solution objects). Also, the problem must be single-objective. | required |
| popsize | int | The population size. In the case of PGPE, `popsize` is expected as an even number in non-distributed mode. In distributed mode, PGPE will ensure that each sub-population size assigned to a remote actor is an even number. This behavior is because PGPE does symmetric sampling (i.e. solutions are sampled in pairs). | required |
| center_learning_rate | float | The learning rate for the center of the search distribution. | required |
| stdev_learning_rate | float | The learning rate for the standard deviation values of the search distribution. | required |
| stdev_init | Union[float, Iterable[float], torch.Tensor] | The initial standard deviation of the search distribution, expressed as a scalar or as an array. Determines the initial coverage area of the search distribution. If one wishes to configure the coverage area via the argument `radius_init` instead, then `stdev_init` is expected as None. | None |
| radius_init | Union[float, Iterable[float], torch.Tensor] | The initial radius of the search distribution, expressed as a scalar. Determines the initial coverage area of the search distribution. Here, "radius" is defined as the norm of the search distribution. If one wishes to configure the coverage area via the argument `stdev_init` instead, then `radius_init` is expected as None. | None |
| num_interactions | Optional[int] | When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n interactions are reached. When given as None, popsize is the only configuration affecting the size of a population. | None |
| popsize_max | Optional[int] | Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. | None |
| optimizer | | The optimizer to be used while following the estimated gradients. Can be given as None if a momentum-based optimizer is not required. Otherwise, can be given as a str containing the name of the optimizer (e.g. 'adam', 'clipup'); or as an instance of evotorch.optimizers.TorchOptimizer or evotorch.optimizers.ClipUp. Note that, for ClipUp, the default maximum speed is set as twice the given `center_learning_rate`; this can be configured by passing `{"max_speed": ...}` to `optimizer_config`. | 'clipup' |
| optimizer_config | Optional[dict] | Configuration which will be passed to the optimizer as keyword arguments. See `evotorch.optimizers` for details about which optimizer accepts which keyword arguments. | None |
| ranking_method | Optional[str] | Which ranking method will be used for fitness shaping. See the documentation of `evotorch.ranking.rank(...)` for details. As in the study of Salimans et al. (2017), the default is 'centered'. Can be given as None if no such ranking is required. | 'centered' |
| center_init | Union[float, Iterable[float], torch.Tensor] | The initial center solution. Can be left as None. | None |
| stdev_min | Union[float, Iterable[float], torch.Tensor] | Lower bound for the standard deviation value/array. Can be given as a real number, or as an array of real numbers. | None |
| stdev_max | Union[float, Iterable[float], torch.Tensor] | Upper bound for the standard deviation value/array. Can be given as a real number, or as an array of real numbers. | None |
| stdev_max_change | Union[float, Iterable[float], torch.Tensor] | The maximum update ratio allowed on the standard deviation. Expected as None if no such limiter is needed, or as a real number between 0.0 and 1.0 otherwise. Like in the implementation of Ha (2017, 2018), the default value for this setting is 0.2, meaning that the update on the standard deviation values can not be more than 20% of their original values. | 0.2 |
| symmetric | bool | Whether or not the solutions will be sampled in a symmetric/mirrored/antithetic manner. The default is True. | True |
| obj_index | Optional[int] | Index of the objective according to which the gradient estimations will be done. For single-objective problems, this can be left as None. | None |
| distributed | bool | Whether or not the gradient computation will be distributed. If False and the problem is not parallelized, everything happens in the main process. If False and the problem is parallelized, the population is created centrally and only the evaluations are delegated to remote workers. If True and the problem is parallelized, each remote actor samples its own sub-population (totaling `popsize` across actors) and computes its own gradient, and the main process averages these gradients to update the search distribution. | False |
| popsize_weighted_grad_avg | Optional[bool] | Only to be used in distributed mode (i.e. when `distributed` is True). If True, each actor's gradient is weighted by the number of solutions that actor sampled; if False, all gradients are averaged with equal weights; if None (the default), equal weights are used when `num_interactions` is given, and popsize-based weights are used otherwise. When `distributed` is False, this argument is expected as None. | None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
self,
problem: Problem,
*,
popsize: int,
center_learning_rate: float,
stdev_learning_rate: float,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer="clipup",
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = "centered",
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[RealOrVector] = 0.2,
symmetric: bool = True,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the PGPE algorithm.
Args:
problem: The problem object which is being worked on.
The problem must have its dtype defined
(which means it works on Solution objects,
not with custom Solution objects).
Also, the problem must be single-objective.
popsize: The population size.
In the case of PGPE, `popsize` is expected as an even number
in non-distributed mode. In distributed mode, PGPE will
ensure that each sub-population size assigned to a remote
actor is an even number.
This behavior is because PGPE does symmetric sampling
(i.e. solutions are sampled in pairs).
center_learning_rate: The learning rate for the center
of the search distribution.
stdev_learning_rate: The learning rate for the standard
deviation values of the search distribution.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n interactions are reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
optimizer: The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
The default is 'clipup'.
Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`.
optimizer_config: Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments.
ranking_method: Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
As in the study of Salimans et al. (2017),
the default is 'centered'.
Can be given as None if no such ranking is required.
center_init: The initial center solution.
Can be left as None.
stdev_min: Lower bound for the standard deviation value/array.
Can be given as a real number, or as an array of real numbers.
stdev_max: Upper bound for the standard deviation value/array.
Can be given as a real number, or as an array of real numbers.
stdev_max_change: The maximum update ratio allowed on the
standard deviation. Expected as None if no such limiter
is needed, or as a real number between 0.0 and 1.0 otherwise.
As in the implementation of Ha (2017, 2018),
the default value for this setting is 0.2, meaning that
the update on the standard deviation values cannot exceed
20% of their original values.
symmetric: Whether or not the solutions will be sampled
in a symmetric/mirrored/antithetic manner.
The default is True.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode
(i.e. when `distributed` is given as True). In distributed mode,
each actor remotely samples its own solution batches
and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then there will not
be weighted averaging (i.e. each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradients will be
weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
if symmetric:
self.DISTRIBUTION_TYPE = SymmetricSeparableGaussian
divide_by = "num_directions"
else:
self.DISTRIBUTION_TYPE = SeparableGaussian
divide_by = "num_solutions"
self.DISTRIBUTION_PARAMS = {"divide_mu_grad_by": divide_by, "divide_sigma_grad_by": divide_by}
super().__init__(
problem,
popsize=popsize,
center_learning_rate=center_learning_rate,
stdev_learning_rate=stdev_learning_rate,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=optimizer,
optimizer_config=optimizer_config,
ranking_method=ranking_method,
center_init=center_init,
stdev_min=stdev_min,
stdev_max=stdev_max,
stdev_max_change=stdev_max_change,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
ensure_even_popsize=symmetric,
)
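For quick orientation, here is a minimal usage sketch for PGPE. It is not part of the library source: the sphere fitness function and every hyperparameter value below are illustrative assumptions, following the general evotorch `Problem` API.

```python
import torch
from evotorch import Problem
from evotorch.algorithms import PGPE

# Illustrative fitness function: minimize the sphere function.
def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2)

problem = Problem("min", sphere, solution_length=10, initial_bounds=(-1.0, 1.0))

searcher = PGPE(
    problem,
    popsize=20,                  # even, since PGPE samples solutions in pairs
    center_learning_rate=0.075,  # illustrative value
    stdev_learning_rate=0.1,     # illustrative value
    stdev_init=0.1,              # or configure the coverage area via radius_init
)
searcher.run(100)  # run for 100 generations
print(searcher.status["best"])  # best solution seen so far
```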
SNES (GaussianSearchAlgorithm)
¶
Inspired by the implementation at: http://schaul.site44.com/code/snes.py
Reference:
Schaul, T., Glasmachers, T., Schmidhuber, J. (2011).
High Dimensions and Heavy Tails for Natural Evolution Strategies.
Proceedings of the 13th annual conference on Genetic and evolutionary
computation (GECCO 2011).
Source code in evotorch/algorithms/distributed/gaussian.py
class SNES(GaussianSearchAlgorithm):
"""
SNES: Separable Natural Evolution Strategies
Inspired by the implementation at: http://schaul.site44.com/code/snes.py
Reference:
Schaul, T., Glasmachers, T., Schmidhuber, J. (2011).
High Dimensions and Heavy Tails for Natural Evolution Strategies.
Proceedings of the 13th annual conference on Genetic and evolutionary
computation (GECCO 2011).
"""
DISTRIBUTION_TYPE = ExpSeparableGaussian
DISTRIBUTION_PARAMS = None
def __init__(
self,
problem: Problem,
*,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
popsize: Optional[int] = None,
center_learning_rate: Optional[float] = None,
stdev_learning_rate: Optional[float] = None,
scale_learning_rate: bool = True,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer=None,
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = "nes",
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[RealOrVector] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the SNES algorithm.
Args:
problem: The problem object which is being worked on.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
popsize: Population size. Can be specified as an int,
or can be left as None to let the solver decide.
In the case of SNES, `popsize` can be left as None,
in which case the default `popsize` will be computed
as `4 + floor(3 * log(n))` where `n` is the length
of a solution.
center_learning_rate: Learning rate for updating the mean
of the search distribution. Default value is 1.0
stdev_learning_rate: Learning rate for updating the covariance
matrix of the search distribution.
The default value is `0.2 * (3 + log(n)) / sqrt(n)`
where `n` is the length of a solution.
scale_learning_rate: For SNES, there is a default standard
deviation learning rate value which is computed as
`0.2 * (3 + log(n)) / sqrt(n)` (where `n` is the solution
length).
If scale_learning_rate is True (which is the default),
then the effective learning rate for the standard deviation
becomes the provided `stdev_learning_rate` multiplied by this
default value. If `scale_learning_rate` is False, then the
effective standard deviation learning rate becomes
equal to the provided `stdev_learning_rate` value.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n interactions are reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
optimizer: The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
The default is None.
Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`.
optimizer_config: Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments.
ranking_method: Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
The default is 'nes'.
Can be given as None if no such ranking is required.
center_init: The initial center solution.
Can be left as None.
stdev_min: Minimum values for the standard deviation.
Expected as a 1-dimensional array to serve as a limiter
to the diagonals of the covariance matrix's square root.
stdev_max: Maximum values for the standard deviation.
Expected as a 1-dimensional array to serve as a limiter
to the diagonals of the covariance matrix's square root.
stdev_max_change: Maximum change allowed when updating
the square root of the covariance matrix.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode
(i.e. when `distributed` is given as True). In distributed mode,
each actor remotely samples its own solution batches
and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then there will not
be weighted averaging (i.e. each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradients will be
weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
if popsize is None:
popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))
if center_learning_rate is None:
center_learning_rate = 1.0
def default_stdev_lr():
n = problem.solution_length
return 0.2 * (3 + math.log(n)) / math.sqrt(n)
if stdev_learning_rate is None:
stdev_learning_rate = default_stdev_lr()
else:
stdev_learning_rate = float(stdev_learning_rate)
if scale_learning_rate:
stdev_learning_rate *= default_stdev_lr()
super().__init__(
problem,
popsize=popsize,
center_learning_rate=center_learning_rate,
stdev_learning_rate=stdev_learning_rate,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=optimizer,
optimizer_config=optimizer_config,
ranking_method=ranking_method,
center_init=center_init,
stdev_min=stdev_min,
stdev_max=stdev_max,
stdev_max_change=stdev_max_change,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
)
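As a concrete illustration of these defaults, here is a short sketch of running SNES. The Rastrigin function and the `stdev_init` value are assumptions for illustration, not taken from the library docs.

```python
import torch
from evotorch import Problem
from evotorch.algorithms import SNES

# Illustrative fitness function: minimize the Rastrigin function.
def rastrigin(x: torch.Tensor) -> torch.Tensor:
    return 10 * x.shape[-1] + torch.sum(x**2 - 10 * torch.cos(2 * torch.pi * x))

problem = Problem("min", rastrigin, solution_length=30, initial_bounds=(-5.12, 5.12))

# popsize and the learning rates are left as None, so the defaults described
# above apply: popsize = 4 + floor(3 * log(30)) = 14, and the stdev learning
# rate becomes 0.2 * (3 + log(30)) / sqrt(30).
searcher = SNES(problem, stdev_init=1.0)
searcher.run(200)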
DISTRIBUTION_TYPE (SeparableGaussian)
¶
Exponential separable multivariate Gaussian, as used by SNES
Source code in evotorch/algorithms/distributed/gaussian.py
class ExpSeparableGaussian(SeparableGaussian):
"""exponentialseparable Multivariate Gaussian, as used by SNES"""
MANDATORY_PARAMETERS = {"mu", "sigma"}
OPTIONAL_PARAMETERS = set()
def _compute_gradients(self, samples: torch.Tensor, weights: torch.Tensor, ranking_used: Optional[str]) -> dict:
if ranking_used != "nes":
weights = weights / torch.sum(torch.abs(weights))
scaled_noises = samples - self.mu
raw_noises = scaled_noises / self.sigma
mu_grad = total(dot(weights, scaled_noises))
sigma_grad = total(dot(weights, (raw_noises**2) - 1))
return {"mu": mu_grad, "sigma": sigma_grad}
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "ExpSeparableGaussian":
mu_grad = gradients["mu"]
sigma_grad = gradients["sigma"]
new_mu = self.mu + self._follow_gradient("mu", mu_grad, learning_rates=learning_rates, optimizers=optimizers)
new_sigma = self.sigma * torch.exp(
0.5 * self._follow_gradient("sigma", sigma_grad, learning_rates=learning_rates, optimizers=optimizers)
)
return self.modified_copy(mu=new_mu, sigma=new_sigma)
update_parameters(self, gradients, *, learning_rates=None, optimizers=None)
¶
Do an update on the distribution by following the given gradients.
It is expected that the inheriting class has its own implementation for this method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `gradients` | dict | Gradients, as a dictionary, which will be used for computing the necessary updates. | required |
| `learning_rates` | Optional[dict] | A dictionary which contains learning rates for parameters that will be updated using a learning rate coefficient. | None |
| `optimizers` | Optional[dict] | A dictionary which contains optimizer objects for parameters that will be updated using an adaptive optimizer. | None |

Returns:

| Type | Description |
|---|---|
| ExpSeparableGaussian | The updated copy of the distribution. |
Source code in evotorch/algorithms/distributed/gaussian.py
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "ExpSeparableGaussian":
mu_grad = gradients["mu"]
sigma_grad = gradients["sigma"]
new_mu = self.mu + self._follow_gradient("mu", mu_grad, learning_rates=learning_rates, optimizers=optimizers)
new_sigma = self.sigma * torch.exp(
0.5 * self._follow_gradient("sigma", sigma_grad, learning_rates=learning_rates, optimizers=optimizers)
)
return self.modified_copy(mu=new_mu, sigma=new_sigma)
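The `sigma` update above is multiplicative: the gradient step enters through `exp(...)`, so every standard deviation stays strictly positive regardless of step size. A small self-contained sketch with made-up numbers, assuming plain learning-rate scaling inside `_follow_gradient`:

```python
import torch

sigma = torch.tensor([0.5, 0.5, 0.5])
sigma_grad = torch.tensor([0.4, 0.0, -0.4])  # assumed gradient values
lr = 0.2                                     # assumed stdev learning rate

# Mirrors new_sigma = sigma * exp(0.5 * lr * grad) from update_parameters above
new_sigma = sigma * torch.exp(0.5 * lr * sigma_grad)
print(new_sigma)  # positive grads grow sigma, negative ones shrink it
```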
__init__(self, problem, *, stdev_init=None, radius_init=None, popsize=None, center_learning_rate=None, stdev_learning_rate=None, scale_learning_rate=True, num_interactions=None, popsize_max=None, optimizer=None, optimizer_config=None, ranking_method='nes', center_init=None, stdev_min=None, stdev_max=None, stdev_max_change=None, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the SNES algorithm.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `problem` | Problem | The problem object which is being worked on. | required |
| `stdev_init` | Union[float, Iterable[float], torch.Tensor] | The initial standard deviation of the search distribution, expressed as a scalar or as an array. Expected as None if the coverage area is configured via `radius_init` instead. | None |
| `radius_init` | Union[float, Iterable[float], torch.Tensor] | The initial radius of the search distribution, expressed as a scalar. Expected as None if the coverage area is configured via `stdev_init` instead. | None |
| `popsize` | Optional[int] | Population size. If left as None, computed as `4 + floor(3 * log(n))` where `n` is the solution length. | None |
| `center_learning_rate` | Optional[float] | Learning rate for updating the mean of the search distribution. Default value is 1.0. | None |
| `stdev_learning_rate` | Optional[float] | Learning rate for updating the standard deviations of the search distribution. The default value is `0.2 * (3 + log(n)) / sqrt(n)` where `n` is the solution length. | None |
| `scale_learning_rate` | bool | If True (the default), the provided `stdev_learning_rate` is multiplied by the default value above; if False, it is used as given. | True |
| `num_interactions` | Optional[int] | When given as an integer n, the population is extended with more samples until n interactions with the GymProblem's environment are reached. When None, `popsize` alone determines the population size. | None |
| `popsize_max` | Optional[int] | Upper bound for the effective population size when `num_interactions` is set. | None |
| `optimizer` | | The optimizer to be used while following the estimated gradients. None, an optimizer name (e.g. 'adam', 'clipup'), or an instance of evotorch.optimizers.TorchOptimizer or evotorch.optimizers.ClipUp. | None |
| `optimizer_config` | Optional[dict] | Configuration passed to the optimizer as keyword arguments. See `evotorch.optimizers` for details. | None |
| `ranking_method` | Optional[str] | Ranking method used for fitness shaping. See `evotorch.ranking.rank(...)` for details. Can be given as None if no ranking is required. | 'nes' |
| `center_init` | Union[float, Iterable[float], torch.Tensor] | The initial center solution. Can be left as None. | None |
| `stdev_min` | Union[float, Iterable[float], torch.Tensor] | Minimum values for the standard deviation, as a 1-dimensional array limiting the diagonals of the covariance matrix's square root. | None |
| `stdev_max` | Union[float, Iterable[float], torch.Tensor] | Maximum values for the standard deviation, as a 1-dimensional array limiting the diagonals of the covariance matrix's square root. | None |
| `stdev_max_change` | Union[float, Iterable[float], torch.Tensor] | Maximum change allowed when updating the square root of the covariance matrix. | None |
| `obj_index` | Optional[int] | Index of the objective used for gradient estimation. Can be left as None for single-objective problems. | None |
| `distributed` | bool | Whether the gradient computation will be distributed across remote actors (see the docstring below for a detailed discussion). | False |
| `popsize_weighted_grad_avg` | Optional[bool] | Only used in distributed mode: whether each actor's gradient is weighted by the number of solutions it sampled. Expected as None when `distributed` is False. | None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
self,
problem: Problem,
*,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
popsize: Optional[int] = None,
center_learning_rate: Optional[float] = None,
stdev_learning_rate: Optional[float] = None,
scale_learning_rate: bool = True,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer=None,
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = "nes",
center_init: Optional[RealOrVector] = None,
stdev_min: Optional[RealOrVector] = None,
stdev_max: Optional[RealOrVector] = None,
stdev_max_change: Optional[RealOrVector] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the SNES algorithm.
Args:
problem: The problem object which is being worked on.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
popsize: Population size. Can be specified as an int,
or can be left as None to let the solver decide.
In the case of SNES, `popsize` can be left as None,
in which case the default `popsize` will be computed
as `4 + floor(3 * log(n))` where `n` is the length
of a solution.
center_learning_rate: Learning rate for updating the mean
of the search distribution. Default value is 1.0
stdev_learning_rate: Learning rate for updating the covariance
matrix of the search distribution.
The default value is `0.2 * (3 + log(n)) / sqrt(n)`
where `n` is the length of a solution.
scale_learning_rate: For SNES, there is a default standard
deviation learning rate value which is computed as
`0.2 * (3 + log(n)) / sqrt(n)` (where `n` is the solution
length).
If scale_learning_rate is True (which is the default),
then the effective learning rate for the standard deviation
becomes the provided `stdev_learning_rate` multiplied by this
default value. If `scale_learning_rate` is False, then the
effective standard deviation learning rate becomes
equal to the provided `stdev_learning_rate` value.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n interactions are reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
optimizer: The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
The default is None.
Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`.
optimizer_config: Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments.
ranking_method: Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
The default is 'nes'.
Can be given as None if no such ranking is required.
center_init: The initial center solution.
Can be left as None.
stdev_min: Minimum values for the standard deviation.
Expected as a 1-dimensional array to serve as a limiter
to the diagonals of the covariance matrix's square root.
stdev_max: Maximum values for the standard deviation.
Expected as a 1-dimensional array to serve as a limiter
to the diagonals of the covariance matrix's square root.
stdev_max_change: Maximum change allowed when updating
the square root of the covariance matrix.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode.
(where being in distributed mode means `distributed` is given
as True). In distributed mode, each actor remotely samples
its own solution batches and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then there will not
be weighted averaging (i.e. each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradients will be
weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
if popsize is None:
popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))
if center_learning_rate is None:
center_learning_rate = 1.0
def default_stdev_lr():
n = problem.solution_length
return 0.2 * (3 + math.log(n)) / math.sqrt(n)
if stdev_learning_rate is None:
stdev_learning_rate = default_stdev_lr()
else:
stdev_learning_rate = float(stdev_learning_rate)
if scale_learning_rate:
stdev_learning_rate *= default_stdev_lr()
super().__init__(
problem,
popsize=popsize,
center_learning_rate=center_learning_rate,
stdev_learning_rate=stdev_learning_rate,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=optimizer,
optimizer_config=optimizer_config,
ranking_method=ranking_method,
center_init=center_init,
stdev_min=stdev_min,
stdev_max=stdev_max,
stdev_max_change=stdev_max_change,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
)
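To make the rules of thumb above concrete, the default hyperparameters for a 100-dimensional problem work out as follows (plain Python; no evotorch needed):

```python
import math

n = 100  # solution length
popsize = 4 + math.floor(3 * math.log(n))          # -> 17
stdev_lr = 0.2 * (3 + math.log(n)) / math.sqrt(n)  # -> approx. 0.152
print(popsize, stdev_lr)
```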
XNES (GaussianSearchAlgorithm)
¶
Inspired by the implementations at:
http://schaul.site44.com/code/xnes.py
https://github.com/pybrain/pybrain/blob/master/pybrain/optimization/distributionbased/xnes.py
Reference
Glasmachers, Tobias, et al. Exponential natural evolution strategies. Proceedings of the 12th annual conference on Genetic and evolutionary computation (GECCO 2010).
Source code in evotorch/algorithms/distributed/gaussian.py
class XNES(GaussianSearchAlgorithm):
"""
XNES: Exponential Natural Evolution Strategies
Inspired by the implementation at:
http://schaul.site44.com/code/xnes.py
https://github.com/pybrain/pybrain/blob/master/pybrain/optimization/distributionbased/xnes.py
Reference:
Glasmachers, Tobias, et al.
Exponential natural evolution strategies.
Proceedings of the 12th annual conference on Genetic and evolutionary
computation (GECCO 2010).
"""
DISTRIBUTION_TYPE = ExpGaussian
DISTRIBUTION_PARAMS = None
def __init__(
self,
problem: Problem,
*,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
popsize: Optional[int] = None,
center_learning_rate: Optional[float] = None,
stdev_learning_rate: Optional[float] = None,
scale_learning_rate: bool = True,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer=None,
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = "nes",
center_init: Optional[RealOrVector] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the XNES algorithm.
Args:
problem: The problem object which is being worked on.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
popsize: Population size. Can be specified as an int,
or can be left as None to let the solver decide.
In the case of XNES, `popsize` can be left as None,
in which case the default `popsize` will be computed
as `4 + floor(3 * log(n))` where `n` is the length
of a solution.
center_learning_rate: Learning rate for updating the mean
of the search distribution. Default value is 1.0
stdev_learning_rate: Learning rate for updating the covariance
matrix of the search distribution.
The default value is `0.6 * (3 + log(n)) / (n * sqrt(n))`
where `n` is the length of a solution.
scale_learning_rate: For XNES, there is a default standard
deviation learning rate value which is computed as
`0.6 * (3 + log(n)) / (n * sqrt(n))` (where `n` is the solution
length).
If scale_learning_rate is True (which is the default),
then the effective learning rate for the standard deviation
becomes the provided `stdev_learning_rate` multiplied by this
default value. If `scale_learning_rate` is False, then the
effective standard deviation learning rate becomes
equal to the provided `stdev_learning_rate` value.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n interactions are reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
optimizer: The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
The default is None.
Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`.
optimizer_config: Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments.
ranking_method: Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
The default is 'nes'.
Can be given as None if no such ranking is required.
center_init: The initial center solution.
Can be left as None.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode
(i.e. when `distributed` is given as True). In distributed mode,
each actor remotely samples its own solution batches
and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then there will not
be weighted averaging (i.e. each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradients will be
weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
if popsize is None:
popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))
if center_learning_rate is None:
center_learning_rate = 1.0
def default_stdev_lr():
n = problem.solution_length
return 0.6 * (3 + math.log(n)) / (n * math.sqrt(n))
if stdev_learning_rate is None:
stdev_learning_rate = default_stdev_lr()
else:
stdev_learning_rate = float(stdev_learning_rate)
if scale_learning_rate:
stdev_learning_rate *= default_stdev_lr()
super().__init__(
problem,
popsize=popsize,
center_learning_rate=center_learning_rate,
stdev_learning_rate=stdev_learning_rate,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=optimizer,
optimizer_config=optimizer_config,
ranking_method=ranking_method,
center_init=center_init,
stdev_min=None,
stdev_max=None,
stdev_max_change=None,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
)
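A minimal sketch of running XNES follows; the problem definition and the `stdev_init` value are illustrative assumptions, not from the library docs.

```python
import torch
from evotorch import Problem
from evotorch.algorithms import XNES

# Illustrative fitness function: minimize the sphere function.
def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2)

problem = Problem("min", sphere, solution_length=10, initial_bounds=(-1.0, 1.0))

# XNES adapts a full covariance matrix, so it suits lower-dimensional,
# non-separable problems better than the diagonal-only SNES.
searcher = XNES(problem, stdev_init=0.5)
searcher.run(100)
```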
DISTRIBUTION_TYPE (Distribution)
¶
Exponential multivariate Gaussian, as used by XNES
Source code in evotorch/algorithms/distributed/gaussian.py
class ExpGaussian(Distribution):
"""exponential Multivariate Gaussian, as used by XNES"""
# Corresponding to mu and A in symbols used in xNES paper
MANDATORY_PARAMETERS = {"mu", "sigma"}
# Inverse of sigma, numerically more stable to track this independently to sigma
OPTIONAL_PARAMETERS = {"sigma_inv"}
def __init__(
self,
parameters: dict,
*,
solution_length: Optional[int] = None,
device: Optional[Device] = None,
dtype: Optional[DType] = None,
):
[mu_length] = parameters["mu"].shape
# Make sigma 2D
if len(parameters["sigma"].shape) == 1:
parameters["sigma"] = torch.diag(parameters["sigma"])
# Automatically generate sigma_inv if not provided
if "sigma_inv" not in parameters:
parameters["sigma_inv"] = torch.inverse(parameters["sigma"])
[sigma_length, _] = parameters["sigma"].shape
if solution_length is None:
solution_length = mu_length
else:
if solution_length != mu_length:
raise ValueError(
f"The argument `solution_length` does not match the length of `mu` provided in `parameters`."
f" solution_length={solution_length},"
f' parameters["mu"]={mu_length}.'
)
if mu_length != sigma_length:
raise ValueError(
f"The tensors `mu` and `sigma` provided within `parameters` have mismatching lengths."
f' parameters["mu"]={mu_length},'
f' parameters["sigma"]={sigma_length}.'
)
super().__init__(
solution_length=solution_length,
parameters=parameters,
device=device,
dtype=dtype,
)
# Make identity matrix as this is used throughout in gradient computation
self.eye = self.make_I(solution_length)
@property
def mu(self) -> torch.Tensor:
"""Getter for mu
Returns:
mu (torch.Tensor): The center of the search distribution
"""
return self.parameters["mu"]
@mu.setter
def mu(self, new_mu: Iterable):
"""Setter for mu
Args:
new_mu (torch.Tensor): The new value of mu
"""
self.parameters["mu"] = torch.as_tensor(new_mu, dtype=self.dtype, device=self.device)
@property
def cov(self) -> torch.Tensor:
"""The covariance matrix A^T A"""
return self.sigma.transpose(0, 1) @ self.sigma
@property
def sigma(self) -> torch.Tensor:
"""Getter for sigma
Returns:
sigma (torch.Tensor): The square root of the covariance matrix
"""
return self.parameters["sigma"]
@property
def sigma_inv(self) -> torch.Tensor:
"""Getter for sigma_inv
Returns:
sigma_inv (torch.Tensor): The inverse square root of the covariance matrix
"""
if "sigma_inv" in self.parameters:
return self.parameters["sigma_inv"]
else:
return torch.inverse(self.parameters["sigma"])
@property
def A(self) -> torch.Tensor:
"""Alias for self.sigma, for notational consistency with paper"""
return self.sigma
@property
def A_inv(self) -> torch.Tensor:
"""Alias for self.sigma_inv, for notational consistency with paper"""
return self.sigma_inv
@sigma.setter
def sigma(self, new_sigma: Iterable):
"""Setter for sigma
Args:
new_sigma (torch.Tensor): The new value of sigma, the square root of the covariance matrix
"""
self.parameters["sigma"] = torch.as_tensor(new_sigma, dtype=self.dtype, device=self.device)
def to_global_coordinates(self, local_coordinates: torch.Tensor) -> torch.Tensor:
"""Map samples from local coordinate space N(0, I_d) to global coordinate space N(mu, A^T A)
This function is the inverse of to_local_coordinates
Args:
local_coordinates (torch.Tensor): The local coordinates sampled from N(0, I_d)
Returns:
global_coordinates (torch.Tensor): The global coordinates sampled from N(mu, A^T A)
"""
# Global samples are constructed as x = mu + A z where z is local coordinate
# We use transpose here to simplify the batched application of A
return self.mu.unsqueeze(0) + (self.A @ local_coordinates.T).T
def to_local_coordinates(self, global_coordinates: torch.Tensor) -> torch.Tensor:
"""Map samples from global coordinate space N(mu, A^T A) to local coordinate space N(0, I_d)
This function is the inverse of to_global_coordinates
Args:
global_coordinates (torch.Tensor): The global coordinates sampled from N(mu, A^T A)
Returns:
local_coordinates (torch.Tensor): The local coordinates sampled from N(0, I_d)
"""
# Global samples are constructed as x = mu + A z where z is local coordinate
# Therefore, we can recover z according to z = A_inv (x - mu)
return (self.A_inv @ (global_coordinates - self.mu.unsqueeze(0)).T).T
def _fill(self, out: torch.Tensor, *, generator: Optional[torch.Generator] = None):
"""Fill a tensor with samples from N(mu, A^T A)
Args:
out (torch.Tensor): The tensor to fill
generator (Optional[torch.Generator]): A generator to use to generate random values
"""
# Fill with local coordinates from N(0, I_d)
self.make_gaussian(out=out, generator=generator)
# Map local coordinates to global coordinate system
out[:] = self.to_global_coordinates(out)
def _compute_gradients(self, samples: torch.Tensor, weights: torch.Tensor, ranking_used: Optional[str]) -> dict:
"""Compute the gradients with respect to a given set of samples and weights
Args:
samples (torch.Tensor): Samples drawn from N(mu, A^T A), ideally using self._fill
weights (torch.Tensor): Weights e.g. fitnesses or utilities assigned to samples
ranking_used (optional[str]): The ranking method used to compute weights
Returns:
grads (dict): A dictionary containing the approximated natural gradient on d and M
"""
# Compute the local coordinates
local_coordinates = self.to_local_coordinates(samples)
# Make sure that the weights (utilities) are 0-centered
# (Otherwise the formulations would have to consider a bias term)
if ranking_used not in ("centered", "normalized"):
weights = weights - torch.mean(weights)
d_grad = total(dot(weights, local_coordinates))
local_coordinates_outer = local_coordinates.unsqueeze(1) * local_coordinates.unsqueeze(2)
M_grad = torch.sum(
weights.unsqueeze(-1).unsqueeze(-1) * (local_coordinates_outer - self.eye.unsqueeze(0)), dim=0
)
return {
"d": d_grad,
"M": M_grad,
}
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "ExpGaussian":
d_grad = gradients["d"]
M_grad = gradients["M"]
if "d" not in learning_rates:
learning_rates["d"] = learning_rates["mu"]
if "M" not in learning_rates:
learning_rates["M"] = learning_rates["sigma"]
# Follow gradients for d, and M
update_d = self._follow_gradient("d", d_grad, learning_rates=learning_rates, optimizers=optimizers)
update_M = self._follow_gradient("M", M_grad, learning_rates=learning_rates, optimizers=optimizers)
# Fold into parameters mu, A and A inv
new_mu = self.mu + torch.mv(self.A, update_d)
new_A = self.A @ torch.matrix_exp(0.5 * update_M)
new_A_inv = torch.matrix_exp(-0.5 * update_M) @ self.A_inv
# Return modified distribution
return self.modified_copy(mu=new_mu, sigma=new_A, sigma_inv=new_A_inv)
A: Tensor
property
readonly
¶
Alias for self.sigma, for notational consistency with paper
A_inv: Tensor
property
readonly
¶
Alias for self.sigma_inv, for notational consistency with paper
cov: Tensor
property
readonly
¶
The covariance matrix A^T A
mu: Tensor
property
writable
¶
Getter for mu
Returns:

| Type | Description |
|---|---|
| mu (torch.Tensor) | The center of the search distribution |
sigma: Tensor
property
writable
¶
Getter for sigma
Returns:

| Type | Description |
|---|---|
| sigma (torch.Tensor) | The square root of the covariance matrix |
sigma_inv: Tensor
property
readonly
¶
Getter for sigma_inv
Returns:

| Type | Description |
|---|---|
| sigma_inv (torch.Tensor) | The inverse square root of the covariance matrix |
to_global_coordinates(self, local_coordinates)
¶
Map samples from local coordinate space N(0, I_d) to global coordinate space N(mu, A^T A) This function is the inverse of to_local_coordinates
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `local_coordinates` | torch.Tensor | The local coordinates sampled from N(0, I_d) | required |

Returns:

| Type | Description |
|---|---|
| global_coordinates (torch.Tensor) | The global coordinates sampled from N(mu, A^T A) |
Source code in evotorch/algorithms/distributed/gaussian.py
def to_global_coordinates(self, local_coordinates: torch.Tensor) -> torch.Tensor:
"""Map samples from local coordinate space N(0, I_d) to global coordinate space N(mu, A^T A)
This function is the inverse of to_local_coordinates
Args:
local_coordinates (torch.Tensor): The local coordinates sampled from N(0, I_d)
Returns:
global_coordinates (torch.Tensor): The global coordinates sampled from N(mu, A^T A)
"""
# Global samples are constructed as x = mu + A z where z is local coordinate
# We use transpose here to simplify the batched application of A
return self.mu.unsqueeze(0) + (self.A @ local_coordinates.T).T
to_local_coordinates(self, global_coordinates)
¶
Map samples from global coordinate space N(mu, A^T A) to local coordinate space N(0, I_d) This function is the inverse of to_global_coordinates
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `global_coordinates` | torch.Tensor | The global coordinates sampled from N(mu, A^T A) | required |

Returns:

| Type | Description |
|---|---|
| local_coordinates (torch.Tensor) | The local coordinates sampled from N(0, I_d) |
Source code in evotorch/algorithms/distributed/gaussian.py
def to_local_coordinates(self, global_coordinates: torch.Tensor) -> torch.Tensor:
"""Map samples from global coordinate space N(mu, A^T A) to local coordinate space N(0, I_d)
This function is the inverse of to_global_coordinates
Args:
global_coordinates (torch.Tensor): The global coordinates sampled from N(mu, A^T A)
Returns:
local_coordinates (torch.Tensor): The local coordinates sampled from N(0, I_d)
"""
# Global samples are constructed as x = mu + A z where z is local coordinate
# Therefore, we can recover z according to z = A_inv (x - mu)
return (self.A_inv @ (global_coordinates - self.mu.unsqueeze(0)).T).T
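Since `to_local_coordinates` inverts `to_global_coordinates`, mapping back and forth recovers the original samples. A quick standalone check with arbitrary `mu` and `A` (pure torch, not using the class itself):

```python
import torch

torch.manual_seed(0)
d = 4
mu = torch.randn(d)
A = torch.eye(d) + 0.1 * torch.randn(d, d)  # an arbitrary invertible "square root"
A_inv = torch.inverse(A)

z = torch.randn(8, d)                         # local coordinates ~ N(0, I_d)
x = mu.unsqueeze(0) + (A @ z.T).T             # to_global_coordinates
z_back = (A_inv @ (x - mu.unsqueeze(0)).T).T  # to_local_coordinates

print(torch.allclose(z, z_back, atol=1e-5))  # True, up to numerical error
```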
update_parameters(self, gradients, *, learning_rates=None, optimizers=None)
¶
Do an update on the distribution by following the given gradients.
It is expected that the inheriting class has its own implementation for this method.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `gradients` | dict | Gradients, as a dictionary, which will be used for computing the necessary updates. | required |
| `learning_rates` | Optional[dict] | A dictionary which contains learning rates for parameters that will be updated using a learning rate coefficient. | None |
| `optimizers` | Optional[dict] | A dictionary which contains optimizer objects for parameters that will be updated using an adaptive optimizer. | None |

Returns:

| Type | Description |
|---|---|
| ExpGaussian | The updated copy of the distribution. |
Source code in evotorch/algorithms/distributed/gaussian.py
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "ExpGaussian":
d_grad = gradients["d"]
M_grad = gradients["M"]
if "d" not in learning_rates:
learning_rates["d"] = learning_rates["mu"]
if "M" not in learning_rates:
learning_rates["M"] = learning_rates["sigma"]
# Follow gradients for d, and M
update_d = self._follow_gradient("d", d_grad, learning_rates=learning_rates, optimizers=optimizers)
update_M = self._follow_gradient("M", M_grad, learning_rates=learning_rates, optimizers=optimizers)
# Fold into parameters mu, A and A inv
new_mu = self.mu + torch.mv(self.A, update_d)
new_A = self.A @ torch.matrix_exp(0.5 * update_M)
new_A_inv = torch.matrix_exp(-0.5 * update_M) @ self.A_inv
# Return modified distribution
return self.modified_copy(mu=new_mu, sigma=new_A, sigma_inv=new_A_inv)
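Because both `A` and its tracked inverse are updated through `torch.matrix_exp` with opposite signs, they remain exact inverses of each other without any explicit matrix inversion. A standalone check with arbitrary values:

```python
import torch

torch.manual_seed(0)
d = 3
A = torch.eye(d) + 0.1 * torch.randn(d, d)
A_inv = torch.inverse(A)
update_M = 0.05 * torch.randn(d, d)  # an assumed gradient step on M

new_A = A @ torch.matrix_exp(0.5 * update_M)
new_A_inv = torch.matrix_exp(-0.5 * update_M) @ A_inv

print(torch.allclose(new_A_inv @ new_A, torch.eye(d), atol=1e-5))  # True
```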
__init__(self, problem, *, stdev_init=None, radius_init=None, popsize=None, center_learning_rate=None, stdev_learning_rate=None, scale_learning_rate=True, num_interactions=None, popsize_max=None, optimizer=None, optimizer_config=None, ranking_method='nes', center_init=None, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the XNES algorithm.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `problem` | Problem | The problem object which is being worked on. | required |
| `stdev_init` | Union[float, Iterable[float], torch.Tensor] | The initial standard deviation of the search distribution, expressed as a scalar or as an array. Expected as None if the coverage area is configured via `radius_init` instead. | None |
| `radius_init` | Union[float, Iterable[float], torch.Tensor] | The initial radius of the search distribution, expressed as a scalar. Expected as None if the coverage area is configured via `stdev_init` instead. | None |
| `popsize` | Optional[int] | Population size. If left as None, computed as `4 + floor(3 * log(n))` where `n` is the solution length. | None |
| `center_learning_rate` | Optional[float] | Learning rate for updating the mean of the search distribution. Default value is 1.0. | None |
| `stdev_learning_rate` | Optional[float] | Learning rate for updating the covariance matrix of the search distribution. The default value is `0.6 * (3 + log(n)) / (n * sqrt(n))` where `n` is the solution length. | None |
| `scale_learning_rate` | bool | If True (the default), the provided `stdev_learning_rate` is multiplied by the default value above; if False, it is used as given. | True |
| `num_interactions` | Optional[int] | When given as an integer n, the population is extended with more samples until n interactions with the GymProblem's environment are reached. When None, `popsize` alone determines the population size. | None |
| `popsize_max` | Optional[int] | Upper bound for the effective population size when `num_interactions` is set. | None |
| `optimizer` | | The optimizer to be used while following the estimated gradients. None, an optimizer name (e.g. 'adam', 'clipup'), or an instance of evotorch.optimizers.TorchOptimizer or evotorch.optimizers.ClipUp. | None |
| `optimizer_config` | Optional[dict] | Configuration passed to the optimizer as keyword arguments. See `evotorch.optimizers` for details. | None |
| `ranking_method` | Optional[str] | Ranking method used for fitness shaping. See `evotorch.ranking.rank(...)` for details. Can be given as None if no ranking is required. | 'nes' |
| `center_init` | Union[float, Iterable[float], torch.Tensor] | The initial center solution. Can be left as None. | None |
| `obj_index` | Optional[int] | Index of the objective used for gradient estimation. Can be left as None for single-objective problems. | None |
| `distributed` | bool | Whether the gradient computation will be distributed across remote actors (see the docstring below for a detailed discussion). | False |
| `popsize_weighted_grad_avg` | Optional[bool] | Only used in distributed mode: whether each actor's gradient is weighted by the number of solutions it sampled. Expected as None when `distributed` is False. | None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
self,
problem: Problem,
*,
stdev_init: Optional[RealOrVector] = None,
radius_init: Optional[RealOrVector] = None,
popsize: Optional[int] = None,
center_learning_rate: Optional[float] = None,
stdev_learning_rate: Optional[float] = None,
scale_learning_rate: bool = True,
num_interactions: Optional[int] = None,
popsize_max: Optional[int] = None,
optimizer=None,
optimizer_config: Optional[dict] = None,
ranking_method: Optional[str] = "nes",
center_init: Optional[RealOrVector] = None,
obj_index: Optional[int] = None,
distributed: bool = False,
popsize_weighted_grad_avg: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the XNES algorithm.
Args:
problem: The problem object which is being worked on.
stdev_init: The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None.
radius_init: The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None.
popsize: Population size. Can be specified as an int,
or can be left as None to let the solver decide.
In the case of SNES, `popsize` can be left as None,
in which case the default `popsize` will be computed
as `4 + floor(3 * log(n))` where `n` is the length
of a solution.
center_learning_rate: Learning rate for updating the mean
of the search distribution. Default value is 1.0
stdev_learning_rate: Learning rate for updating the covariance
matrix of the search distribution.
The default value is `0.6 * (3 + log(n)) / (n * sqrt(n))`
where `n` is the length of a solution.
scale_learning_rate: For SNES, there is a default standard
deviation learning rate value which is computed as
`0.6 * (3 + log(n)) / (n * sqrt(n))` (where `n` is the solution
length).
If scale_learning_rate is True (which is the default),
then the effective learning rate for the standard deviation
becomes the provided `stdev_learning_rate` multiplied by this
default value. If `scale_learning_rate` is False, then the
effective standard deviation learning rate becomes
equal to the provided `stdev_learning_rate` value.
num_interactions: When given as an integer n,
it is ensured that a population has interacted with
the GymProblem's environment n times. If this target
has not been reached yet, then the population is declared
too small, and gets extended with more samples,
until n amount of interactions is reached.
When given as None, popsize is the only configuration
affecting the size of a population.
popsize_max: Having `num_interactions` set as an integer
might cause the effective population size to jump to
unnecessarily large numbers. To prevent this,
one can set `popsize_max` to specify an upper
bound for the effective population size.
optimizer: The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
The default is None.
Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`.
optimizer_config: Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments.
ranking_method: Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
The default is 'nes'.
Can be given as None if no such ranking is required.
center_init: The initial center solution.
Can be left as None.
obj_index: Index of the objective according to which the
gradient estimations will be done.
For single-objective problems, this can be left as None.
distributed: Whether or not the gradient computation will
be distributed. If `distributed` is given as False and
the problem is not parallelized, then everything will
be centralized (i.e. the entire computation will happen
in the main process).
If `distributed` is given as False, and the problem
is parallelized, then the population will be created
in the main process and then sent to remote workers
for parallelized evaluation, and then the remote fitnesses
will be collected by the main process again for computing
the search gradients.
If `distributed` is given as True, and the problem
is parallelized, then the search algorithm itself will
be distributed, in the sense that each remote actor will
generate its own population (such that the total population
size across all these actors becomes equal to `popsize`)
and will compute its own gradient, and then the main process
will collect these gradients, compute the averaged gradients
and update the main search distribution.
Non-distributed mode has the advantage of keeping the
population in the main process, which is good when one wishes
to do detailed monitoring during the evolutionary process,
but has the disadvantage of having to pass the solutions to
the remote actors and having to collect fitnesses, which
might result in increased interprocess communication traffic.
On the other hand, while it is not possible to monitor the
population in distributed mode, the distributed mode has the
advantage of significantly reducing the interprocess
communication traffic, since the only things communicated
with the remote actors are the search distributions (not the
solutions) and the gradients.
popsize_weighted_grad_avg: Only to be used in distributed mode.
(where being in distributed mode means `distributed` is given
as True). In distributed mode, each actor remotely samples
its own solution batches and computes its own gradients.
These gradients are then collected, and a final average
gradient is computed.
If `popsize_weighted_grad_avg` is True, then, while averaging
over the gradients, each gradient will have its own weight
that is computed according to how many solutions were sampled
by the actor that produced the gradient.
If `popsize_weighted_grad_avg` is False, then, there will not
be weighted averaging (or, each gradient will have equal
weight).
If `popsize_weighted_grad_avg` is None, then the gradient
weights will be equal if a value for `num_interactions` is given
(because `num_interactions` affects the number of solutions
according to the episode lengths, and popsize-weighting the
gradients could be misleading); and the gradient weights will
be weighted according to the sub-population (i.e. sub-batch)
sizes if `num_interactions` is left as None.
The default value for `popsize_weighted_grad_avg` is None.
When the distributed mode is disabled (i.e. when `distributed`
is False), then the argument `popsize_weighted_grad_avg` is
expected as None.
"""
if popsize is None:
popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))
if center_learning_rate is None:
center_learning_rate = 1.0
def default_stdev_lr():
n = problem.solution_length
return 0.6 * (3 + math.log(n)) / (n * math.sqrt(n))
if stdev_learning_rate is None:
stdev_learning_rate = default_stdev_lr()
else:
stdev_learning_rate = float(stdev_learning_rate)
if scale_learning_rate:
stdev_learning_rate *= default_stdev_lr()
super().__init__(
problem,
popsize=popsize,
center_learning_rate=center_learning_rate,
stdev_learning_rate=stdev_learning_rate,
stdev_init=stdev_init,
radius_init=radius_init,
popsize_max=popsize_max,
num_interactions=num_interactions,
optimizer=optimizer,
optimizer_config=optimizer_config,
ranking_method=ranking_method,
center_init=center_init,
stdev_min=None,
stdev_max=None,
stdev_max_change=None,
obj_index=obj_index,
distributed=distributed,
popsize_weighted_grad_avg=popsize_weighted_grad_avg,
)
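For reference, here is a minimal usage sketch of this solver (the problem definition and hyperparameter values are illustrative assumptions, not tuned recommendations):
```python
from evotorch import Problem
from evotorch.algorithms import XNES
import torch

def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2)

problem = Problem("min", sphere, initial_bounds=(-5.0, 5.0), solution_length=10)

# Leaving `popsize` as None lets the solver apply its default rule.
searcher = XNES(problem, stdev_init=1.0)
searcher.run(100)
print(searcher.status["best"])
```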
ga
¶
Genetic algorithm variants: GeneticAlgorithm, Cosyne.
Cosyne (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
Implementation of the CoSyNE algorithm.
References:
F.Gomez, J.Schmidhuber, R.Miikkulainen, M.Mitchell (2008).
Accelerated Neural Evolution through Cooperatively Coevolved Synapses.
Journal of Machine Learning Research 9 (5).
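A minimal usage sketch follows (the problem definition and the untuned hyperparameter values are illustrative assumptions):
```python
from evotorch import Problem
from evotorch.algorithms import Cosyne
import torch

def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2)

problem = Problem("min", sphere, initial_bounds=(-5.0, 5.0), solution_length=10)

# `popsize`, `tournament_size` and `mutation_stdev` are required arguments.
searcher = Cosyne(problem, popsize=50, tournament_size=4, mutation_stdev=0.3)
searcher.run(100)
```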
Source code in evotorch/algorithms/ga.py
class Cosyne(SearchAlgorithm, SinglePopulationAlgorithmMixin):
"""
Implementation of the CoSyNE algorithm.
References:
F.Gomez, J.Schmidhuber, R.Miikkulainen, M.Mitchell (2008).
Accelerated Neural Evolution through Cooperatively Coevolved Synapses.
Journal of Machine Learning Research 9 (5).
"""
def __init__(
self,
problem: Problem,
*,
popsize: int,
tournament_size: int,
mutation_stdev: Optional[float],
mutation_probability: Optional[float] = None,
permute_all: bool = False,
num_elites: Optional[int] = None,
elitism_ratio: Optional[float] = None,
eta: Optional[float] = None,
num_children: Optional[int] = None,
):
"""
`__init__(...)`: Initialize the Cosyne instance.
Args:
problem: The problem object to work on.
popsize: Population size, as an integer.
tournament_size: Tournament size, for tournament selection.
mutation_stdev: Standard deviation of the Gaussian mutation.
See [GaussianMutation][evotorch.operators.real.GaussianMutation] for more information.
mutation_probability: Elementwise Gaussian mutation probability.
Defaults to None.
See [GaussianMutation][evotorch.operators.real.GaussianMutation] for more information.
permute_all: If given as True, all solutions are subject to
permutation. If given as False (which is the default),
there will be a selection procedure for each decision
variable.
num_elites: Optionally expected as an integer, specifying the
number of elites to pass to the next generation.
Cannot be used together with the argument `elitism_ratio`.
elitism_ratio: Optionally expected as a real number between
0 and 1, specifying the amount of elites to pass to the
next generation. For example, 0.1 means that the best 10%
of the population are accepted as elites and passed onto
the next generation.
Cannot be used together with the argument `num_elites`.
eta: Optionally expected as an integer, specifying the eta
hyperparameter for the simulated binary cross-over (SBX).
If left as None, one-point cross-over will be used instead.
num_children: Number of children to generate at each iteration.
If left as None, then this number is half of the population
size.
"""
problem.ensure_numeric()
SearchAlgorithm.__init__(self, problem)
if mutation_stdev is None:
if mutation_probability is not None:
raise ValueError(
f"`mutation_probability` was set to {mutation_probability}, but `mutation_stdev` is None, "
"which means, mutation is disabled. If you want to enable the mutation, be sure to provide "
"`mutation_stdev` as well."
)
self.mutation_op = None
else:
self.mutation_op = GaussianMutation(
self._problem,
stdev=mutation_stdev,
mutation_probability=mutation_probability,
)
cross_over_kwargs = {"tournament_size": tournament_size}
if num_children is None:
cross_over_kwargs["cross_over_rate"] = 2.0
else:
cross_over_kwargs["num_children"] = num_children
if eta is None:
self._cross_over_op = OnePointCrossOver(self._problem, **cross_over_kwargs)
else:
self._cross_over_op = SimulatedBinaryCrossOver(self._problem, eta=eta, **cross_over_kwargs)
self._permutation_op = CosynePermutation(self._problem, permute_all=permute_all)
self._popsize = int(popsize)
if num_elites is not None and elitism_ratio is None:
self._num_elites = int(num_elites)
elif num_elites is None and elitism_ratio is not None:
self._num_elites = int(self._popsize * elitism_ratio)
elif num_elites is None and elitism_ratio is None:
self._num_elites = None
else:
raise ValueError(
"Received both `num_elites` and `elitism_ratio`. Please provide only one of them, or none of them."
)
self._population = SolutionBatch(problem, device=problem.device, popsize=self._popsize)
self._first_generation: bool = True
# GAStatusMixin.__init__(self)
SinglePopulationAlgorithmMixin.__init__(self)
@property
def population(self) -> SolutionBatch:
return self._population
def _step(self):
if self._first_generation:
self._first_generation = False
self._problem.evaluate(self._population)
to_merge = []
num_elites = self._num_elites
num_parents = int(self._popsize / 4)
num_relevant = max((0 if num_elites is None else num_elites), num_parents)
sorted_relevant = self._population.take_best(num_relevant)
if self._num_elites is not None and self._num_elites >= 1:
to_merge.append(sorted_relevant[:num_elites].clone())
parents = sorted_relevant[:num_parents]
children = self._cross_over_op(parents)
if self.mutation_op is not None:
children = self.mutation_op(children)
permuted = self._permutation_op(self._population)
to_merge.extend([children, permuted])
extended_population = SolutionBatch(merging_of=to_merge)
self._problem.evaluate(extended_population)
self._population = extended_population.take_best(self._popsize)
__init__(self, problem, *, popsize, tournament_size, mutation_stdev, mutation_probability=None, permute_all=False, num_elites=None, elitism_ratio=None, eta=None, num_children=None)
special
¶
__init__(...)
: Initialize the Cosyne instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem | Problem | The problem object to work on. | required |
popsize | int | Population size, as an integer. | required |
tournament_size | int | Tournament size, for tournament selection. | required |
mutation_stdev | Optional[float] | Standard deviation of the Gaussian mutation. See GaussianMutation for more information. | required |
mutation_probability | Optional[float] | Elementwise Gaussian mutation probability. Defaults to None. See GaussianMutation for more information. | None |
permute_all | bool | If given as True, all solutions are subject to permutation. If given as False (which is the default), there will be a selection procedure for each decision variable. | False |
num_elites | Optional[int] | Optionally expected as an integer, specifying the number of elites to pass to the next generation. Cannot be used together with the argument `elitism_ratio`. | None |
elitism_ratio | Optional[float] | Optionally expected as a real number between 0 and 1, specifying the amount of elites to pass to the next generation. For example, 0.1 means that the best 10% of the population are accepted as elites and passed onto the next generation. Cannot be used together with the argument `num_elites`. | None |
eta | Optional[float] | Optionally expected as an integer, specifying the eta hyperparameter for the simulated binary cross-over (SBX). If left as None, one-point cross-over will be used instead. | None |
num_children | Optional[int] | Number of children to generate at each iteration. If left as None, then this number is half of the population size. | None |
Source code in evotorch/algorithms/ga.py
def __init__(
self,
problem: Problem,
*,
popsize: int,
tournament_size: int,
mutation_stdev: Optional[float],
mutation_probability: Optional[float] = None,
permute_all: bool = False,
num_elites: Optional[int] = None,
elitism_ratio: Optional[float] = None,
eta: Optional[float] = None,
num_children: Optional[int] = None,
):
"""
`__init__(...)`: Initialize the Cosyne instance.
Args:
problem: The problem object to work on.
popsize: Population size, as an integer.
tournament_size: Tournament size, for tournament selection.
mutation_stdev: Standard deviation of the Gaussian mutation.
See [GaussianMutation][evotorch.operators.real.GaussianMutation] for more information.
mutation_probability: Elementwise Gaussian mutation probability.
Defaults to None.
See [GaussianMutation][evotorch.operators.real.GaussianMutation] for more information.
permute_all: If given as True, all solutions are subject to
permutation. If given as False (which is the default),
there will be a selection procedure for each decision
variable.
num_elites: Optionally expected as an integer, specifying the
number of elites to pass to the next generation.
Cannot be used together with the argument `elitism_ratio`.
elitism_ratio: Optionally expected as a real number between
0 and 1, specifying the amount of elites to pass to the
next generation. For example, 0.1 means that the best 10%
of the population are accepted as elites and passed onto
the next generation.
Cannot be used together with the argument `num_elites`.
eta: Optionally expected as an integer, specifying the eta
hyperparameter for the simulated binary cross-over (SBX).
If left as None, one-point cross-over will be used instead.
num_children: Number of children to generate at each iteration.
If left as None, then this number is half of the population
size.
"""
problem.ensure_numeric()
SearchAlgorithm.__init__(self, problem)
if mutation_stdev is None:
if mutation_probability is not None:
raise ValueError(
f"`mutation_probability` was set to {mutation_probability}, but `mutation_stdev` is None, "
"which means, mutation is disabled. If you want to enable the mutation, be sure to provide "
"`mutation_stdev` as well."
)
self.mutation_op = None
else:
self.mutation_op = GaussianMutation(
self._problem,
stdev=mutation_stdev,
mutation_probability=mutation_probability,
)
cross_over_kwargs = {"tournament_size": tournament_size}
if num_children is None:
cross_over_kwargs["cross_over_rate"] = 2.0
else:
cross_over_kwargs["num_children"] = num_children
if eta is None:
self._cross_over_op = OnePointCrossOver(self._problem, **cross_over_kwargs)
else:
self._cross_over_op = SimulatedBinaryCrossOver(self._problem, eta=eta, **cross_over_kwargs)
self._permutation_op = CosynePermutation(self._problem, permute_all=permute_all)
self._popsize = int(popsize)
if num_elites is not None and elitism_ratio is None:
self._num_elites = int(num_elites)
elif num_elites is None and elitism_ratio is not None:
self._num_elites = int(self._popsize * elitism_ratio)
elif num_elites is None and elitism_ratio is None:
self._num_elites = None
else:
raise ValueError(
"Received both `num_elites` and `elitism_ratio`. Please provide only one of them, or none of them."
)
self._population = SolutionBatch(problem, device=problem.device, popsize=self._popsize)
self._first_generation: bool = True
# GAStatusMixin.__init__(self)
SinglePopulationAlgorithmMixin.__init__(self)
ExtendedPopulationMixin
¶
A mixin class that provides the method `_make_extended_population(...)`.
This mixin class assumes that the inheriting class has the properties problem (of type Problem) and population (of type SolutionBatch), which provide the associated problem object and the current population, respectively.
The class which inherits this mixin class gains the method `_make_extended_population(...)`. This new method applies the operators specified during the initialization phase of this mixin class on the current population, produces children, and then returns an extended population.
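For orientation, here is a rough sketch of such an inheriting class (the class name, import paths, and operator choice are illustrative assumptions; real subclasses such as GeneticAlgorithm, shown later in this document, follow the same pattern):
```python
from evotorch import Problem, SolutionBatch
from evotorch.algorithms.searchalgorithm import SearchAlgorithm
from evotorch.algorithms.ga import ExtendedPopulationMixin
from evotorch.operators import GaussianMutation

class MyElitistGA(SearchAlgorithm, ExtendedPopulationMixin):
    def __init__(self, problem: Problem, popsize: int):
        SearchAlgorithm.__init__(self, problem)
        self._popsize = popsize
        self._population = problem.generate_batch(popsize)
        ExtendedPopulationMixin.__init__(
            self,
            re_evaluate=True,
            operators=[GaussianMutation(problem, stdev=0.1)],
        )

    @property
    def population(self) -> SolutionBatch:
        return self._population

    def _step(self):
        # Parents and children, combined and evaluated in one batch;
        # keep the best `popsize` of them as the next generation.
        extended = self._make_extended_population(split=False)
        self._population = extended.take_best(self._popsize)
```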
Source code in evotorch/algorithms/ga.py
class ExtendedPopulationMixin:
"""
A mixin class that provides the method `_make_extended_population(...)`.
This mixin class assumes that the inheriting class has the properties
`problem` (of type [Problem][evotorch.core.Problem])
and `population` (of type [SolutionBatch][evotorch.core.SolutionBatch]),
which provide the associated problem object and the current population,
respectively.
The class which inherits this mixin class gains the method
`_make_extended_population(...)`. This new method applies the operators
specified during the initialization phase of this mixin class on the
current population, produces children, and then returns an extended
population.
"""
def __init__(
self,
*,
re_evaluate: bool,
re_evaluate_parents_first: Optional[bool] = None,
operators: Optional[Iterable] = None,
allow_empty_operators_list: bool = False,
):
"""
`__init__(...)`: Initialize the ExtendedPopulationMixin.
Args:
re_evaluate: Whether or not to re-evaluate the parent population
at every generation. When dealing with problems where the
fitness and/or feature evaluations are stochastic, one might
want to set this as True. On the other hand, for when the
fitness and/or feature evaluations are deterministic, one
might prefer to set this as False for efficiency.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
operators: List of operators to apply on the current population
for generating a new extended population.
allow_empty_operators_list: Whether or not to allow the operator
list to be empty. The default and the recommended value
is False. For cases where the inheriting class wants to
decide the operators later (via the attribute `_operators`)
this can be set as True.
"""
self._operators = [] if operators is None else list(operators)
if (not allow_empty_operators_list) and (len(self._operators) == 0):
raise ValueError("Received `operators` as an empty sequence. Please provide at least one operator.")
self._using_cross_over: bool = False
for operator in self._operators:
if isinstance(operator, CrossOver):
self._using_cross_over = True
break
self._re_evaluate: bool = bool(re_evaluate)
if re_evaluate_parents_first is None:
self._re_evaluate_parents_first = self._using_cross_over
else:
if not self._re_evaluate:
raise ValueError(
"Encountered the argument `re_evaluate_parents_first` as something other than None."
" However, `re_evaluate` is given as False."
" Please use `re_evaluate_parents_first` only when `re_evaluate` is True."
)
self._re_evaluate_parents_first = bool(re_evaluate_parents_first)
self._first_iter: bool = True
def _make_extended_population(self, split: bool = False) -> Union[SolutionBatch, tuple]:
"""
Make and return a new extended population that is evaluated.
Args:
split: If False, then the extended population will be returned
as a single SolutionBatch object which contains both the
parents and the children.
If True, then the extended population will be returned
as a pair of SolutionBatch objects, the first one being
the parents and the second one being the children.
Returns:
The extended population.
"""
# Get the problem object and the population
problem: Problem = self.problem
population: SolutionBatch = self.population
if self._re_evaluate:
# This is the case where our mixin is configured to re-evaluate the parents at every generation.
# Set the first iteration indicator to False
self._first_iter = False
if self._re_evaluate_parents_first:
# This is the case where our mixin is configured to evaluate the parents separately first.
# This is a sub-case of `_re_evaluate=True`.
# Evaluate the population, which stores the parents
problem.evaluate(population)
# Now that our parents are evaluated, we use the operators on them and get the children.
children = _use_operators(population, self._operators)
# Evaluate the children
problem.evaluate(children)
if split:
# If our mixin is configured to return the population and the children, then we return a tuple
# containing them as separate items.
return population, children
else:
# If our mixin is configured to return the population and the children in a single batch,
# then we concatenate the population and the children and return the resulting combined batch.
return SolutionBatch.cat([population, children])
else:
# This is the case where our mixin is configured to evaluate the parents and the children in one go.
# This is a sub-case of `_re_evaluate=True`.
# Use the operators on the parent solutions. It does not matter whether or not the parents are evaluated.
children = _use_operators(population, self._operators)
# Form an extended population by concatenating the population and the children.
extended_population = SolutionBatch.cat([population, children])
# Evaluate the extended population in one go.
problem.evaluate(extended_population)
if split:
# The method was configured to return the parents and the children separately.
# Because we combined them earlier for evaluating them in one go, we will split them now.
# Get the number of parents
num_parents = len(population)
# Get the newly evaluated copies of the parents from the extended population
parents = extended_population[:num_parents]
# Get the children from the extended population
children = extended_population[num_parents:]
# Return the newly evaluated copies of the parents and the children separately.
return parents, children
else:
# The method was configured to return the parents and the children in a single SolutionBatch.
# Here, we just return the extended population that we already have produced.
return extended_population
else:
# This is the case where our mixin is configured NOT to re-evaluate the parents at every generation.
if self._first_iter:
# The first iteration indicator (`_first_iter`) is True. So, this is the first iteration.
# We set `_first_iter` to False for future generations.
self._first_iter = False
# We now evaluate the parent population (because the parents are expected to be non-evaluated at the
# beginning).
problem.evaluate(population)
# Here, we assume that the parents are already evaluated. We apply our operators on the parents.
children = _use_operators(population, self._operators)
# Now, we evaluate the children.
problem.evaluate(children)
if split:
# Return the population and the children separately if `split=True`.
return population, children
else:
# Return the population and the children in a single SolutionBatch if `split=False`.
return SolutionBatch.cat([population, children])
@property
def re_evaluate(self) -> bool:
"""
Whether or not this search algorithm re-evaluates the parents
"""
return self._re_evaluate
@property
def re_evaluate_parents_first(self) -> Optional[bool]:
"""
Whether or not this search algorithm re-evaluates the parents separately.
This property is relevant only when `re_evaluate` is True.
If `re_evaluate` is False, then this property will return None.
"""
if self._re_evaluate:
return self._re_evaluate_parents_first
else:
return None
re_evaluate: bool
property
readonly
¶
Whether or not this search algorithm re-evaluates the parents
re_evaluate_parents_first: Optional[bool]
property
readonly
¶
Whether or not this search algorithm re-evaluates the parents separately.
This property is relevant only when `re_evaluate` is True.
If `re_evaluate` is False, then this property will return None.
__init__(self, *, re_evaluate, re_evaluate_parents_first=None, operators=None, allow_empty_operators_list=False)
special
¶
__init__(...)
: Initialize the ExtendedPopulationMixin.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
re_evaluate | bool | Whether or not to re-evaluate the parent population at every generation. When dealing with problems where the fitness and/or feature evaluations are stochastic, one might want to set this as True. On the other hand, for when the fitness and/or feature evaluations are deterministic, one might prefer to set this as False for efficiency. | required |
re_evaluate_parents_first | Optional[bool] | This is to be specified only when `re_evaluate` is True (otherwise to be left as None). If given as True, it will be assumed that the provided operators require the parents to be evaluated. If given as False, it will be assumed that the provided operators work without looking at the parents' fitnesses (in which case both parents and children can be evaluated in a single vectorized computation cycle). If left as None, this is determined automatically: True if the operators contain at least one cross-over operator, False otherwise. | None |
operators | Optional[Iterable] | List of operators to apply on the current population for generating a new extended population. | None |
allow_empty_operators_list | bool | Whether or not to allow the operator list to be empty. The default and the recommended value is False. For cases where the inheriting class wants to decide the operators later (via the attribute `_operators`) this can be set as True. | False |
Source code in evotorch/algorithms/ga.py
def __init__(
self,
*,
re_evaluate: bool,
re_evaluate_parents_first: Optional[bool] = None,
operators: Optional[Iterable] = None,
allow_empty_operators_list: bool = False,
):
"""
`__init__(...)`: Initialize the ExtendedPopulationMixin.
Args:
re_evaluate: Whether or not to re-evaluate the parent population
at every generation. When dealing with problems where the
fitness and/or feature evaluations are stochastic, one might
want to set this as True. On the other hand, for when the
fitness and/or feature evaluations are deterministic, one
might prefer to set this as False for efficiency.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
operators: List of operators to apply on the current population
for generating a new extended population.
allow_empty_operators_list: Whether or not to allow the operator
list to be empty. The default and the recommended value
is False. For cases where the inheriting class wants to
decide the operators later (via the attribute `_operators`)
this can be set as True.
"""
self._operators = [] if operators is None else list(operators)
if (not allow_empty_operators_list) and (len(self._operators) == 0):
raise ValueError("Received `operators` as an empty sequence. Please provide at least one operator.")
self._using_cross_over: bool = False
for operator in self._operators:
if isinstance(operator, CrossOver):
self._using_cross_over = True
break
self._re_evaluate: bool = bool(re_evaluate)
if re_evaluate_parents_first is None:
self._re_evaluate_parents_first = self._using_cross_over
else:
if not self._re_evaluate:
raise ValueError(
"Encountered the argument `re_evaluate_parents_first` as something other than None."
" However, `re_evaluate` is given as False."
" Please use `re_evaluate_parents_first` only when `re_evaluate` is True."
)
self._re_evaluate_parents_first = bool(re_evaluate_parents_first)
self._first_iter: bool = True
GeneticAlgorithm (SearchAlgorithm, SinglePopulationAlgorithmMixin, ExtendedPopulationMixin)
¶
A genetic algorithm implementation.
Basic usage. Let us consider a single-objective optimization problem where the goal is to minimize the L2 norm of a continuous tensor of length 10:
from evotorch import Problem
from evotorch.algorithms import GeneticAlgorithm
from evotorch.operators import OnePointCrossOver, GaussianMutation
import torch
def f(x: torch.Tensor) -> torch.Tensor:
return torch.linalg.norm(x)
problem = Problem(
"min",
f,
initial_bounds=(-10.0, 10.0),
solution_length=10,
)
For solving this problem, a genetic algorithm could be instantiated as follows:
ga = GeneticAlgorithm(
problem,
popsize=100,
operators=[
OnePointCrossOver(problem, tournament_size=4),
GaussianMutation(problem, stdev=0.1),
],
)
The genetic algorithm instantiated above is configured to have a population size of 100, and is configured to perform the following operations on the population at each generation: (i) select solutions with a tournament of size 4, and produce children from the selected solutions by applying one-point cross-over; (ii) apply a gaussian mutation on the values of the solutions produced by the previous step, the amount of the mutation being sampled according to a standard deviation of 0.1. Once instantiated, this GeneticAlgorithm instance can be used with an API compatible with other search algorithms, as shown below:
from evotorch.logging import StdOutLogger
_ = StdOutLogger(ga) # Report the evolution's progress to standard output
ga.run(100) # Run the algorithm for 100 generations
print("Solution with best fitness ever:", ga.status["best"])
print("Current population's best:", ga.status["pop_best"])
Please also note:
- The operators are always executed according to the order specified within the `operators` argument.
- There are more operators available in the namespace evotorch.operators.
- By default, GeneticAlgorithm is elitist. In the elitist mode, an extended population is formed from parent solutions and child solutions, and the best n solutions of this extended population are accepted as the next generation. If you wish to switch to a non-elitist mode (where children unconditionally replace the worst-performing parents), you can use the initialization argument `elitist=False`.
- It is not mandatory to specify a cross-over operator. When a cross-over operator is missing, the GeneticAlgorithm will work like a simple evolution strategy implementation which produces children by mutating the parents, and then replaces the parents (where the criteria for replacing the parents depend on whether or not elitism is enabled).
- To be able to deal with stochastic fitness functions correctly, GeneticAlgorithm re-evaluates previously evaluated parents as well. When you are sure that the fitness function is deterministic, you can pass the initialization argument `re_evaluate=False` to prevent unnecessary computations. (See the sketch after this list.)
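For instance, a non-elitist configuration with parent re-evaluation disabled could look like the following sketch (reusing the problem and operators from the example above; the flag combination is illustrative):
```python
ga = GeneticAlgorithm(
    problem,
    popsize=100,
    operators=[
        OnePointCrossOver(problem, tournament_size=4),
        GaussianMutation(problem, stdev=0.1),
    ],
    elitist=False,      # children replace the worst-performing parents
    re_evaluate=False,  # safe only when the fitness function is deterministic
)
```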
Integer decision variables.
GeneticAlgorithm can be used on problems with `dtype` declared as integer (e.g. `torch.int32`, `torch.int64`, etc.).
Within the field of discrete optimization, it is common to encounter
one or more of these scenarios:
- The search space of the problem has a special structure that one will wish to exploit (within the cross-over and/or mutation operators) to be able to reach the (near-)optimum within a reasonable amount of time.
- The problem is partially or fully combinatorial.
- The problem is constrained in such a way that arbitrarily sampling discrete values for its decision variables might cause infeasibility.
Considering all these scenarios, it is difficult to come up with general cross-over and mutation operators that will work across various discrete optimization problems, and it is common to design problem-specific operators. In EvoTorch, it is possible to define custom operators and use them with GeneticAlgorithm, which is required when using GeneticAlgorithm on a problem with a non-float dtype.
As an example, let us consider the following discrete optimization problem:
def f(x: torch.Tensor) -> torch.Tensor:
return torch.sum(x)
problem = Problem(
"min",
f,
bounds=(-10, 10),
solution_length=10,
dtype=torch.int64,
)
Although EvoTorch does provide a very simple and generic (usable with float and int dtypes) cross-over named OnePointCrossOver (a cross-over which randomly decides a cutting point for each pair of parents, cuts them from those points and recombines them), it can be desirable and necessary to implement a custom cross-over operator. One can inherit from CrossOver to define a custom cross-over operator, as shown below:
from evotorch import SolutionBatch
from evotorch.operators import CrossOver
class CustomCrossOver(CrossOver):
def _do_cross_over(
self,
parents1: torch.Tensor,
parents2: torch.Tensor,
) -> SolutionBatch:
# parents1 is a tensor storing the decision values of the first
# half of the chosen parents.
# parents2 is a tensor storing the decision values of the second
# half of the chosen parents.
# We expect that the lengths of parents1 and parents2 are equal.
assert len(parents1) == len(parents2)
# Each pair of parents will produce two children, so the child count
# equals the total number of chosen parents:
num_parents = len(parents1) + len(parents2)
# Allocate an empty SolutionBatch that will store the children
childpop = SolutionBatch(self.problem, popsize=num_parents, empty=True)
# Gain access to the decision values tensor of the newly allocated
# childpop
childpop_values = childpop.access_values()
# Here we somehow fill `childpop_values` by recombining the parents.
# The most common thing to do is to produce two children by
# combining parents1[0] and parents2[0], to produce the next two
# children parents1[1] and parents2[1], and so on.
childpop_values[:] = ...
# Return the child population
return childpop
One can define a custom mutation operator by inheriting from Operator, as shown below:
class CustomMutation(Operator):
def _do(self, solutions: SolutionBatch):
# Get the decision values tensor of the solutions
sln_values = solutions.access_values()
# do in-place modifications to the decision values
sln_values[:] = ...
Alternatively, you could define the mutation operator as a function:
def my_mutation_function(original_values: torch.Tensor) -> torch.Tensor:
# Somehow produce mutated copies of the original values
mutated_values = ...
# Return the mutated values
return mutated_values
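As a concrete (hypothetical) filling of this template for the integer problem defined above, such a function might add small integer perturbations and clamp the result back into the declared bounds:
```python
def my_mutation_function(original_values: torch.Tensor) -> torch.Tensor:
    # Add integer noise drawn from {-1, 0, 1} to each decision variable
    noise = torch.randint_like(original_values, -1, 2)
    # Clamp back into the bounds (-10, 10) declared by the problem
    return torch.clamp(original_values + noise, -10, 10)
```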
With these defined operators, we are now ready to instantiate our GeneticAlgorithm:
ga = GeneticAlgorithm(
problem,
popsize=100,
operators=[
CustomCrossOver(problem, tournament_size=4),
CustomMutation(problem),
# -- or, if you chose to define the mutation as a function: --
# my_mutation_function,
],
)
Non-numeric or variable-length solutions.
GeneticAlgorithm can also work on problems whose `dtype` is declared as `object`, where `dtype=object` means that a solution's value(s) can be expressed via a tensor, a numpy array, a scalar, a tuple, a list, or a dictionary.
Like in the previously discussed case (where dtype is an integer type), one has to define custom operators when working on problems with `dtype=object`. A custom cross-over definition specialized for `dtype=object` looks like this:
from evotorch.tools import ObjectArray
class CrossOverForObjectDType(CrossOver):
def _do_cross_over(
self,
parents1: ObjectArray,
parents2: ObjectArray,
) -> SolutionBatch:
# Each pair of parents will produce two children, so the child count
# equals the total number of chosen parents:
num_parents = len(parents1) + len(parents2)
# Allocate an empty SolutionBatch that will store the children
childpop = SolutionBatch(self.problem, popsize=num_parents, empty=True)
# Gain access to the decision values ObjectArray of the newly allocated
# childpop
childpop_values = childpop.access_values()
# Here we somehow fill `childpop_values` by recombining the parents.
# The most common thing to do is to produce two children by
# combining parents1[0] and parents2[0], to produce the next two
# children parents1[1] and parents2[1], and so on.
childpop_values[:] = ...
# Return the child population
return childpop
A custom mutation operator specialized for `dtype=object` looks like this:
class MutationForObjectDType(Operator):
def _do(self, solutions: SolutionBatch):
# Get the decision values ObjectArray of the solutions
sln_values = solutions.access_values()
# do in-place modifications to the decision values
sln_values[:] = ...
A custom mutation function specialized for `dtype=object` looks like this:
def mutation_for_object_dtype(original_values: ObjectArray) -> ObjectArray:
# Somehow produce mutated copies of the original values
mutated_values = ...
# Return the mutated values
return mutated_values
With these operators defined, one can instantiate the GeneticAlgorithm:
ga = GeneticAlgorithm(
problem_with_object_dtype,
popsize=100,
operators=[
CrossOverForObjectDType(problem_with_object_dtype, tournament_size=4),
MutationForObjectDType(problem_with_object_dtype),
# -- or, if you chose to define the mutation as a function: --
# mutation_for_object_dtype,
],
)
Multiple objectives. GeneticAlgorithm can work on problems with multiple objectives. When there are multiple objectives, GeneticAlgorithm will compare the solutions according to their pareto-ranks and their crowding distances, like done by the NSGA-II algorithm (Deb, 2002).
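For illustration, a minimal two-objective sketch might look as follows (the fitness function and bound values are illustrative assumptions; this assumes the common pattern of passing one objective sense per objective to Problem):
```python
import torch
from evotorch import Problem
from evotorch.algorithms import GeneticAlgorithm
from evotorch.operators import OnePointCrossOver, GaussianMutation

def two_objectives(x: torch.Tensor) -> torch.Tensor:
    # Objective 1: L2 norm; objective 2: distance from the point (1, ..., 1)
    return torch.stack([torch.linalg.norm(x), torch.linalg.norm(x - 1.0)])

problem = Problem(
    ["min", "min"],  # one objective sense per objective
    two_objectives,
    initial_bounds=(-10.0, 10.0),
    solution_length=10,
)

ga = GeneticAlgorithm(
    problem,
    popsize=100,
    operators=[
        OnePointCrossOver(problem, tournament_size=4),
        GaussianMutation(problem, stdev=0.1),
    ],
)
ga.run(100)
```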
References:
Sean Luke, 2013, Essentials of Metaheuristics, Lulu, second edition
available for free at http://cs.gmu.edu/~sean/book/metaheuristics/
Kalyanmoy Deb, Amrit Pratap, Sameer Agarwal, T. Meyarivan (2002).
A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II.
Source code in evotorch/algorithms/ga.py
class GeneticAlgorithm(SearchAlgorithm, SinglePopulationAlgorithmMixin, ExtendedPopulationMixin):
"""
A genetic algorithm implementation.
**Basic usage.**
Let us consider a single-objective optimization problem where the goal is to
minimize the L2 norm of a continuous tensor of length 10:
```python
from evotorch import Problem
from evotorch.algorithms import GeneticAlgorithm
from evotorch.operators import OnePointCrossOver, GaussianMutation
import torch
def f(x: torch.Tensor) -> torch.Tensor:
return torch.linalg.norm(x)
problem = Problem(
"min",
f,
initial_bounds=(-10.0, 10.0),
solution_length=10,
)
```
For solving this problem, a genetic algorithm could be instantiated as
follows:
```python
ga = GeneticAlgorithm(
problem,
popsize=100,
operators=[
OnePointCrossOver(problem, tournament_size=4),
GaussianMutation(problem, stdev=0.1),
],
)
```
The genetic algorithm instantiated above is configured to have a population
size of 100, and is configured to perform the following operations on the
population at each generation:
(i) select solutions with a tournament of size 4, and produce children from
the selected solutions by applying one-point cross-over;
(ii) apply a gaussian mutation on the values of the solutions produced by
the previous step, the amount of the mutation being sampled according to a
standard deviation of 0.1.
Once instantiated, this GeneticAlgorithm instance can be used with an API
compatible with other search algorithms, as shown below:
```python
from evotorch.logging import StdOutLogger
_ = StdOutLogger(ga) # Report the evolution's progress to standard output
ga.run(100) # Run the algorithm for 100 generations
print("Solution with best fitness ever:", ga.status["best"])
print("Current population's best:", ga.status["pop_best"])
```
Please also note:
- The operators are always executed according to the order specified within
the `operators` argument.
- There are more operators available in the namespace
[evotorch.operators][evotorch.operators].
- By default, GeneticAlgorithm is elitist. In the elitist mode, an extended
population is formed from parent solutions and child solutions, and the
best n solutions of this extended population are accepted as the next
generation. If you wish to switch to a non-elitist mode (where children
unconditionally replace the worst-performing parents), you can use the
initialization argument `elitist=False`.
- It is not mandatory to specify a cross-over operator. When a cross-over
operator is missing, the GeneticAlgorithm will work like a simple
evolution strategy implementation which produces children by mutating
the parents, and then replaces the parents (where the criteria for
replacing the parents depend on whether or not elitism is enabled).
- To be able to deal with stochastic fitness functions correctly,
GeneticAlgorithm re-evaluates previously evaluated parents as well.
When you are sure that the fitness function is deterministic,
you can pass the initialization argument `re_evaluate=False` to prevent
unnecessary computations.
**Integer decision variables.**
GeneticAlgorithm can be used on problems with `dtype` declared as integer
(e.g. `torch.int32`, `torch.int64`, etc.).
Within the field of discrete optimization, it is common to encounter
one or more of these scenarios:
- The search space of the problem has a special structure that one will
wish to exploit (within the cross-over and/or mutation operators) to
be able to reach the (near-)optimum within a reasonable amount of time.
- The problem is partially or fully combinatorial.
- The problem is constrained in such a way that arbitrarily sampling
discrete values for its decision variables might cause infeasibility.
Considering all these scenarios, it is difficult to come up with general
cross-over and mutation operators that will work across various discrete
optimization problems, and it is common to design problem-specific
operators. In EvoTorch, it is possible to define custom operators and
use them with GeneticAlgorithm, which is required when using
GeneticAlgorithm on a problem with a non-float dtype.
As an example, let us consider the following discrete optimization
problem:
```python
def f(x: torch.Tensor) -> torch.Tensor:
return torch.sum(x)
problem = Problem(
"min",
f,
bounds=(-10, 10),
solution_length=10,
dtype=torch.int64,
)
```
Although EvoTorch does provide a very simple and generic (usable with float
and int dtypes) cross-over named
[OnePointCrossOver][evotorch.operators.real.OnePointCrossOver]
(a cross-over which randomly decides a cutting point for each pair of
parents, cuts them from those points and recombines them), it can be
desirable and necessary to implement a custom cross-over operator.
One can inherit from [CrossOver][evotorch.operators.base.CrossOver] to
define a custom cross-over operator, as shown below:
```python
from evotorch import SolutionBatch
from evotorch.operators import CrossOver
class CustomCrossOver(CrossOver):
def _do_cross_over(
self,
parents1: torch.Tensor,
parents2: torch.Tensor,
) -> SolutionBatch:
# parents1 is a tensor storing the decision values of the first
# half of the chosen parents.
# parents2 is a tensor storing the decision values of the second
# half of the chosen parents.
# We expect that the lengths of parents1 and parents2 are equal.
assert len(parents1) == len(parents2)
# Each pair of parents will produce two children, so the child count
# equals the total number of chosen parents:
num_parents = len(parents1) + len(parents2)
# Allocate an empty SolutionBatch that will store the children
childpop = SolutionBatch(self.problem, popsize=num_parents, empty=True)
# Gain access to the decision values tensor of the newly allocated
# childpop
childpop_values = childpop.access_values()
# Here we somehow fill `childpop_values` by recombining the parents.
# The most common thing to do is to produce two children by
# combining parents1[0] and parents2[0], to produce the next two
# children parents1[1] and parents2[1], and so on.
childpop_values[:] = ...
# Return the child population
return childpop
```
One can define a custom mutation operator by inheriting from
[Operator][evotorch.operators.base.Operator], as shown below:
```python
class CustomMutation(Operator):
def _do(self, solutions: SolutionBatch):
# Get the decision values tensor of the solutions
sln_values = solutions.access_values()
# do in-place modifications to the decision values
sln_values[:] = ...
```
Alternatively, you could define the mutation operator as a function:
```python
def my_mutation_function(original_values: torch.Tensor) -> torch.Tensor:
# Somehow produce mutated copies of the original values
mutated_values = ...
# Return the mutated values
return mutated_values
```
With these defined operators, we are now ready to instantiate our
GeneticAlgorithm:
```python
ga = GeneticAlgorithm(
problem,
popsize=100,
operators=[
CustomCrossOver(problem, tournament_size=4),
CustomMutation(problem),
# -- or, if you chose to define the mutation as a function: --
# my_mutation_function,
],
)
```
**Non-numeric or variable-length solutions.**
GeneticAlgorithm can also work on problems whose `dtype` is declared
as `object`, where `dtype=object` means that a solution's value(s) can be
expressed via a tensor, a numpy array, a scalar, a tuple, a list, a
dictionary.
Like in the previously discussed case (where dtype is an integer type),
one has to define custom operators when working on problems with
`dtype=object`. A custom cross-over definition specialized for
`dtype=object` looks like this:
```python
from evotorch.tools import ObjectArray
class CrossOverForObjectDType(CrossOver):
def _do_cross_over(
self,
parents1: ObjectArray,
parents2: ObjectArray,
) -> SolutionBatch:
# Each pair of parents will produce two children, so the child count
# equals the total number of chosen parents:
num_parents = len(parents1) + len(parents2)
# Allocate an empty SolutionBatch that will store the children
childpop = SolutionBatch(self.problem, popsize=num_parents, empty=True)
# Gain access to the decision values ObjectArray of the newly allocated
# childpop
childpop_values = childpop.access_values()
# Here we somehow fill `childpop_values` by recombining the parents.
# The most common thing to do is to produce two children by
# combining parents1[0] and parents2[0], to produce the next two
# children parents1[1] and parents2[1], and so on.
childpop_values[:] = ...
# Return the child population
return childpop
```
A custom mutation operator specialized for `dtype=object` looks like this:
```python
class MutationForObjectDType(Operator):
def _do(self, solutions: SolutionBatch):
# Get the decision values ObjectArray of the solutions
sln_values = solutions.access_values()
# do in-place modifications to the decision values
sln_values[:] = ...
```
A custom mutation function specialized for `dtype=object` looks like this:
```python
def mutation_for_object_dtype(original_values: ObjectArray) -> ObjectArray:
# Somehow produce mutated copies of the original values
mutated_values = ...
# Return the mutated values
return mutated_values
```
With these operators defined, one can instantiate the GeneticAlgorithm:
```python
ga = GeneticAlgorithm(
problem_with_object_dtype,
popsize=100,
operators=[
CrossOverForObjectDType(problem_with_object_dtype, tournament_size=4),
MutationForObjectDType(problem_with_object_dtype),
# -- or, if you chose to define the mutation as a function: --
# mutation_for_object_dtype,
],
)
```
**Multiple objectives.**
GeneticAlgorithm can work on problems with multiple objectives.
When there are multiple objectives, GeneticAlgorithm will compare the
solutions according to their pareto-ranks and their crowding distances,
like done by the NSGA-II algorithm (Deb, 2002).
References:
Sean Luke, 2013, Essentials of Metaheuristics, Lulu, second edition
available for free at http://cs.gmu.edu/~sean/book/metaheuristics/
Kalyanmoy Deb, Amrit Pratap, Sameer Agarwal, T. Meyarivan (2002).
A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II.
"""
def __init__(
self,
problem: Problem,
*,
operators: Iterable,
popsize: int,
elitist: bool = True,
re_evaluate: bool = True,
re_evaluate_parents_first: Optional[bool] = None,
_allow_empty_operator_list: bool = False,
):
"""
`__init__(...)`: Initialize the GeneticAlgorithm.
Args:
problem: The problem to optimize.
operators: Operators to be used by the genetic algorithm.
Expected as an iterable, such as a list or a tuple.
Each item within this iterable object is expected either
as an instance of [Operator][evotorch.operators.base.Operator],
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor (or in an
[ObjectArray][evotorch.tools.objectarray.ObjectArray]
for when dtype is `object`) and returns a modified copy.
popsize: Population size.
elitist: Whether or not this genetic algorithm will behave in an
elitist manner. This argument controls how the genetic
algorithm will form the next generation from the parents
and the children. In elitist mode (i.e. with `elitist=True`),
the procedure to be followed by this genetic algorithm is:
(i) form an extended population which consists of
both the parents and the children,
(ii) sort the extended population from best to worst,
(iii) select the best `n` solutions as the new generation where
`n` is `popsize`.
In non-elitist mode (i.e. with `elitist=False`), the worst `m`
solutions within the parent population are replaced with
the children, `m` being the number of produced children.
re_evaluate: Whether or not to evaluate the solutions
that were already evaluated in the previous generations.
By default, this is set as True.
The reason behind this default setting is that,
in problems where the evaluation procedure is noisy,
by re-evaluating the already-evaluated solutions,
we prevent the bad solutions that were luckily evaluated
from hanging onto the population.
Instead, at every generation, each solution must go through
the evaluation procedure again and prove their worth.
For problems whose evaluation procedures are NOT noisy,
the user might consider turning re_evaluate to False
for saving computational cycles.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
"""
SearchAlgorithm.__init__(self, problem)
self._popsize = int(popsize)
self._elitist: bool = bool(elitist)
self._population = problem.generate_batch(self._popsize)
ExtendedPopulationMixin.__init__(
self,
re_evaluate=re_evaluate,
re_evaluate_parents_first=re_evaluate_parents_first,
operators=operators,
allow_empty_operators_list=_allow_empty_operator_list,
)
SinglePopulationAlgorithmMixin.__init__(self)
@property
def population(self) -> SolutionBatch:
"""Get the population"""
return self._population
def _step(self):
# Get the population size
popsize = self._popsize
if self._elitist:
# This is where we handle the elitist mode.
# Produce and get an extended population in a single SolutionBatch
extended_population = self._make_extended_population(split=False)
# From the extended population, take the best n solutions, n being the popsize.
self._population = extended_population.take_best(popsize)
else:
# This is where we handle the non-elitist mode.
# Take the parent solutions (ensured to be evaluated) and the children separately.
parents, children = self._make_extended_population(split=True)
# Get the number of children
num_children = len(children)
if num_children < popsize:
# If the number of children is less than the population size, then we keep the best m solutions from
# the parents, m being `popsize - num_children`.
chosen_parents = self._population.take_best(popsize - num_children)
# Combine the children with the chosen parents, and declare them as the new population.
self._population = SolutionBatch.cat([chosen_parents, children])
elif num_children == popsize:
# If the number of children is the same with the population size, then these children are declared as
# the new population.
self._population = children
else:
# If the number of children is more than the population size, then we take the best n solutions from
# these children, n being the population size. These chosen children are then declared as the new
# population.
self._population = children.take_best(self._popsize)
population: SolutionBatch
property
readonly
¶
Get the population
__init__(self, problem, *, operators, popsize, elitist=True, re_evaluate=True, re_evaluate_parents_first=None, _allow_empty_operator_list=False)
special
¶
__init__(...)
: Initialize the GeneticAlgorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem | Problem | The problem to optimize. | required |
operators | Iterable | Operators to be used by the genetic algorithm. Expected as an iterable, such as a list or a tuple. Each item within this iterable object is expected either as an instance of Operator, or as a function which receives the decision values of multiple solutions in a PyTorch tensor (or in an ObjectArray for when dtype is `object`) and returns a modified copy. | required |
popsize | int | Population size. | required |
elitist | bool | Whether or not this genetic algorithm will behave in an elitist manner. This argument controls how the genetic algorithm will form the next generation from the parents and the children. In elitist mode (i.e. with `elitist=True`), the extended population of parents and children is sorted from best to worst, and the best `popsize` solutions become the new generation. In non-elitist mode (i.e. with `elitist=False`), the worst `m` solutions within the parent population are replaced with the children, `m` being the number of produced children. | True |
re_evaluate | bool | Whether or not to evaluate the solutions that were already evaluated in the previous generations. By default, this is set as True. The reason behind this default setting is that, in problems where the evaluation procedure is noisy, by re-evaluating the already-evaluated solutions, we prevent the bad solutions that were luckily evaluated from hanging onto the population. Instead, at every generation, each solution must go through the evaluation procedure again and prove its worth. For problems whose evaluation procedures are NOT noisy, the user might consider turning re_evaluate to False for saving computational cycles. | True |
re_evaluate_parents_first | Optional[bool] | This is to be specified only when `re_evaluate` is True (otherwise to be left as None). If given as True, it will be assumed that the provided operators require the parents to be evaluated. If given as False, it will be assumed that the provided operators work without looking at the parents' fitnesses. If left as None, this is determined automatically: True if the operators contain at least one cross-over operator, False otherwise. | None |
Source code in evotorch/algorithms/ga.py
def __init__(
self,
problem: Problem,
*,
operators: Iterable,
popsize: int,
elitist: bool = True,
re_evaluate: bool = True,
re_evaluate_parents_first: Optional[bool] = None,
_allow_empty_operator_list: bool = False,
):
"""
`__init__(...)`: Initialize the GeneticAlgorithm.
Args:
problem: The problem to optimize.
operators: Operators to be used by the genetic algorithm.
Expected as an iterable, such as a list or a tuple.
Each item within this iterable object is expected either
as an instance of [Operator][evotorch.operators.base.Operator],
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor (or in an
[ObjectArray][evotorch.tools.objectarray.ObjectArray]
for when dtype is `object`) and returns a modified copy.
popsize: Population size.
elitist: Whether or not this genetic algorithm will behave in an
elitist manner. This argument controls how the genetic
algorithm will form the next generation from the parents
and the children. In elitist mode (i.e. with `elitist=True`),
the procedure to be followed by this genetic algorithm is:
(i) form an extended population which consists of
both the parents and the children,
(ii) sort the extended population from best to worst,
(iii) select the best `n` solutions as the new generation where
`n` is `popsize`.
In non-elitist mode (i.e. with `elitist=False`), the worst `m`
solutions within the parent population are replaced with
the children, `m` being the number of produced children.
re_evaluate: Whether or not to evaluate the solutions
that were already evaluated in the previous generations.
By default, this is set as True.
The reason behind this default setting is that,
in problems where the evaluation procedure is noisy,
by re-evaluating the already-evaluated solutions,
we prevent the bad solutions that were luckily evaluated
from hanging onto the population.
Instead, at every generation, each solution must go through
the evaluation procedure again and prove their worth.
For problems whose evaluation procedures are NOT noisy,
the user might consider turning re_evaluate to False
for saving computational cycles.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
"""
SearchAlgorithm.__init__(self, problem)
self._popsize = int(popsize)
self._elitist: bool = bool(elitist)
self._population = problem.generate_batch(self._popsize)
ExtendedPopulationMixin.__init__(
self,
re_evaluate=re_evaluate,
re_evaluate_parents_first=re_evaluate_parents_first,
operators=operators,
allow_empty_operators_list=_allow_empty_operator_list,
)
SinglePopulationAlgorithmMixin.__init__(self)
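For context, below is a minimal usage sketch (not from the library's documentation): a hypothetical sphere-minimization problem solved with one cross-over and one mutation operator from `evotorch.operators`.
```python
import torch
from evotorch import Problem
from evotorch.algorithms import GeneticAlgorithm
from evotorch.operators import GaussianMutation, OnePointCrossOver

# A toy single-objective problem: minimize the sphere function.
def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2)

problem = Problem("min", sphere, solution_length=10, initial_bounds=(-1.0, 1.0))

ga = GeneticAlgorithm(
    problem,
    operators=[
        OnePointCrossOver(problem, tournament_size=4),  # produces children via cross-over
        GaussianMutation(problem, stdev=0.1),  # perturbs the children
    ],
    popsize=50,
)
ga.run(10)  # evolve for 10 generations
print(dict(ga.status))  # show the final status dictionary
```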
SteadyStateGA (GeneticAlgorithm)
¶
Thin wrapper around GeneticAlgorithm for compatibility with old code.
This `SteadyStateGA` class is equivalent to GeneticAlgorithm except that `SteadyStateGA` provides an additional method named `use(...)` for specifying a cross-over and/or a mutation operator.
The method `use(...)` exists only for API compatibility with the previous versions of EvoTorch. It is recommended to specify the operators via the keyword argument `operators` instead.
Source code in evotorch/algorithms/ga.py
class SteadyStateGA(GeneticAlgorithm):
"""
Thin wrapper around GeneticAlgorithm for compatibility with old code.
This `SteadyStateGA` class is equivalent to
[GeneticAlgorithm][evotorch.algorithms.ga.GeneticAlgorithm] except that
`SteadyStateGA` provides an additional method named `use(...)` for
specifying a cross-over and/or a mutation operator.
The method `use(...)` exists only for API compatibility with the previous
versions of EvoTorch. It is recommended to specify the operators via
the keyword argument `operators` instead.
"""
def __init__(
self,
problem: Problem,
*,
popsize: int,
operators: Optional[Iterable] = None,
elitist: bool = True,
re_evaluate: bool = True,
re_evaluate_parents_first: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the SteadyStateGA.
Args:
problem: The problem to optimize.
operators: Optionally, an iterable of operators to be used by the
genetic algorithm. Each item within this iterable object is
expected either as an instance of
[Operator][evotorch.operators.base.Operator],
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor (or in an
[ObjectArray][evotorch.tools.objectarray.ObjectArray]
for when dtype is `object`) and returns a modified copy.
If this is omitted, then it will be required to specify the
operators via the `use(...)` method.
popsize: Population size.
elitist: Whether or not this genetic algorithm will behave in an
elitist manner. This argument controls how the genetic
algorithm will form the next generation from the parents
and the children. In elitist mode (i.e. with `elitist=True`),
the procedure to be followed by this genetic algorithm is:
(i) form an extended population which consists of
both the parents and the children,
(ii) sort the extended population from best to worst,
(iii) select the best `n` solutions as the new generation where
`n` is `popsize`.
In non-elitist mode (i.e. with `elitist=False`), the worst `m`
solutions within the parent population are replaced with
the children, `m` being the number of produced children.
re_evaluate: Whether or not to evaluate the solutions
that were already evaluated in the previous generations.
By default, this is set as True.
The reason behind this default setting is that,
in problems where the evaluation procedure is noisy,
by re-evaluating the already-evaluated solutions,
we prevent the bad solutions that were luckily evaluated
from hanging onto the population.
Instead, at every generation, each solution must go through
the evaluation procedure again and prove their worth.
For problems whose evaluation procedures are NOT noisy,
the user might consider turning re_evaluate to False
for saving computational cycles.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
Additional note specific to `SteadyStateGA`: if the argument
`operators` is not given (or is given as an empty list), and
also `re_evaluate_parents_first` is left as None, then
`SteadyStateGA` will assume that the operators will be later
given via the `use(...)` method, and that these operators will
require the parents to be evaluated first (equivalent to
setting `re_evaluate_parents_first` as True).
"""
if operators is None:
operators = []
self._cross_over_op: Optional[Callable] = None
self._mutation_op: Optional[Callable] = None
self._forbid_use_method: bool = False
self._prepare_ops: bool = False
if (len(operators) == 0) and re_evaluate and (re_evaluate_parents_first is None):
re_evaluate_parents_first = True
super().__init__(
problem,
operators=operators,
popsize=popsize,
elitist=elitist,
re_evaluate=re_evaluate,
re_evaluate_parents_first=re_evaluate_parents_first,
_allow_empty_operator_list=True,
)
def use(self, operator: Callable):
"""
Specify the cross-over or the mutation operator to use.
This method exists for compatibility with previous EvoTorch code.
Instead of using this method, it is recommended to specify the
operators via the `operators` keyword argument while initializing
this class.
Using this method, one can specify one cross-over operator and one
mutation operator that will be used during the evolutionary search.
Specifying multiple cross-over operators or multiple mutation operators
is not allowed. When the cross-over and mutation operators are
specified via `use(...)`, the order of execution will always be
arranged such that the cross-over comes first and the mutation comes
second. If desired, one can specify only the cross-over operator
or only the mutation operator.
Please note that the `operators` keyword argument works differently,
and offers more flexibility for defining the procedure to follow at
each generation. In more detail, the `operators` keyword argument
allows one to specify multiple cross-over and/or multiple mutation
operators, and those operators will be executed in the specified
order.
Args:
operator: The operator to be registered to SteadyStateGA.
If the specified operator is cross-over (i.e. an instance
of [CrossOver][evotorch.operators.base.CrossOver]),
then this operator will be registered for the cross-over
phase. If the specified operator is an operator that is
not of the cross-over type (i.e. any instance of
[Operator][evotorch.operators.base.Operator] that is not
[CrossOver][evotorch.operators.base.CrossOver]) or if it is
just a function which receives the decision values as a PyTorch
tensor (or, in the case where `dtype` of the problem is
`object` as an instance of
[ObjectArray][evotorch.tools.objectarray.ObjectArray]) and
returns a modified copy, then that operator will be registered
for the mutation phase of the genetic algorithm.
"""
if self._forbid_use_method:
raise RuntimeError(
"The method `use(...)` cannot be called anymore, because the evolutionary search has started."
)
if len(self._operators) > 0:
raise RuntimeError(
f"The method `use(...)` cannot be called"
f" because an operator list was provided while initializing this {type(self).__name__} instance."
)
if isinstance(operator, CrossOver):
if self._cross_over_op is not None:
raise ValueError(
f"The method `use(...)` received this cross-over operator as its argument:"
f" {operator} (of type {type(operator)})."
f" However, a cross-over operator was already set:"
f" {self._cross_over_op} (of type {type(self._cross_over_op)})."
)
self._cross_over_op = operator
self._prepare_ops = True
else:
if self._mutation_op is not None:
raise ValueError(
f"The method `use(...)` received this mutation operator as its argument:"
f" {operator} (of type {type(operator)})."
f" However, a mutation operator was already set:"
f" {self._mutation_op} (of type {type(self._mutation_op)})."
)
self._mutation_op = operator
self._prepare_ops = True
def _step(self):
self._forbid_use_method = True
if self._prepare_ops:
self._prepare_ops = False
if self._cross_over_op is not None:
self._operators.append(self._cross_over_op)
if self._mutation_op is not None:
self._operators.append(self._mutation_op)
else:
if len(self._operators) == 0:
raise RuntimeError(
f"This {type(self).__name__} instance does not know how to proceed, "
f" because neither the `operators` keyword argument was used during initialization"
f" nor was the `use(...)` method called later."
)
super()._step()
__init__(self, problem, *, popsize, operators=None, elitist=True, re_evaluate=True, re_evaluate_parents_first=None)
special
¶
__init__(...)
: Initialize the SteadyStateGA.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem to optimize. |
required |
operators |
Optional[Iterable] |
Optionally, an iterable of operators to be used by the
genetic algorithm. Each item within this iterable object is
expected either as an instance of
Operator,
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor (or in an
ObjectArray
for when dtype is |
None |
popsize |
int |
Population size. |
required |
elitist |
bool |
Whether or not this genetic algorithm will behave in an
elitist manner. This argument controls how the genetic
algorithm will form the next generation from the parents
and the children. In elitist mode (i.e. with |
True |
re_evaluate |
bool |
Whether or not to evaluate the solutions that were already evaluated in the previous generations. By default, this is set as True. The reason behind this default setting is that, in problems where the evaluation procedure is noisy, by re-evaluating the already-evaluated solutions, we prevent the bad solutions that were luckily evaluated from hanging onto the population. Instead, at every generation, each solution must go through the evaluation procedure again and prove their worth. For problems whose evaluation procedures are NOT noisy, the user might consider turning re_evaluate to False for saving computational cycles. |
True |
re_evaluate_parents_first |
Optional[bool] |
This is to be specified only when
|
None |
Source code in evotorch/algorithms/ga.py
def __init__(
self,
problem: Problem,
*,
popsize: int,
operators: Optional[Iterable] = None,
elitist: bool = True,
re_evaluate: bool = True,
re_evaluate_parents_first: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the SteadyStateGA.
Args:
problem: The problem to optimize.
operators: Optionally, an iterable of operators to be used by the
genetic algorithm. Each item within this iterable object is
expected either as an instance of
[Operator][evotorch.operators.base.Operator],
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor (or in an
[ObjectArray][evotorch.tools.objectarray.ObjectArray]
for when dtype is `object`) and returns a modified copy.
If this is omitted, then it will be required to specify the
operators via the `use(...)` method.
popsize: Population size.
elitist: Whether or not this genetic algorithm will behave in an
elitist manner. This argument controls how the genetic
algorithm will form the next generation from the parents
and the children. In elitist mode (i.e. with `elitist=True`),
the procedure to be followed by this genetic algorithm is:
(i) form an extended population which consists of
both the parents and the children,
(ii) sort the extended population from best to worst,
(iii) select the best `n` solutions as the new generation where
`n` is `popsize`.
In non-elitist mode (i.e. with `elitist=False`), the worst `m`
solutions within the parent population are replaced with
the children, `m` being the number of produced children.
re_evaluate: Whether or not to evaluate the solutions
that were already evaluated in the previous generations.
By default, this is set as True.
The reason behind this default setting is that,
in problems where the evaluation procedure is noisy,
by re-evaluating the already-evaluated solutions,
we prevent the bad solutions that were luckily evaluated
from hanging onto the population.
Instead, at every generation, each solution must go through
the evaluation procedure again and prove their worth.
For problems whose evaluation procedures are NOT noisy,
the user might consider turning re_evaluate to False
for saving computational cycles.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
Additional note specific to `SteadyStateGA`: if the argument
`operators` is not given (or is given as an empty list), and
also `re_evaluate_parents_first` is left as None, then
`SteadyStateGA` will assume that the operators will be later
given via the `use(...)` method, and that these operators will
require the parents to be evaluated first (equivalent to
setting `re_evaluate_parents_first` as True).
"""
if operators is None:
operators = []
self._cross_over_op: Optional[Callable] = None
self._mutation_op: Optional[Callable] = None
self._forbid_use_method: bool = False
self._prepare_ops: bool = False
if (len(operators) == 0) and re_evaluate and (re_evaluate_parents_first is None):
re_evaluate_parents_first = True
super().__init__(
problem,
operators=operators,
popsize=popsize,
elitist=elitist,
re_evaluate=re_evaluate,
re_evaluate_parents_first=re_evaluate_parents_first,
_allow_empty_operator_list=True,
)
use(self, operator)
¶
Specify the cross-over or the mutation operator to use.
This method exists for compatibility with previous EvoTorch code. Instead of using this method, it is recommended to specify the operators via the `operators` keyword argument while initializing this class.
Using this method, one can specify one cross-over operator and one mutation operator that will be used during the evolutionary search. Specifying multiple cross-over operators or multiple mutation operators is not allowed. When the cross-over and mutation operators are specified via `use(...)`, the order of execution will always be arranged such that the cross-over comes first and the mutation comes second. If desired, one can specify only the cross-over operator or only the mutation operator.
Please note that the `operators` keyword argument works differently, and offers more flexibility for defining the procedure to follow at each generation. In more detail, the `operators` keyword argument allows one to specify multiple cross-over and/or multiple mutation operators, and those operators will be executed in the specified order.
Parameters:
Name | Type | Description | Default
---|---|---|---
operator | Callable | The operator to be registered to SteadyStateGA. If the specified operator is a cross-over operator (i.e. an instance of CrossOver), it will be registered for the cross-over phase. If the specified operator is not of the cross-over type (i.e. any instance of Operator that is not a CrossOver), or if it is just a function which receives the decision values as a PyTorch tensor (or, in the case where the problem's dtype is `object`, as an instance of ObjectArray) and returns a modified copy, it will be registered for the mutation phase of the genetic algorithm. | required
Source code in evotorch/algorithms/ga.py
def use(self, operator: Callable):
"""
Specify the cross-over or the mutation operator to use.
This method exists for compatibility with previous EvoTorch code.
Instead of using this method, it is recommended to specify the
operators via the `operators` keyword argument while initializing
this class.
Using this method, one can specify one cross-over operator and one
mutation operator that will be used during the evolutionary search.
Specifying multiple cross-over operators or multiple mutation operators
is not allowed. When the cross-over and mutation operators are
specified via `use(...)`, the order of execution will always be
arranged such that the cross-over comes first and the mutation comes
second. If desired, one can specify only the cross-over operator
or only the mutation operator.
Please note that the `operators` keyword argument works differently,
and offers more flexibility for defining the procedure to follow at
each generation. In more detail, the `operators` keyword argument
allows one to specify multiple cross-over and/or multiple mutation
operators, and those operators will be executed in the specified
order.
Args:
operator: The operator to be registered to SteadyStateGA.
If the specified operator is cross-over (i.e. an instance
of [CrossOver][evotorch.operators.base.CrossOver]),
then this operator will be registered for the cross-over
phase. If the specified operator is an operator that is
not of the cross-over type (i.e. any instance of
[Operator][evotorch.operators.base.Operator] that is not
[CrossOver][evotorch.operators.base.CrossOver]) or if it is
just a function which receives the decision values as a PyTorch
tensor (or, in the case where `dtype` of the problem is
`object` as an instance of
[ObjectArray][evotorch.tools.objectarray.ObjectArray]) and
returns a modified copy, then that operator will be registered
for the mutation phase of the genetic algorithm.
"""
if self._forbid_use_method:
raise RuntimeError(
"The method `use(...)` cannot be called anymore, because the evolutionary search has started."
)
if len(self._operators) > 0:
raise RuntimeError(
f"The method `use(...)` cannot be called"
f" because an operator list was provided while initializing this {type(self).__name__} instance."
)
if isinstance(operator, CrossOver):
if self._cross_over_op is not None:
raise ValueError(
f"The method `use(...)` received this cross-over operator as its argument:"
f" {operator} (of type {type(operator)})."
f" However, a cross-over operator was already set:"
f" {self._cross_over_op} (of type {type(self._cross_over_op)})."
)
self._cross_over_op = operator
self._prepare_ops = True
else:
if self._mutation_op is not None:
raise ValueError(
f"The method `use(...)` received this mutation operator as its argument:"
f" {operator} (of type {type(operator)})."
f" However, a mutation operator was already set:"
f" {self._mutation_op} (of type {type(self._mutation_op)})."
)
self._mutation_op = operator
self._prepare_ops = True
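For context, a minimal sketch of the legacy workflow described above (hypothetical setup: `problem` is assumed to be an already-constructed single-objective Problem, and the two operators come from `evotorch.operators`):
```python
from evotorch.algorithms import SteadyStateGA
from evotorch.operators import GaussianMutation, OnePointCrossOver

ga = SteadyStateGA(problem, popsize=50)  # no `operators` keyword argument here

# Register at most one cross-over and one mutation operator via the legacy API.
# Regardless of registration order, cross-over is executed before mutation.
ga.use(OnePointCrossOver(problem, tournament_size=4))
ga.use(GaussianMutation(problem, stdev=0.1))

ga.run(10)  # `use(...)` cannot be called anymore once the search has started
```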
mapelites
¶
MAPElites (SearchAlgorithm, SinglePopulationAlgorithmMixin, ExtendedPopulationMixin)
¶
Implementation of the MAPElites algorithm.
In MAPElites, we deal with optimization problems where, in addition to the fitness, there are additional evaluation data ("features") that are computed during the phase of evaluation. To ensure a diversity of the solutions, the population is organized into cells of features.
Reference:
Jean-Baptiste Mouret and Jeff Clune (2015).
Illuminating search spaces by mapping elites.
arXiv preprint arXiv:1504.04909 (2015).
As an example, let us imagine that our problem has two features.
Let us call these features `feat0` and `feat1`. Let us also imagine that we wish to organize `feat0` according to the boundaries `[(-inf, 0), (0, 10), (10, 20), (20, +inf)]` and `feat1` according to the boundaries `[(-inf, 0), (0, 50), (50, +inf)]`. Our population gets organized into:
+inf
^
|
f | | | |
e | pop[0] | pop[1] | pop[ 2] | pop[ 3]
a 50 -|- --------+--------+---------+---------
t | pop[4] | pop[5] | pop[ 6] | pop[ 7]
1 0 -|- --------+--------|---------+---------
| pop[8] | pop[9] | pop[10] | pop[11]
| | | |
<-----------------|--------|---------|----------->
-inf | 0 10 20 +inf
| feat0
|
v
-inf
where `pop[i]` is short for `population[i]`, that is, the i-th solution of the population.
Which problems can be solved by MAPElites?
The problems that can be addressed by MAPElites are the problems with one objective, and with its `eval_data_length` (additional evaluation data length) set as an integer greater than or equal to 1.
For example, let us imagine an optimization problem where we handle
2 features. The evaluation function for such a problem could look like:
def f(x: torch.Tensor) -> torch.Tensor:
# Somehow compute the fitness
fitness = ...
# Somehow compute the value for the first feature
feat0 = ...
# Somehow compute the value for the second feature
feat1 = ...
# Prepare an evaluation result tensor for the solution
eval_result = torch.tensor([fitness, feat0, feat1], device=x.device)
# Here, we return the eval_result.
# Notice that `eval_result` is a 1-dimensional tensor of length 3,
# where the item with index 0 is the fitness, and the items with
# indices 1 and 2 represent the two features of the solution.
# Please also note that, in vectorized mode, we would receive `n`
# solutions, and the evaluation result tensor would have to be of shape
# (n, 3).
return eval_result
The problem definition then would look like this:
from evotorch import Problem
problem = Problem(
"min",
f,
initial_bounds=(..., ...),
solution_length=...,
eval_data_length=2, # we have 2 features
)
Using MAPElites.
Let us continue using the example `problem` shown above, where we have two features.
The first step towards configuring MAPElites is to come up with a hypergrid tensor, in which the lower and upper bounds for each feature in each cell are expressed. The hypergrid tensor is structured like this:
hypergrid = torch.tensor(
[
[
[
feat0_lower_bound_for_cell0,
feat0_upper_bound_for_cell0,
],
[
feat1_lower_bound_for_cell0,
feat1_upper_bound_for_cell0,
],
],
[
[
feat0_lower_bound_for_cell1,
feat0_upper_bound_for_cell1,
],
[
feat1_lower_bound_for_cell1,
feat1_upper_bound_for_cell1,
],
],
[
[
feat0_lower_bound_for_cell2,
feat0_upper_bound_for_cell2,
],
[
feat1_lower_bound_for_cell2,
feat1_upper_bound_for_cell2,
],
],
...,
],
dtype=problem.eval_dtype,
device=problem.device,
)
that is, the item with index `i,j,0` represents the lower bound for the j-th feature in the i-th cell, and the item with index `i,j,1` represents the upper bound for the j-th feature in the i-th cell.
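To make this concrete, here is a sketch of the first few rows of a hypergrid encoding the example boundaries given earlier (4 bins for `feat0` and 3 bins for `feat1`, so 12 cells in total; only 4 of the 12 rows are written out):
```python
import torch

inf = float("inf")
hypergrid = torch.tensor(
    [
        # [[feat0 lower, feat0 upper], [feat1 lower, feat1 upper]] per cell
        [[-inf, 0.0], [-inf, 0.0]],
        [[-inf, 0.0], [0.0, 50.0]],
        [[-inf, 0.0], [50.0, inf]],
        [[0.0, 10.0], [-inf, 0.0]],
        # ... the remaining 8 cells follow the same pattern ...
    ]
)
print(hypergrid.shape)  # torch.Size([4, 2, 2]) for the rows listed so far
```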
Specifying lower and upper bounds for each feature and for each cell can be tedious. MAPElites provides a static helper function named `make_feature_grid` which asks for how many bins are desired for each feature, and then produces a hypergrid tensor. For example, if we want 10 bins for feature `feat0` and 5 bins for feature `feat1`, then we could do:
hypergrid = MAPElites.make_feature_grid(
lower_bounds=[
global_lower_bound_for_feat0,
global_lower_bound_for_feat1,
],
upper_bounds=[
global_upper_bound_for_feat0,
global_upper_bound_for_feat1,
],
num_bins=[10, 5],
dtype=problem.eval_dtype,
device=problem.device,
)
Now that `hypergrid` is prepared, one can instantiate `MAPElites` like this:
searcher = MAPElites(
problem,
operators=[...], # list of operators like in GeneticAlgorithm
feature_grid=hypergrid,
)
where the keyword argument `operators` is a list that contains functions or instances of Operator, as expected by GeneticAlgorithm.
Once `MAPElites` is instantiated, it can be run like most of the search algorithm implementations provided by EvoTorch, as shown below:
from evotorch.logging import StdOutLogger
_ = StdOutLogger(searcher) # Report the evolution's progress to standard output
searcher.run(100) # Run MAPElites for 100 generations
print(dict(searcher.status)) # Show the final status dictionary
Vectorization capabilities.
According to the basic definition of the MAPElites algorithm, a cell is first chosen, then mutated, and then the mutated solution is placed back into the most suitable cell (if the cell is not filled yet or if the fitness of the newly mutated solution is better than the existing solution in that cell). When vectorization, and especially GPU-based parallelization, is available, picking and mutating solutions one by one can be wasteful in terms of performance. Therefore, this MAPElites implementation mutates the entire population, evaluates all of the mutated solutions, and places all of them back into the most suitable cells, all in such a way that the vectorization and/or GPU-based parallelization can be exploited.
Source code in evotorch/algorithms/mapelites.py
class MAPElites(SearchAlgorithm, SinglePopulationAlgorithmMixin, ExtendedPopulationMixin):
"""
Implementation of the MAPElites algorithm.
In MAPElites, we deal with optimization problems where, in addition
to the fitness, there are additional evaluation data ("features") that
are computed during the phase of evaluation. To ensure a diversity
of the solutions, the population is organized into cells of features.
Reference:
Jean-Baptiste Mouret and Jeff Clune (2015).
Illuminating search spaces by mapping elites.
arXiv preprint arXiv:1504.04909 (2015).
As an example, let us imagine that our problem has two features.
Let us call these features `feat0` and `feat1`.
Let us also imagine that we wish to organize `feat0` according to
the boundaries `[(-inf, 0), (0, 10), (10, 20), (20, +inf)]` and `feat1`
according to the boundaries `[(-inf, 0), (0, 50), (50, +inf)]`.
Our population gets organized into:
```text
+inf
^
|
f | | | |
e | pop[0] | pop[1] | pop[ 2] | pop[ 3]
a 50 -|- --------+--------+---------+---------
t | pop[4] | pop[5] | pop[ 6] | pop[ 7]
1 0 -|- --------+--------|---------+---------
| pop[8] | pop[9] | pop[10] | pop[11]
| | | |
<-----------------|--------|---------|----------->
-inf | 0 10 20 +inf
| feat0
|
v
-inf
```
where `pop[i]` is short for `population[i]`, that is, the i-th solution
of the population.
**Which problems can be solved by MAPElites?**
The problems that can be addressed by MAPElites are the problems with
one objective, and with its `eval_data_length` (additional evaluation
data length) set as an integer greater than or equal to 1.
For example, let us imagine an optimization problem where we handle
2 features. The evaluation function for such a problem could look like:
```python
def f(x: torch.Tensor) -> torch.Tensor:
# Somehow compute the fitness
fitness = ...
# Somehow compute the value for the first feature
feat0 = ...
# Somehow compute the value for the second feature
feat1 = ...
# Prepare an evaluation result tensor for the solution
eval_result = torch.tensor([fitness, feat0, feat1], device=x.device)
# Here, we return the eval_result.
# Notice that `eval_result` is a 1-dimensional tensor of length 3,
# where the item with index 0 is the fitness, and the items with
# indices 1 and 2 represent the two features of the solution.
# Please also note that, in vectorized mode, we would receive `n`
# solutions, and the evaluation result tensor would have to be of shape
# (n, 3).
return eval_result
```
The problem definition then would look like this:
```python
from evotorch import Problem
problem = Problem(
"min",
f,
initial_bounds=(..., ...),
solution_length=...,
eval_data_length=2, # we have 2 features
)
```
**Using MAPElites.**
Let us continue using the example `problem` shown above, where we have
two features.
The first step towards configuring MAPElites is to come up with a
hypergrid tensor, in which the lower and upper bounds for each
feature in each cell are expressed. The hypergrid tensor is structured
like this:
```python
hypergrid = torch.tensor(
[
[
[
feat0_lower_bound_for_cell0,
feat0_upper_bound_for_cell0,
],
[
feat1_lower_bound_for_cell0,
feat1_upper_bound_for_cell0,
],
],
[
[
feat0_lower_bound_for_cell1,
feat0_upper_bound_for_cell1,
],
[
feat1_lower_bound_for_cell1,
feat1_upper_bound_for_cell1,
],
],
[
[
feat0_lower_bound_for_cell2,
feat0_upper_bound_for_cell2,
],
[
feat1_lower_bound_for_cell2,
feat1_upper_bound_for_cell2,
],
],
...,
],
dtype=problem.eval_dtype,
device=problem.device,
)
```
that is, the item with index `i,j,0` represents the lower bound for the
j-th feature in i-th cell, and the item with index `i,j,1` represents the
upper bound for the j-th feature in i-th cell.
Specifying lower and upper bounds for each feature and for each cell can
be tedious. MAPElites provides a static helper function named
[make_feature_grid][evotorch.algorithms.mapelites.MAPElites.make_feature_grid]
which asks for how many bins are desired for each feature, and then
produces a hypergrid tensor. For example, if we want 10 bins for feature
`feat0` and 5 bins for feature `feat1`, then, we could do:
```python
hypergrid = MAPElites.make_feature_grid(
lower_bounds=[
global_lower_bound_for_feat0,
global_lower_bound_for_feat1,
],
upper_bounds=[
global_upper_bound_for_feat0,
global_upper_bound_for_feat1,
],
num_bins=[10, 5],
dtype=problem.eval_dtype,
device=problem.device,
)
```
Now that `hypergrid` is prepared, one can instantiate `MAPElites` like
this:
```python
searcher = MAPElites(
problem,
operators=[...], # list of operators like in GeneticAlgorithm
feature_grid=hypergrid,
)
```
where the keyword argument `operators` is a list that contains functions
or instances of [Operator][evotorch.operators.base.Operator], as expected
by [GeneticAlgorithm][evotorch.algorithms.ga.GeneticAlgorithm].
Once `MAPElites` is instantiated, it can be run like most of the search
algorithm implementations provided by EvoTorch, as shown below:
```python
from evotorch.logging import StdOutLogger
_ = StdOutLogger(searcher) # Report the evolution's progress to standard output
searcher.run(100) # Run MAPElites for 100 generations
print(dict(searcher.status)) # Show the final status dictionary
```
**Vectorization capabilities.**
According to the basic definition of the MAPElites algorithm, a cell is
first chosen, then mutated, and then the mutated solution is placed back
into the most suitable cell (if the cell is not filled yet or if the
fitness of the newly mutated solution is better than the existing solution
in that cell). When vectorization, and especially GPU-based parallelization
is available, picking and mutating solutions one by one can be wasteful in
terms of performance. Therefore, this MAPElites implementation mutates the
entire population, evaluates all of the mutated solutions, and places all
of them back into the most suitable cells, all in such a way that the
vectorization and/or GPU-based parallelization can be exploited.
"""
def __init__(
self,
problem: Problem,
*,
operators: Iterable,
feature_grid: Iterable,
re_evaluate: bool = True,
re_evaluate_parents_first: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the MAPElites algorithm.
Args:
problem: The problem object to work on. This problem object
is expected to have one objective, and also have its
`eval_data_length` set as an integer that is greater than
or equal to 1.
operators: Operators to be used by the MAPElites algorithm.
Expected as an iterable, such as a list or a tuple.
Each item within this iterable object is expected either
as an instance of [Operator][evotorch.operators.base.Operator],
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor and returns a modified
copy.
re_evaluate: Whether or not to evaluate the solutions
that were already evaluated in the previous generations.
By default, this is set as True.
The reason behind this default setting is that,
in problems where the evaluation procedure is noisy,
by re-evaluating the already-evaluated solutions,
we prevent the bad solutions that were luckily evaluated
from hanging onto the population.
Instead, at every generation, each solution must go through
the evaluation procedure again and prove their worth.
For problems whose evaluation procedures are NOT noisy,
the user might consider turning re_evaluate to False
for saving computational cycles.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
"""
problem.ensure_single_objective()
problem.ensure_numeric()
SearchAlgorithm.__init__(self, problem)
self._feature_grid = problem.as_tensor(feature_grid, use_eval_dtype=True)
self._sense = self._problem.senses[0]
self._popsize = self._feature_grid.shape[0]
self._population = problem.generate_batch(self._popsize)
self._filled = torch.zeros(self._popsize, dtype=torch.bool, device=self._population.device)
ExtendedPopulationMixin.__init__(
self,
re_evaluate=re_evaluate,
re_evaluate_parents_first=re_evaluate_parents_first,
operators=operators,
allow_empty_operators_list=False,
)
SinglePopulationAlgorithmMixin.__init__(self)
@property
def population(self) -> SolutionBatch:
"""
Get the population as a SolutionBatch object
In this MAP-Elites implementation, i-th solution corresponds to the
solution belonging to the i-th cell of the MAP-Elites hypergrid.
If `filled[i]` is True, then this means that the i-th cell is filled,
and therefore `population[i]` will get the solution belonging to the
i-th cell.
"""
return self._population
@property
def filled(self) -> torch.Tensor:
"""
Get a boolean tensor specifying whether or not the i-th cell is filled.
In this MAP-Elites implementation, i-th solution within the population
corresponds to the solution belonging to the i-th cell of the MAP-Elites
hypergrid. If `filled[i]` is True, then the solution stored in the i-th
cell satisfies the feature boundaries imposed by that cell.
If `filled[i]` is False, then the solution stored in the i-th cell
does not satisfy those boundaries, and therefore does not really belong
in that cell.
"""
from ..tools.readonlytensor import as_read_only_tensor
with torch.no_grad():
return as_read_only_tensor(self._filled)
def _step(self):
# Form an extended population from the parents and from the children
extended_population = self._make_extended_population(split=False)
# Get the most suitable solutions for each cell of the hypergrid.
# values[i, :] stores the decision values most suitable for the i-th cell.
# evals[i, :] stores the evaluation results most suitable for the i-th cell.
# if the suggested i-th solution completely satisfies the boundaries imposed by the i-th cell,
# then suitable_mask[i] will be True.
values, evals, suitable = _best_solution_considering_all_features(
self._sense,
extended_population.values.as_subclass(torch.Tensor),
extended_population.evals.as_subclass(torch.Tensor),
self._feature_grid,
)
# Place the most suitable decision values and evaluation results into the current population.
self._population.access_values(keep_evals=True)[:] = values
self._population.access_evals()[:] = evals
# If there was a suitable solution for the i-th cell, fill[i] is to be set as True.
self._filled[:] = suitable
@staticmethod
def make_feature_grid(
lower_bounds: Iterable,
upper_bounds: Iterable,
num_bins: Union[int, torch.Tensor],
*,
device: Optional[Device] = None,
dtype: Optional[DType] = None,
) -> torch.Tensor:
"""
Make a hypergrid for the MAPElites algorithm.
The [MAPElites][evotorch.algorithms.mapelites.MAPElites] organizes its
population not only according to the fitness, but also according to the
additional evaluation data which are interpreted as the additional features
of the solutions. To organize the current population according to these
features, [MAPElites][evotorch.algorithms.mapelites.MAPElites] requires "cells",
each cell having a lower and an upper bound for each feature.
`make_feature_grid(...)` is a helper function which generates the
required hypergrid of features in such a way that each cell, for each
feature, has the same interval.
The result of this function is a PyTorch tensor, which can be passed to
the `feature_grid` keyword argument of
[MAPElites][evotorch.algorithms.mapelites.MAPElites].
Args:
lower_bounds: The lower bounds, as a 1-dimensional sequence of numbers.
The length of this sequence must be equal to the number of
features, and the i-th element must express the lower bound
of the i-th feature.
upper_bounds: The upper bounds, as a 1-dimensional sequence of numbers.
The length of this sequence must be equal to the number of
features, and the i-th element must express the upper bound
of the i-th feature.
num_bins: Can be given as an integer or as a sequence of integers.
If given as an integer `n`, then there will be `n` bins for each
feature on the hypergrid. If given as a sequence of integers,
then the i-th element of the sequence will express the number of
bins for the i-th feature.
Returns:
The hypergrid, as a PyTorch tensor.
"""
cast_args = {}
if device is not None:
cast_args["device"] = torch.device(device)
if dtype is not None:
cast_args["dtype"] = to_torch_dtype(dtype)
has_casting = len(cast_args) > 0
if has_casting:
lower_bounds = torch.as_tensor(lower_bounds, **cast_args)
upper_bounds = torch.as_tensor(upper_bounds, **cast_args)
if (not isinstance(lower_bounds, torch.Tensor)) or (not isinstance(upper_bounds, torch.Tensor)):
raise TypeError(
f"While preparing the map elites hypergrid with device={device} and dtype={dtype},"
f"`lower_bounds` and `upper_bounds` were expected as tensors, but their types are different."
f" The type of `lower_bounds` is {type(lower_bounds)}."
f" The type of `upper_bounds` is {type(upper_bounds)}."
)
if lower_bounds.device != upper_bounds.device:
raise ValueError(
f"Cannot determine on which device to place the map elites grid,"
f" because `lower_bounds` and `upper_bounds` are on different devices."
f" The device of `lower_bounds` is {lower_bounds.device}."
f" The device of `upper_bounds` is {upper_bounds.device}."
)
if lower_bounds.dtype != upper_bounds.dtype:
raise ValueError(
f"Cannot determine the dtype of the map elites grid,"
f" because `lower_bounds` and `upper_bounds` have different dtypes."
f" The dtype of `lower_bounds` is {lower_bounds.dtype}."
f" The dtype of `upper_bounds` is {upper_bounds.dtype}."
)
if lower_bounds.size() != upper_bounds.size():
raise ValueError("`lower_bounds` and `upper_bounds` have incompatible shapes")
if lower_bounds.dim() != 1:
raise ValueError("Only 1D tensors are supported for `lower_bounds` and for `upper_bounds`")
dtype = lower_bounds.dtype
device = lower_bounds.device
num_bins = torch.as_tensor(num_bins, dtype=torch.int64, device=device)
if num_bins.dim() == 0:
num_bins = num_bins.expand(lower_bounds.size())
p_inf = torch.tensor([float("inf")], dtype=dtype, device=device)
n_inf = torch.tensor([float("-inf")], dtype=dtype, device=device)
def _make_feature_grid(lb, ub, num_bins):
sp = torch.linspace(lb, ub, num_bins - 1, device=device)
sp = torch.cat((n_inf, sp, p_inf))
return sp.unfold(dimension=0, size=2, step=1).unsqueeze(1)
f_grids = [_make_feature_grid(*bounds) for bounds in zip(lower_bounds, upper_bounds, num_bins)]
return torch.stack([torch.cat(c) for c in itertools.product(*f_grids)])
filled: Tensor
property
readonly
¶
Get a boolean tensor specifying whether or not the i-th cell is filled.
In this MAP-Elites implementation, the i-th solution within the population corresponds to the solution belonging to the i-th cell of the MAP-Elites hypergrid. If `filled[i]` is True, then the solution stored in the i-th cell satisfies the feature boundaries imposed by that cell. If `filled[i]` is False, then the solution stored in the i-th cell does not satisfy those boundaries, and therefore does not really belong in that cell.
population: SolutionBatch
property
readonly
¶
Get the population as a SolutionBatch object.
In this MAP-Elites implementation, the i-th solution corresponds to the solution belonging to the i-th cell of the MAP-Elites hypergrid. If `filled[i]` is True, then this means that the i-th cell is filled, and therefore `population[i]` will get the solution belonging to the i-th cell.
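For context, a small sketch (not from the library's documentation) of inspecting these two properties after a run, assuming `searcher` is a MAPElites instance:
```python
# Iterate over the hypergrid cells and report the occupied ones.
for i, is_filled in enumerate(searcher.filled):
    if is_filled:
        solution = searcher.population[i]  # the elite occupying cell i
        print(f"cell {i}: {solution}")
```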
__init__(self, problem, *, operators, feature_grid, re_evaluate=True, re_evaluate_parents_first=None)
special
¶
__init__(...)
: Initialize the MAPElites algorithm.
Parameters:
Name | Type | Description | Default
---|---|---|---
problem | Problem | The problem object to work on. This problem object is expected to have one objective, and also have its `eval_data_length` set as an integer that is greater than or equal to 1. | required
operators | Iterable | Operators to be used by the MAPElites algorithm. Expected as an iterable, such as a list or a tuple. Each item within this iterable object is expected either as an instance of Operator, or as a function which receives the decision values of multiple solutions in a PyTorch tensor and returns a modified copy. | required
re_evaluate | bool | Whether or not to evaluate the solutions that were already evaluated in the previous generations. By default, this is set as True: in problems where the evaluation procedure is noisy, re-evaluating the already-evaluated solutions prevents bad solutions that were luckily evaluated from hanging onto the population. For problems whose evaluation procedures are NOT noisy, the user might consider setting re_evaluate to False to save computational cycles. | True
re_evaluate_parents_first | Optional[bool] | To be specified only when `re_evaluate` is True (otherwise to be left as None). If True, the provided operators are assumed to require the parents to be evaluated. If False, the operators are assumed to work without looking at the parents' fitnesses (so both parents and children can be evaluated in a single vectorized computation cycle). If None, this is determined automatically: internally set as True if the operators contain at least one cross-over operator, and as False otherwise. | None
Source code in evotorch/algorithms/mapelites.py
def __init__(
self,
problem: Problem,
*,
operators: Iterable,
feature_grid: Iterable,
re_evaluate: bool = True,
re_evaluate_parents_first: Optional[bool] = None,
):
"""
`__init__(...)`: Initialize the MAPElites algorithm.
Args:
problem: The problem object to work on. This problem object
is expected to have one objective, and also have its
`eval_data_length` set as an integer that is greater than
or equal to 1.
operators: Operators to be used by the MAPElites algorithm.
Expected as an iterable, such as a list or a tuple.
Each item within this iterable object is expected either
as an instance of [Operator][evotorch.operators.base.Operator],
or as a function which receives the decision values of
multiple solutions in a PyTorch tensor and returns a modified
copy.
re_evaluate: Whether or not to evaluate the solutions
that were already evaluated in the previous generations.
By default, this is set as True.
The reason behind this default setting is that,
in problems where the evaluation procedure is noisy,
by re-evaluating the already-evaluated solutions,
we prevent the bad solutions that were luckily evaluated
from hanging onto the population.
Instead, at every generation, each solution must go through
the evaluation procedure again and prove their worth.
For problems whose evaluation procedures are NOT noisy,
the user might consider turning re_evaluate to False
for saving computational cycles.
re_evaluate_parents_first: This is to be specified only when
`re_evaluate` is True (otherwise to be left as None).
If this is given as True, then it will be assumed that the
provided operators require the parents to be evaluated.
If this is given as False, then it will be assumed that the
provided operators work without looking at the parents'
fitnesses (in which case both parents and children can be
evaluated in a single vectorized computation cycle).
If this is left as None, then whether or not the operators
need to know the parent evaluations will be determined
automatically as follows:
if the operators contain at least one cross-over operator
then `re_evaluate_parents_first` will be internally set as
True; otherwise `re_evaluate_parents_first` will be internally
set as False.
"""
problem.ensure_single_objective()
problem.ensure_numeric()
SearchAlgorithm.__init__(self, problem)
self._feature_grid = problem.as_tensor(feature_grid, use_eval_dtype=True)
self._sense = self._problem.senses[0]
self._popsize = self._feature_grid.shape[0]
self._population = problem.generate_batch(self._popsize)
self._filled = torch.zeros(self._popsize, dtype=torch.bool, device=self._population.device)
ExtendedPopulationMixin.__init__(
self,
re_evaluate=re_evaluate,
re_evaluate_parents_first=re_evaluate_parents_first,
operators=operators,
allow_empty_operators_list=False,
)
SinglePopulationAlgorithmMixin.__init__(self)
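Putting the pieces of the docstring above together, here is a hedged end-to-end sketch (the evaluation function, bounds, and operator choice are illustrative assumptions, not prescriptions):
```python
import torch
from evotorch import Problem
from evotorch.algorithms import MAPElites
from evotorch.operators import GaussianMutation

def f(x: torch.Tensor) -> torch.Tensor:
    fitness = torch.sum(x**2)
    feat0, feat1 = x[0], x[1]  # use two decision variables as the features
    return torch.tensor([fitness, feat0, feat1], device=x.device)

problem = Problem(
    "min",
    f,
    solution_length=10,
    initial_bounds=(-1.0, 1.0),
    eval_data_length=2,  # two features
)

hypergrid = MAPElites.make_feature_grid(
    lower_bounds=[-1.0, -1.0],
    upper_bounds=[1.0, 1.0],
    num_bins=[10, 5],
    dtype=problem.eval_dtype,
    device=problem.device,
)

searcher = MAPElites(
    problem,
    operators=[GaussianMutation(problem, stdev=0.1)],
    feature_grid=hypergrid,
)
searcher.run(100)
print(dict(searcher.status))
```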
make_feature_grid(lower_bounds, upper_bounds, num_bins, *, device=None, dtype=None)
staticmethod
¶
Make a hypergrid for the MAPElites algorithm.
The MAPElites algorithm organizes its population not only according to the fitness, but also according to the additional evaluation data which are interpreted as the additional features of the solutions. To organize the current population according to these features, MAPElites requires "cells", each cell having a lower and an upper bound for each feature.
`make_feature_grid(...)` is a helper function which generates the required hypergrid of features in such a way that each cell, for each feature, has the same interval.
The result of this function is a PyTorch tensor, which can be passed to the `feature_grid` keyword argument of MAPElites.
Parameters:
Name | Type | Description | Default
---|---|---|---
lower_bounds | Iterable | The lower bounds, as a 1-dimensional sequence of numbers. The length of this sequence must be equal to the number of features, and the i-th element must express the lower bound of the i-th feature. | required
upper_bounds | Iterable | The upper bounds, as a 1-dimensional sequence of numbers. The length of this sequence must be equal to the number of features, and the i-th element must express the upper bound of the i-th feature. | required
num_bins | Union[int, torch.Tensor] | Can be given as an integer or as a sequence of integers. If given as an integer `n`, then there will be `n` bins for each feature on the hypergrid. If given as a sequence of integers, then the i-th element of the sequence will express the number of bins for the i-th feature. | required
Returns:
Type | Description
---|---
Tensor | The hypergrid, as a PyTorch tensor.
Source code in evotorch/algorithms/mapelites.py
@staticmethod
def make_feature_grid(
lower_bounds: Iterable,
upper_bounds: Iterable,
num_bins: Union[int, torch.Tensor],
*,
device: Optional[Device] = None,
dtype: Optional[DType] = None,
) -> torch.Tensor:
"""
Make a hypergrid for the MAPElites algorithm.
The [MAPElites][evotorch.algorithms.mapelites.MAPElites] organizes its
population not only according to the fitness, but also according to the
additional evaluation data which are interpreted as the additional features
of the solutions. To organize the current population according to these
features, [MAPElites][evotorch.algorithms.mapelites.MAPElites] requires "cells",
each cell having a lower and an upper bound for each feature.
`make_feature_grid(...)` is a helper function which generates the
required hypergrid of features in such a way that each cell, for each
feature, has the same interval.
The result of this function is a PyTorch tensor, which can be passed to
the `feature_grid` keyword argument of
[MAPElites][evotorch.algorithms.mapelites.MAPElites].
Args:
lower_bounds: The lower bounds, as a 1-dimensional sequence of numbers.
The length of this sequence must be equal to the number of
features, and the i-th element must express the lower bound
of the i-th feature.
upper_bounds: The upper bounds, as a 1-dimensional sequence of numbers.
The length of this sequence must be equal to the number of
features, and the i-th element must express the upper bound
of the i-th feature.
num_bins: Can be given as an integer or as a sequence of integers.
If given as an integer `n`, then there will be `n` bins for each
feature on the hypergrid. If given as a sequence of integers,
then the i-th element of the sequence will express the number of
bins for the i-th feature.
Returns:
The hypergrid, as a PyTorch tensor.
"""
cast_args = {}
if device is not None:
cast_args["device"] = torch.device(device)
if dtype is not None:
cast_args["dtype"] = to_torch_dtype(dtype)
has_casting = len(cast_args) > 0
if has_casting:
lower_bounds = torch.as_tensor(lower_bounds, **cast_args)
upper_bounds = torch.as_tensor(upper_bounds, **cast_args)
if (not isinstance(lower_bounds, torch.Tensor)) or (not isinstance(upper_bounds, torch.Tensor)):
raise TypeError(
f"While preparing the map elites hypergrid with device={device} and dtype={dtype},"
f"`lower_bounds` and `upper_bounds` were expected as tensors, but their types are different."
f" The type of `lower_bounds` is {type(lower_bounds)}."
f" The type of `upper_bounds` is {type(upper_bounds)}."
)
if lower_bounds.device != upper_bounds.device:
raise ValueError(
f"Cannot determine on which device to place the map elites grid,"
f" because `lower_bounds` and `upper_bounds` are on different devices."
f" The device of `lower_bounds` is {lower_bounds.device}."
f" The device of `upper_bounds` is {upper_bounds.device}."
)
if lower_bounds.dtype != upper_bounds.dtype:
raise ValueError(
f"Cannot determine the dtype of the map elites grid,"
f" because `lower_bounds` and `upper_bounds` have different dtypes."
f" The dtype of `lower_bounds` is {lower_bounds.dtype}."
f" The dtype of `upper_bounds` is {upper_bounds.dtype}."
)
if lower_bounds.size() != upper_bounds.size():
raise ValueError("`lower_bounds` and `upper_bounds` have incompatible shapes")
if lower_bounds.dim() != 1:
raise ValueError("Only 1D tensors are supported for `lower_bounds` and for `upper_bounds`")
dtype = lower_bounds.dtype
device = lower_bounds.device
num_bins = torch.as_tensor(num_bins, dtype=torch.int64, device=device)
if num_bins.dim() == 0:
num_bins = num_bins.expand(lower_bounds.size())
p_inf = torch.tensor([float("inf")], dtype=dtype, device=device)
n_inf = torch.tensor([float("-inf")], dtype=dtype, device=device)
def _make_feature_grid(lb, ub, num_bins):
sp = torch.linspace(lb, ub, num_bins - 1, device=device)
sp = torch.cat((n_inf, sp, p_inf))
return sp.unfold(dimension=0, size=2, step=1).unsqueeze(1)
f_grids = [_make_feature_grid(*bounds) for bounds in zip(lower_bounds, upper_bounds, num_bins)]
return torch.stack([torch.cat(c) for c in itertools.product(*f_grids)])
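Following the unfolding logic in the source above, each feature with `n` bins contributes `n` (lower, upper) pairs (whose outermost bins extend to -inf and +inf), and the Cartesian product over the features yields one row per cell. The resulting shape is therefore `(prod(num_bins), num_features, 2)`. A quick sketch (note that, per the type checks above, the bounds must either be given as tensors or be accompanied by a `dtype`/`device`):
```python
import torch
from evotorch.algorithms import MAPElites

hypergrid = MAPElites.make_feature_grid(
    lower_bounds=torch.tensor([0.0, 0.0]),
    upper_bounds=torch.tensor([1.0, 1.0]),
    num_bins=[10, 5],
)
print(hypergrid.shape)  # torch.Size([50, 2, 2]): 10 * 5 cells, 2 features, (lower, upper)
```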
pycmaes
¶
This namespace contains the PyCMAES class, which is a wrapper for the CMA-ES implementation of the `cma` package.
PyCMAES (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
This is an interface class to the CMAES implementation within the `cma` package, developed within the GitHub repository CMA-ES/pycma.
References:
Nikolaus Hansen, Youhei Akimoto, and Petr Baudis.
CMA-ES/pycma on Github. Zenodo, DOI:10.5281/zenodo.2559634,
February 2019.
<https://github.com/CMA-ES/pycma>
Nikolaus Hansen, Andreas Ostermeier (2001).
Completely Derandomized Self-Adaptation in Evolution Strategies.
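For context, a minimal usage sketch (an assumption-laden example, not from the official documentation: `problem` is assumed to be an already-constructed single-objective numeric Problem, and the `cma` package must be installed):
```python
from evotorch.algorithms import PyCMAES

searcher = PyCMAES(
    problem,
    stdev_init=0.5,  # passed to the cma solver as sigma0
    popsize=None,  # let the cma solver pick its default population size
    cma_options={"verbose": -9},  # any further pycma settings go through this dict
)
searcher.run(100)
```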
Source code in evotorch/algorithms/pycmaes.py
class PyCMAES(SearchAlgorithm, SinglePopulationAlgorithmMixin):
"""
PyCMAES: Covariance Matrix Adaptation Evolution Strategy.
This is an interface class to the CMAES implementation
within the `cma` package, developed within the GitHub repository
CMA-ES/pycma.
References:
Nikolaus Hansen, Youhei Akimoto, and Petr Baudis.
CMA-ES/pycma on Github. Zenodo, DOI:10.5281/zenodo.2559634,
February 2019.
<https://github.com/CMA-ES/pycma>
Nikolaus Hansen, Andreas Ostermeier (2001).
Completely Derandomized Self-Adaptation in Evolution Strategies.
"""
def __init__(
self,
problem: Problem,
*,
stdev_init: RealOrVector, # sigma0
popsize: Optional[int] = None, # popsize
center_init: Optional[Vector] = None, # x0
center_learning_rate: Optional[float] = None, # CMA_cmean
cov_learning_rate: Optional[float] = None, # CMA_on
rankmu_learning_rate: Optional[float] = None, # CMA_rankmu
rankone_learning_rate: Optional[float] = None, # CMA_rankone
stdev_min: Optional[Union[float, np.ndarray]] = None, # minstd
stdev_max: Optional[Union[float, np.ndarray]] = None, # maxstd
separable: bool = False, # CMA_diagonal
obj_index: Optional[int] = None,
cma_options: dict = {},
):
"""
`__init__(...)`: Initialize the PyCMAES solver.
Args:
problem: The problem object which is being worked on.
stdev_init: Initial standard deviation as a scalar or
as a 1-dimensional array.
popsize: Population size. Can be specified as an int,
or can be left as None to let the CMAES solver
decide the population size according to the length
of a solution.
center_init: Initial center point of the search distribution.
Can be given as a Solution or as a 1-D array.
If left as None, an initial center point is generated
with the help of the problem object's `generate_values(...)`
method.
center_learning_rate: Learning rate for updating the mean
of the search distribution. Leaving this as None
means that the CMAES solver is to use its own default,
which is documented as 1.0.
cov_learning_rate: Learning rate for updating the covariance
matrix of the search distribution. This hyperparameter
acts as a common multiplier for rank_one update and rank_mu
update of the covariance matrix. Leaving this as None
means that the CMAES solver is to use its own default,
which is documented as 1.0.
rankmu_learning_rate: Learning rate for the rank_mu update
of the covariance matrix of the search distribution.
Leaving this as None means that the CMAES solver is to use
its own default, which is documented as 1.0.
rankone_learning_rate: Learning rate for the rank_one update
of the covariance matrix of the search distribution.
Leaving this as None means that the CMAES solver is to use
its own default, which is documented as 1.0.
stdev_min: Minimum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None, as a scalar, or as a 1-dimensional
array.
stdev_max: Maximum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None, as a scalar, or as a 1-dimensional
array.
separable: Provide this as True if you would like the problem
to be treated as a separable one. Treating a problem
as separable means to adapt only the diagonal parts
of the covariance matrix and to keep the non-diagonal
parts 0. High dimensional problems result in large
covariance matrices on which operating is computationally
expensive. Therefore, for such high dimensional problems,
setting `separable` as True might be useful.
If, instead, you would like to configure on which
iterations the diagonal parts of the covariance matrix
are to be adapted, then it is recommended to leave
`separable` as False and set a new value for the key
"CMA_diagonal" via `cma_options` (see the official
documentation of pycma for details regarding the
"CMA_diagonal" setting).
obj_index: Objective index according to which evaluation
of the solution will be done.
cma_options: Any other configuration for the CMAES solver
can be passed via the cma_options dictionary.
"""
# Make sure that the cma module is installed
if cma is None:
raise ImportError(f"The class {type(self).__name__} is only available if the package `cma` is installed.")
# Initialize the base class
SearchAlgorithm.__init__(self, problem, center=self._get_center)
# Ensure that the problem is numeric
problem.ensure_numeric()
# Store the objective index
self._obj_index = problem.normalize_obj_index(obj_index)
# If `center_init` is not given, generate an initial solution
# with the help of the problem object.
# Otherwise, use the given initial solution as the starting
# point in the search space.
if center_init is None:
x0 = self._problem.generate_values(1).to("cpu").view(-1).numpy().astype(dtype=float)
else:
x0 = numpy_copy(center_init, dtype=float)
# Store the initial standard deviations
sigma0 = numpy_copy(stdev_init, dtype=float)
# Generate an options dictionary to pass to the cma solver.
inopts = {}
for k, v in cma_options.items():
if isinstance(v, torch.Tensor):
v = numpy_copy(v, dtype=float)
inopts[k] = v
# Remove the number of iterations boundary
if "maxiter" not in inopts:
inopts["maxiter"] = np.inf
# Below is a temporary helper function for safely storing the configuration items.
# This inner function updates the `inopts` variable.
def store_opt(key: str, long_name: str, value: Any, converter: Callable):
# Here, `key` represents the configuration key used by pycma
# `long_name` represents the configuration's long name used by this class
# `value` is the configuration value associated with `key`.
# Declare that this inner function accesses the `inopts` variable.
nonlocal inopts
if value is None:
# If the provided `value` is None, then there is no configuration to store.
# So, we just leave this inner function.
return
if key in inopts:
# If the given `key` already exists within `inopts`, this means that the configuration was specified
# twice: via the `cma_options` dictionary AND via a dedicated keyword argument.
# We raise an error and inform the user about this redundancy.
raise ValueError(
f"The configuration {repr(key)} was redundantly provided"
f" both via the initialization argument {long_name}"
f" and via the cma_options dictionary."
f" {long_name}={repr(value)};"
f" cma_options[{repr(key)}]={repr(inopts[key])}."
)
inopts[key] = converter(value)
# Temporary helper function which makes sure that `x` is a numpy array or a float.
def array_or_float(x):
if is_sequence(x):
return numpy_copy(x)
else:
return float(x)
# Store the cma configuration received through the initialization arguments (and raise error if there is
# redundancy with the cma_options dictionary).
store_opt("popsize", "popsize", popsize, int)
store_opt("CMA_cmean", "center_learning_rate", center_learning_rate, float)
store_opt("CMA_on", "cov_learning_rate", cov_learning_rate, float)
store_opt("CMA_rankmu", "rankmu_learning_rate", rankmu_learning_rate, float)
store_opt("CMA_rankone", "rankone_learning_rate", rankone_learning_rate, float)
store_opt("minstd", "stdev_min", stdev_min, array_or_float)
store_opt("maxstd", "stdev_max", stdev_max, array_or_float)
if separable:
store_opt("CMA_diagonal", "separable", separable, bool)
# If the problem defines lower and upper bounds, pass these into the options dict.
def process_bounds(bounds: RealOrVector) -> np.ndarray:
if bounds is None:
return None
else:
if is_sequence(bounds):
bounds = numpy_copy(bounds)
else:
bounds = np.array(float(bounds)).repeat(self._problem.solution_length)
return bounds
lb = process_bounds(self._problem.lower_bounds)
ub = process_bounds(self._problem.upper_bounds)
register_bounds = False
if lb is not None and ub is None:
ub = np.array(np.inf).repeat(self._problem.solution_length)
register_bounds = True
elif lb is None and ub is not None:
lb = np.array(-(np.inf)).repeat(self._problem.solution_length)
register_bounds = True
elif lb is not None and ub is not None:
register_bounds = True
if register_bounds:
inopts["bounds"] = [lb, ub]
# Generate a random seed using the problem object for the sake of reproducibility.
if "seed" not in inopts:
inopts["seed"] = int(self._problem.make_randint(tuple(), n=(2**32) - 100) + 100)
# Instantiate the CMAEvolutionStrategy with the prepared configuration items.
self._es = cma.CMAEvolutionStrategy(x0, sigma0, inopts)
# Initialize the population.
self._population: SolutionBatch = self._problem.generate_batch(self._es.popsize, empty=True)
# Use the SinglePopulationAlgorithmMixin to enable additional status reports regarding the population.
SinglePopulationAlgorithmMixin.__init__(self)
@property
def population(self) -> SolutionBatch:
"""Population generated by the CMA-ES algorithm"""
return self._population
def _step(self):
"""Perform a step of the CMA-ES solver"""
asked = self._es.ask()
self._population.access_values()[:] = torch.as_tensor(
np.asarray(asked), dtype=self._problem.dtype, device=self._population.device
)
self._problem.evaluate(self._population)
scores = numpy_copy(self._population.utility(self._obj_index), dtype=float)
self._es.tell(asked, -1.0 * scores)
def _get_center(self) -> torch.Tensor:
return torch.as_tensor(self._es.result[5], dtype=self._population.dtype, device=self._population.device)
@property
def obj_index(self) -> int:
"""Index of the objective being focused on"""
return self._obj_index
obj_index: int
property
readonly
¶
Index of the objective being focused on
population: SolutionBatch
property
readonly
¶
Population generated by the CMA-ES algorithm
__init__(self, problem, *, stdev_init, popsize=None, center_init=None, center_learning_rate=None, cov_learning_rate=None, rankmu_learning_rate=None, rankone_learning_rate=None, stdev_min=None, stdev_max=None, separable=False, obj_index=None, cma_options={})
special
¶
__init__(...)
: Initialize the PyCMAES solver.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem | Problem | The problem object which is being worked on. | required |
stdev_init | Union[float, Iterable[float], torch.Tensor] | Initial standard deviation as a scalar or as a 1-dimensional array. | required |
popsize | Optional[int] | Population size. Can be specified as an int, or can be left as None to let the CMAES solver decide the population size according to the length of a solution. | None |
center_init | Union[Iterable[float], torch.Tensor] | Initial center point of the search distribution. Can be given as a Solution or as a 1-D array. If left as None, an initial center point is generated with the help of the problem object's `generate_values(...)` method. | None |
center_learning_rate | Optional[float] | Learning rate for updating the mean of the search distribution. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. | None |
cov_learning_rate | Optional[float] | Learning rate for updating the covariance matrix of the search distribution. This hyperparameter acts as a common multiplier for rank_one update and rank_mu update of the covariance matrix. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. | None |
rankmu_learning_rate | Optional[float] | Learning rate for the rank_mu update of the covariance matrix of the search distribution. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. | None |
rankone_learning_rate | Optional[float] | Learning rate for the rank_one update of the covariance matrix of the search distribution. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. | None |
stdev_min | Union[float, numpy.ndarray] | Minimum allowed standard deviation of the search distribution. Leaving this as None means that no such boundary is to be used. Can be given as None, as a scalar, or as a 1-dimensional array. | None |
stdev_max | Union[float, numpy.ndarray] | Maximum allowed standard deviation of the search distribution. Leaving this as None means that no such boundary is to be used. Can be given as None, as a scalar, or as a 1-dimensional array. | None |
separable | bool | Provide this as True if you would like the problem to be treated as a separable one. Treating a problem as separable means to adapt only the diagonal parts of the covariance matrix and to keep the non-diagonal parts 0. High dimensional problems result in large covariance matrices on which operating is computationally expensive. Therefore, for such high dimensional problems, setting `separable` as True might be useful. If, instead, you would like to configure on which iterations the diagonal parts of the covariance matrix are to be adapted, then it is recommended to leave `separable` as False and set a new value for the key "CMA_diagonal" via `cma_options`. | False |
obj_index | Optional[int] | Objective index according to which evaluation of the solution will be done. | None |
cma_options | dict | Any other configuration for the CMAES solver can be passed via the cma_options dictionary. | {} |
Source code in evotorch/algorithms/pycmaes.py
def __init__(
self,
problem: Problem,
*,
stdev_init: RealOrVector, # sigma0
popsize: Optional[int] = None, # popsize
center_init: Optional[Vector] = None, # x0
center_learning_rate: Optional[float] = None, # CMA_cmean
cov_learning_rate: Optional[float] = None, # CMA_on
rankmu_learning_rate: Optional[float] = None, # CMA_rankmu
rankone_learning_rate: Optional[float] = None, # CMA_rankone
stdev_min: Optional[Union[float, np.ndarray]] = None, # minstd
stdev_max: Optional[Union[float, np.ndarray]] = None, # maxstd
separable: bool = False, # CMA_diagonal
obj_index: Optional[int] = None,
cma_options: dict = {},
):
"""
`__init__(...)`: Initialize the PyCMAES solver.
Args:
problem: The problem object which is being worked on.
stdev_init: Initial standard deviation as a scalar or
as a 1-dimensional array.
popsize: Population size. Can be specified as an int,
or can be left as None to let the CMAES solver
decide the population size according to the length
of a solution.
center_init: Initial center point of the search distribution.
Can be given as a Solution or as a 1-D array.
If left as None, an initial center point is generated
with the help of the problem object's `generate_values(...)`
method.
center_learning_rate: Learning rate for updating the mean
of the search distribution. Leaving this as None
means that the CMAES solver is to use its own default,
which is documented as 1.0.
cov_learning_rate: Learning rate for updating the covariance
matrix of the search distribution. This hyperparameter
acts as a common multiplier for rank_one update and rank_mu
update of the covariance matrix. Leaving this as None
means that the CMAES solver is to use its own default,
which is documented as 1.0.
rankmu_learning_rate: Learning rate for the rank_mu update
of the covariance matrix of the search distribution.
Leaving this as None means that the CMAES solver is to use
its own default, which is documented as 1.0.
rankone_learning_rate: Learning rate for the rank_one update
of the covariance matrix of the search distribution.
Leaving this as None means that the CMAES solver is to use
its own default, which is documented as 1.0.
stdev_min: Minimum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None, as a scalar, or as a 1-dimensional
array.
stdev_max: Maximum allowed standard deviation of the search
distribution. Leaving this as None means that no such
boundary is to be used.
Can be given as None, as a scalar, or as a 1-dimensional
array.
separable: Provide this as True if you would like the problem
to be treated as a separable one. Treating a problem
as separable means to adapt only the diagonal parts
of the covariance matrix and to keep the non-diagonal
parts 0. High dimensional problems result in large
covariance matrices on which operating is computationally
expensive. Therefore, for such high dimensional problems,
setting `separable` as True might be useful.
If, instead, you would like to configure on which
iterations the diagonal parts of the covariance matrix
are to be adapted, then it is recommended to leave
`separable` as False and set a new value for the key
"CMA_diagonal" via `cma_options` (see the official
documentation of pycma for details regarding the
"CMA_diagonal" setting).
obj_index: Objective index according to which evaluation
of the solution will be done.
cma_options: Any other configuration for the CMAES solver
can be passed via the cma_options dictionary.
"""
# Make sure that the cma module is installed
if cma is None:
raise ImportError(f"The class {type(self).__name__} is only available if the package `cma` is installed.")
# Initialize the base class
SearchAlgorithm.__init__(self, problem, center=self._get_center)
# Ensure that the problem is numeric
problem.ensure_numeric()
# Store the objective index
self._obj_index = problem.normalize_obj_index(obj_index)
# If `center_init` is not given, generate an initial solution
# with the help of the problem object.
# Otherwise, use the given initial solution as the starting
# point in the search space.
if center_init is None:
x0 = self._problem.generate_values(1).to("cpu").view(-1).numpy().astype(dtype=float)
else:
x0 = numpy_copy(center_init, dtype=float)
# Store the initial standard deviations
sigma0 = numpy_copy(stdev_init, dtype=float)
# Generate an options dictionary to pass to the cma solver.
inopts = {}
for k, v in cma_options.items():
if isinstance(v, torch.Tensor):
v = numpy_copy(v, dtype=float)
inopts[k] = v
# Remove the number of iterations boundary
if "maxiter" not in inopts:
inopts["maxiter"] = np.inf
# Below is a temporary helper function for safely storing the configuration items.
# This inner function updates the `inopts` variable.
def store_opt(key: str, long_name: str, value: Any, converter: Callable):
# Here, `key` represents the configuration key used by pycma
# `long_name` represents the configuration's long name used by this class
# `value` is the configuration value associated with `key`.
# Declare that this inner function accesses the `inopts` variable.
nonlocal inopts
if value is None:
# If the provided `value` is None, then there is no configuration to store.
# So, we just leave this inner function.
return
if key in inopts:
# If the given `key` already exists within `inopts`, this means that the configuration was specified
# twice: via the `cma_options` dictionary AND via a dedicated keyword argument.
# We raise an error and inform the user about this redundancy.
raise ValueError(
f"The configuration {repr(key)} was redundantly provided"
f" both via the initialization argument {long_name}"
f" and via the cma_options dictionary."
f" {long_name}={repr(value)};"
f" cma_options[{repr(key)}]={repr(inopts[key])}."
)
inopts[key] = converter(value)
# Temporary helper function which makes sure that `x` is a numpy array or a float.
def array_or_float(x):
if is_sequence(x):
return numpy_copy(x)
else:
return float(x)
# Store the cma configuration received through the initialization arguments (and raise error if there is
# redundancy with the cma_options dictionary).
store_opt("popsize", "popsize", popsize, int)
store_opt("CMA_cmean", "center_learning_rate", center_learning_rate, float)
store_opt("CMA_on", "cov_learning_rate", cov_learning_rate, float)
store_opt("CMA_rankmu", "rankmu_learning_rate", rankmu_learning_rate, float)
store_opt("CMA_rankone", "rankone_learning_rate", rankone_learning_rate, float)
store_opt("minstd", "stdev_min", stdev_min, array_or_float)
store_opt("maxstd", "stdev_max", stdev_max, array_or_float)
if separable:
store_opt("CMA_diagonal", "separable", separable, bool)
# If the problem defines lower and upper bounds, pass these into the options dict.
def process_bounds(bounds: RealOrVector) -> np.ndarray:
if bounds is None:
return None
else:
if is_sequence(bounds):
bounds = numpy_copy(bounds)
else:
bounds = np.array(float(bounds)).repeat(self._problem.solution_length)
return bounds
lb = process_bounds(self._problem.lower_bounds)
ub = process_bounds(self._problem.upper_bounds)
register_bounds = False
if lb is not None and ub is None:
ub = np.array(np.inf).repeat(self._problem.solution_length)
register_bounds = True
elif lb is None and ub is not None:
lb = np.array(-(np.inf)).repeat(self._problem.solution_length)
register_bounds = True
elif lb is not None and ub is not None:
register_bounds = True
if register_bounds:
inopts["bounds"] = [lb, ub]
# Generate a random seed using the problem object for the sake of reproducibility.
if "seed" not in inopts:
inopts["seed"] = int(self._problem.make_randint(tuple(), n=(2**32) - 100) + 100)
# Instantiate the CMAEvolutionStrategy with the prepared configuration items.
self._es = cma.CMAEvolutionStrategy(x0, sigma0, inopts)
# Initialize the population.
self._population: SolutionBatch = self._problem.generate_batch(self._es.popsize, empty=True)
# Use the SinglePopulationAlgorithmMixin to enable additional status reports regarding the population.
SinglePopulationAlgorithmMixin.__init__(self)
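For orientation, below is a minimal usage sketch (not taken from the source above). The sphere fitness function and the Problem configuration are illustrative assumptions, and the `cma` package must be installed for this class to be usable.
```python
import torch

from evotorch import Problem
from evotorch.algorithms import PyCMAES


# A toy fitness function: the sphere benchmark, to be minimized.
def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2.0)


# Illustrative problem configuration.
problem = Problem("min", sphere, solution_length=10, initial_bounds=(-1.0, 1.0))

# Hyperparameters left unspecified here (e.g. the learning rates) fall back
# to the defaults of the underlying `cma` package.
searcher = PyCMAES(problem, stdev_init=0.5, popsize=20)
searcher.run(100)

print(searcher.status["pop_best_eval"])
```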
restarter
special
¶
This namespace contains the implementations of various restart mechanisms.
modify_restart
¶
IPOP (ModifyingRestart)
¶
Source code in evotorch/algorithms/restarter/modify_restart.py
class IPOP(ModifyingRestart):
def __init__(
self,
problem: Problem,
algorithm_class: Type[SearchAlgorithm],
algorithm_args: dict = {},
min_fitness_stdev: float = 1e-9,
popsize_multiplier: float = 2,
):
"""IPOP restart, terminates algorithm when minimum standard deviation in fitness values is hit, multiplies the population size in that case
References:
Glasmachers, Tobias, and Oswin Krause.
"The hessian estimation evolution strategy."
PPSN 2020
Args:
problem (Problem): A Problem to solve
algorithm_class (Type[SearchAlgorithm]): The class of the search algorithm to restart
algorithm_args (dict): Arguments to pass to the search algorithm on restart
min_fitness_stdev (float): The minimum standard deviation in fitnesses; going below this will trigger a restart
popsize_multiplier (float): A multiplier on the population size within algorithm_args
"""
super().__init__(problem, algorithm_class, algorithm_args)
self.min_fitness_stdev = min_fitness_stdev
self.popsize_multiplier = popsize_multiplier
def _search_algorithm_terminated(self) -> bool:
# Additional check on standard deviation of fitnesses of population
if self.search_algorithm.population.evals.std() < self.min_fitness_stdev:
return True
return super()._search_algorithm_terminated()
def _modify_algorithm_args(self) -> None:
# Only modify arguments if this isn't the first restart
if self.num_restarts >= 1:
new_args = deepcopy(self._algorithm_args)
# Multiply population size
new_args["popsize"] = int(self.popsize_multiplier * len(self.search_algorithm.population))
self._algorithm_args = new_args
__init__(self, problem, algorithm_class, algorithm_args={}, min_fitness_stdev=1e-09, popsize_multiplier=2)
special
¶
IPOP restart: terminates the algorithm when the standard deviation of the fitness values falls below a minimum threshold, and multiplies the population size when restarting.
References
Glasmachers, Tobias, and Oswin Krause. "The Hessian Estimation Evolution Strategy." PPSN 2020
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem | Problem | A Problem to solve | required |
algorithm_class | Type[SearchAlgorithm] | The class of the search algorithm to restart | required |
algorithm_args | dict | Arguments to pass to the search algorithm on restart | {} |
min_fitness_stdev | float | The minimum standard deviation in fitnesses; going below this will trigger a restart | 1e-09 |
popsize_multiplier | float | A multiplier on the population size within algorithm_args | 2 |
Source code in evotorch/algorithms/restarter/modify_restart.py
def __init__(
self,
problem: Problem,
algorithm_class: Type[SearchAlgorithm],
algorithm_args: dict = {},
min_fitness_stdev: float = 1e-9,
popsize_multiplier: float = 2,
):
"""IPOP restart, terminates algorithm when minimum standard deviation in fitness values is hit, multiplies the population size in that case
References:
Glasmachers, Tobias, and Oswin Krause.
"The hessian estimation evolution strategy."
PPSN 2020
Args:
problem (Problem): A Problem to solve
algorithm_class (Type[SearchAlgorithm]): The class of the search algorithm to restart
algorithm_args (dict): Arguments to pass to the search algorithm on restart
min_fitness_stdev (float): The minimum standard deviation in fitnesses; going below this will trigger a restart
popsize_multiplier (float): A multiplier on the population size within algorithm_args
"""
super().__init__(problem, algorithm_class, algorithm_args)
self.min_fitness_stdev = min_fitness_stdev
self.popsize_multiplier = popsize_multiplier
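A brief usage sketch follows. It assumes that IPOP is exported by the `evotorch.algorithms.restarter` namespace documented here, wraps the CMAES class documented earlier on this page, and uses an illustrative Rastrigin-style fitness function.
```python
import math

import torch

from evotorch import Problem
from evotorch.algorithms import CMAES
from evotorch.algorithms.restarter import IPOP  # assumed export of the namespace above


# An illustrative multimodal fitness function, to be minimized.
def rastrigin(x: torch.Tensor) -> torch.Tensor:
    return 10 * x.shape[-1] + torch.sum(x**2 - 10 * torch.cos(2 * math.pi * x))


problem = Problem("min", rastrigin, solution_length=5, initial_bounds=(-5.12, 5.12))

# Whenever the wrapped CMAES stagnates (fitness stdev below min_fitness_stdev),
# IPOP re-instantiates it with the population size multiplied by popsize_multiplier.
searcher = IPOP(
    problem,
    algorithm_class=CMAES,
    algorithm_args={"stdev_init": 1.0, "popsize": 10},
    min_fitness_stdev=1e-6,
    popsize_multiplier=2,
)
searcher.run(200)
print(searcher.status["num_restarts"])
```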
restart
¶
Restart (SearchAlgorithm)
¶
Source code in evotorch/algorithms/restarter/restart.py
class Restart(SearchAlgorithm):
def __init__(
self,
problem: Problem,
algorithm_class: Type[SearchAlgorithm],
algorithm_args: dict = {},
**kwargs: Any,
):
"""Base class for independent restarts methods
Args:
problem (Problem): A Problem to solve
algorithm_class (Type[SearchAlgorithm]): The class of the search algorithm to restart
algorithm_args (dict): Arguments to pass to the search algorithm on restart
"""
SearchAlgorithm.__init__(
self,
problem,
search_algorithm=self._get_sa_status,
num_restarts=self._get_num_restarts,
algorithm_terminated=self._search_algorithm_terminated,
**kwargs,
)
self._algorithm_class = algorithm_class
self._algorithm_args = algorithm_args
self.num_restarts = 0
self._restart()
def _get_sa_status(self) -> dict:
"""Status dictionary of search algorithm"""
return self.search_algorithm.status
def _get_num_restarts(self) -> int:
"""Number of restarts (including the first start) so far"""
return self.num_restarts
def _restart(self) -> None:
"""Restart the search algorithm"""
self.search_algorithm = self._algorithm_class(self._problem, **self._algorithm_args)
self.num_restarts += 1
def _search_algorithm_terminated(self) -> bool:
"""Boolean flag for search algorithm terminated"""
return self.search_algorithm.is_terminated
def _step(self):
# Step the search algorithm
self.search_algorithm.step()
# If stepping the search algorithm has reached a terminal state, restart the search algorithm
if self._search_algorithm_terminated():
self._restart()
__init__(self, problem, algorithm_class, algorithm_args={}, **kwargs)
special
¶
Base class for independent restarts methods
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem | Problem | A Problem to solve | required |
algorithm_class | Type[SearchAlgorithm] | The class of the search algorithm to restart | required |
algorithm_args | dict | Arguments to pass to the search algorithm on restart | {} |
Source code in evotorch/algorithms/restarter/restart.py
def __init__(
self,
problem: Problem,
algorithm_class: Type[SearchAlgorithm],
algorithm_args: dict = {},
**kwargs: Any,
):
"""Base class for independent restarts methods
Args:
problem (Problem): A Problem to solve
algorithm_class (Type[SearchAlgorithm]): The class of the search algorithm to restart
algorithm_args (dict): Arguments to pass to the search algorithm on restart
"""
SearchAlgorithm.__init__(
self,
problem,
search_algorithm=self._get_sa_status,
num_restarts=self._get_num_restarts,
algorithm_terminated=self._search_algorithm_terminated,
**kwargs,
)
self._algorithm_class = algorithm_class
self._algorithm_args = algorithm_args
self.num_restarts = 0
self._restart()
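As a sketch of how this base class might be extended, the hypothetical subclass below declares the wrapped algorithm terminated after a fixed number of its own generations. The name `MaxIterRestart` and the `max_iters` argument are illustrative, not part of the library.
```python
from typing import Type

from evotorch import Problem
from evotorch.algorithms.restarter.restart import Restart  # assumed import path
from evotorch.algorithms.searchalgorithm import SearchAlgorithm


class MaxIterRestart(Restart):
    """Hypothetical restarter: restart the wrapped algorithm every max_iters generations."""

    def __init__(
        self,
        problem: Problem,
        algorithm_class: Type[SearchAlgorithm],
        algorithm_args: dict = {},
        max_iters: int = 50,
    ):
        # Set the threshold before the base class triggers the first start.
        self._max_iters = max_iters
        super().__init__(problem, algorithm_class, algorithm_args)

    def _search_algorithm_terminated(self) -> bool:
        # Consider the inner algorithm terminated after max_iters of its own steps,
        # in addition to the base class's termination condition.
        if self.search_algorithm.step_count >= self._max_iters:
            return True
        return super()._search_algorithm_terminated()
```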
searchalgorithm
¶
This namespace contains `SearchAlgorithm`, the base class for all
evolutionary algorithms.
LazyReporter
¶
This class provides an interface for storing and reporting status. This class is designed to be inherited by other classes.
Let us assume that we have the following class inheriting from LazyReporter:
class Example(LazyReporter):
def __init__(self):
LazyReporter.__init__(self, a=self._get_a, b=self._get_b)
def _get_a(self):
return ... # return the status 'a'
def _get_b(self):
return ... # return the status 'b'
At its initialization phase, this Example class registers its methods `_get_a` and `_get_b` as its status providers.
Having the LazyReporter interface, the Example class gains a `status` property:
ex = Example()
print(ex.status["a"]) # Get the status 'a'
print(ex.status["b"]) # Get the status 'b'
Once a status is queried, its computation result is stored to be re-used later. After running the code above, if we query the status 'a' again:
print(ex.status["a"])  # Getting the status 'a' again
then the status 'a' is not computed again (i.e. `_get_a` is not called again). Instead, the stored status value of 'a' is re-used.
To force re-computation of the status values, one can execute:
ex.clear_status()
Or the Example instance can clear its status from within one of its methods:
class Example(LazyReporter):
    ...

    def some_method(self):
        ...
        self.clear_status()
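The self-contained sketch below makes this caching behavior observable by counting how many times the getter actually runs; the `CountingExample` class is illustrative, not part of the library.
```python
from evotorch.algorithms.searchalgorithm import LazyReporter


class CountingExample(LazyReporter):
    def __init__(self):
        LazyReporter.__init__(self, a=self._get_a)
        self.num_computations = 0

    def _get_a(self):
        # Count how many times the status 'a' is actually computed.
        self.num_computations += 1
        return 42


ex = CountingExample()
print(ex.status["a"], ex.num_computations)  # 42 1
print(ex.status["a"], ex.num_computations)  # 42 1  (cached: _get_a not called again)
ex.clear_status()
print(ex.status["a"], ex.num_computations)  # 42 2  (recomputed after clear_status)
```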
Source code in evotorch/algorithms/searchalgorithm.py
class LazyReporter:
"""
This class provides an interface for storing and reporting status.
This class is designed to be inherited by other classes.
Let us assume that we have the following class inheriting from
LazyReporter:
```python
class Example(LazyReporter):
def __init__(self):
LazyReporter.__init__(self, a=self._get_a, b=self._get_b)
def _get_a(self):
return ... # return the status 'a'
def _get_b(self):
return ... # return the status 'b'
```
At its initialization phase, this Example class registers its methods
``_get_a`` and ``_get_b`` as its status providers.
Having the LazyReporter interface, the Example class gains a ``status``
property:
```python
ex = Example()
print(ex.status["a"]) # Get the status 'a'
print(ex.status["b"]) # Get the status 'b'
```
Once a status is queried, its computation result is stored to be re-used
later. After running the code above, if we query the status 'a' again:
```python
print(ex.status["a"]) # Getting the status 'a' again
```
then the status 'a' is not computed again (i.e. ``_get_a`` is not
called again). Instead, the stored status value of 'a' is re-used.
To force re-computation of the status values, one can execute:
```python
ex.clear_status()
```
Or the Example instance can clear its status from within one of its
methods:
```python
class Example(LazyReporter):
...
def some_method(self):
...
self.clear_status()
```
"""
@staticmethod
def _missing_status_producer():
return None
def __init__(self, **kwargs):
"""
`__init__(...)`: Initialize the LazyReporter instance.
Args:
kwargs: Keyword arguments, mapping the status keys to the
methods or functions providing the status values.
"""
self.__getters = kwargs
self.__computed = {}
def get_status_value(self, key: Any) -> Any:
"""
Get the specified status value.
Args:
key: The key (i.e. the name) of the status variable.
"""
if key not in self.__computed:
self.__computed[key] = self.__getters[key]()
return self.__computed[key]
def has_status_key(self, key: Any) -> bool:
"""
Return True if there is a status variable with the specified key.
Otherwise, return False.
Args:
key: The key (i.e. the name) of the status variable whose
existence is to be checked.
Returns:
True if there is such a key; False otherwise.
"""
return key in self.__getters
def iter_status_keys(self):
"""Iterate over the status keys."""
return self.__getters.keys()
def clear_status(self):
"""Clear all the stored values of the status variables."""
self.__computed.clear()
def is_status_computed(self, key) -> bool:
"""
Return True if the specified status has already been computed.
Return False otherwise.
Args:
key: The key (i.e. the name) of the status variable.
Returns:
True if the status of the given key is computed; False otherwise.
"""
return key in self.__computed
def update_status(self, additional_status: Mapping):
"""
Update the stored status with an external dict-like object.
The given dict-like object can override existing status keys
with new values, and also bring new keys to the status.
Args:
additional_status: A dict-like object storing the status update.
"""
for k, v in additional_status.items():
if k not in self.__getters:
self.__getters[k] = LazyReporter._missing_status_producer
self.__computed[k] = v
def add_status_getters(self, getters: Mapping):
"""
Register additional status-getting functions.
Args:
getters: A dictionary-like object where the keys are the
additional status variable names, and values are functions
which are expected to compute/retrieve the values for those
status variables.
"""
self.__getters.update(getters)
@property
def status(self) -> "LazyStatusDict":
"""Get a LazyStatusDict which is bound to this LazyReporter."""
return LazyStatusDict(self)
status: LazyStatusDict
property
readonly
¶
Get a LazyStatusDict which is bound to this LazyReporter.
__init__(self, **kwargs)
special
¶
__init__(...)
: Initialize the LazyReporter instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |  | Keyword arguments, mapping the status keys to the methods or functions providing the status values. | {} |
Source code in evotorch/algorithms/searchalgorithm.py
def __init__(self, **kwargs):
    """
    `__init__(...)`: Initialize the LazyReporter instance.
    Args:
        kwargs: Keyword arguments, mapping the status keys to the
            methods or functions providing the status values.
    """
    self.__getters = kwargs
    self.__computed = {}
add_status_getters(self, getters)
¶
Register additional status-getting functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
getters | Mapping | A dictionary-like object where the keys are the additional status variable names, and values are functions which are expected to compute/retrieve the values for those status variables. | required |
Source code in evotorch/algorithms/searchalgorithm.py
def add_status_getters(self, getters: Mapping):
"""
Register additional status-getting functions.
Args:
getters: A dictionary-like object where the keys are the
additional status variable names, and values are functions
which are expected to compute/retrieve the values for those
status variables.
"""
self.__getters.update(getters)
clear_status(self)
¶
Clear all the stored values of the status variables.
Source code in evotorch/algorithms/searchalgorithm.py
def clear_status(self):
    """Clear all the stored values of the status variables."""
    self.__computed.clear()
get_status_value(self, key)
¶
Get the specified status value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | Any | The key (i.e. the name) of the status variable. | required |
Source code in evotorch/algorithms/searchalgorithm.py
def get_status_value(self, key: Any) -> Any:
    """
    Get the specified status value.
    Args:
        key: The key (i.e. the name) of the status variable.
    """
    if key not in self.__computed:
        self.__computed[key] = self.__getters[key]()
    return self.__computed[key]
has_status_key(self, key)
¶
Return True if there is a status variable with the specified key. Otherwise, return False.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key | Any | The key (i.e. the name) of the status variable whose existence is to be checked. | required |
Returns:
Type | Description |
---|---|
bool | True if there is such a key; False otherwise. |
Source code in evotorch/algorithms/searchalgorithm.py
def has_status_key(self, key: Any) -> bool:
"""
Return True if there is a status variable with the specified key.
Otherwise, return False.
Args:
key: The key (i.e. the name) of the status variable whose
existence is to be checked.
Returns:
True if there is such a key; False otherwise.
"""
return key in self.__getters
is_status_computed(self, key)
¶
Return True if the specified status has already been computed. Return False otherwise.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |  | The key (i.e. the name) of the status variable. | required |
Returns:
Type | Description |
---|---|
bool | True if the status of the given key is computed; False otherwise. |
Source code in evotorch/algorithms/searchalgorithm.py
def is_status_computed(self, key) -> bool:
    """
    Return True if the specified status has already been computed.
    Return False otherwise.
    Args:
        key: The key (i.e. the name) of the status variable.
    Returns:
        True if the status of the given key is computed; False otherwise.
    """
    return key in self.__computed
iter_status_keys(self)
¶
Iterate over the status keys.
Source code in evotorch/algorithms/searchalgorithm.py
def iter_status_keys(self):
    """Iterate over the status keys."""
    return self.__getters.keys()
update_status(self, additional_status)
¶
Update the stored status with an external dict-like object. The given dict-like object can override existing status keys with new values, and also bring new keys to the status.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
additional_status | Mapping | A dict-like object storing the status update. | required |
Source code in evotorch/algorithms/searchalgorithm.py
def update_status(self, additional_status: Mapping):
"""
Update the stored status with an external dict-like object.
The given dict-like object can override existing status keys
with new values, and also bring new keys to the status.
Args:
additional_status: A dict-like object storing the status update.
"""
for k, v in additional_status.items():
if k not in self.__getters:
self.__getters[k] = LazyReporter._missing_status_producer
self.__computed[k] = v
LazyStatusDict (Mapping)
¶
A Mapping subclass used by the `status` property of a `LazyReporter`.
The interface of this object is similar to a read-only dictionary.
Source code in evotorch/algorithms/searchalgorithm.py
class LazyStatusDict(Mapping):
"""
A Mapping subclass used by the `status` property of a `LazyReporter`.
The interface of this object is similar to a read-only dictionary.
"""
def __init__(self, lazy_reporter: LazyReporter):
"""
`__init__(...)`: Initialize the LazyStatusDict object.
Args:
lazy_reporter: The LazyReporter object whose status is to be
accessed.
"""
super().__init__()
self.__lazy_reporter = lazy_reporter
def __getitem__(self, key: Any) -> Any:
result = self.__lazy_reporter.get_status_value(key)
if isinstance(result, (torch.Tensor, ObjectArray)):
result = as_read_only_tensor(result)
return result
def __len__(self) -> int:
return len(list(self.__lazy_reporter.iter_status_keys()))
def __iter__(self):
for k in self.__lazy_reporter.iter_status_keys():
yield k
def __contains__(self, key: Any) -> bool:
return self.__lazy_reporter.has_status_key(key)
def _to_string(self) -> str:
with io.StringIO() as f:
print("<" + type(self).__name__, file=f)
for k in self.__lazy_reporter.iter_status_keys():
if self.__lazy_reporter.is_status_computed(k):
r = repr(self.__lazy_reporter.get_status_value(k))
else:
r = "<not yet computed>"
print(" ", k, "=", r, file=f)
print(">", end="", file=f)
f.seek(0)
entire_str = f.read()
return entire_str
def __str__(self) -> str:
return self._to_string()
def __repr__(self) -> str:
return self._to_string()
__init__(self, lazy_reporter)
special
¶
__init__(...)
: Initialize the LazyStatusDict object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lazy_reporter | LazyReporter | The LazyReporter object whose status is to be accessed. | required |
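A small illustration of the read-only mapping behavior, reusing the hypothetical `CountingExample` class from the LazyReporter section above:
```python
ex = CountingExample()  # the LazyReporter subclass sketched earlier
status = ex.status  # a LazyStatusDict bound to `ex`

print("a" in status)  # True: delegated to has_status_key
print(list(status))  # ['a']: iterates over the registered status keys
print(status)  # entries not queried yet are shown as '<not yet computed>'
print(status["a"])  # 42: computed on first access, then cached
```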
SearchAlgorithm (LazyReporter)
¶
Base class for all evolutionary search algorithms.
An algorithm developer is expected to inherit from this base class, and override the method named `_step()` to define how a single step of this new algorithm is performed.
For each core status dictionary element, a new method is expected to exist within the inheriting class. These status reporting methods are then registered via the keyword arguments of the `__init__(...)` method of `SearchAlgorithm`.
To sum up, a newly developed algorithm inheriting from this base class is expected to follow this structure:
from evotorch import Problem
class MyNewAlgorithm(SearchAlgorithm):
def __init__(self, problem: Problem):
SearchAlgorithm.__init__(
self, problem, status1=self._get_status1, status2=self._get_status2, ...
)
def _step(self):
# Code that defines how a step of this algorithm
# should work goes here.
...
def _get_status1(self):
# The value returned by this function will be shown
# in the status dictionary, associated with the key
# 'status1'.
return ...
def _get_status2(self):
# The value returned by this function will be shown
# in the status dictionary, associated with the key
# 'status2'.
return ...
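To make the skeleton concrete, here is a deliberately simple, hypothetical concretization: a random-search algorithm that relies only on `Problem` and `SolutionBatch` operations appearing elsewhere on this page (`generate_batch`, `evaluate`, `utility`). It is a sketch, not part of the library.
```python
from evotorch import Problem
from evotorch.algorithms.searchalgorithm import SearchAlgorithm


class RandomSearch(SearchAlgorithm):
    """Hypothetical example: sample and evaluate a fresh population every generation."""

    def __init__(self, problem: Problem, *, popsize: int = 10):
        # Register a status getter, following the skeleton above.
        SearchAlgorithm.__init__(self, problem, best_utility=self._get_best_utility)
        self._popsize = popsize
        self._best_utility = None

    def _step(self):
        # Sample a new batch of candidate solutions and evaluate them.
        batch = self._problem.generate_batch(self._popsize)
        self._problem.evaluate(batch)
        # utility(...) is sign-adjusted so that higher is always better.
        step_best = float(batch.utility(0).max())
        if (self._best_utility is None) or (step_best > self._best_utility):
            self._best_utility = step_best

    def _get_best_utility(self):
        return self._best_utility
```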
Source code in evotorch/algorithms/searchalgorithm.py
class SearchAlgorithm(LazyReporter):
"""
Base class for all evolutionary search algorithms.
An algorithm developer is expected to inherit from this base class,
and override the method named `_step()` to define how a single
step of this new algorithm is performed.
For each core status dictionary element, a new method is expected
to exist within the inheriting class. These status reporting
methods are then registered via the keyword arguments of the
`__init__(...)` method of `SearchAlgorithm`.
To sum up, a newly developed algorithm inheriting from this base
class is expected to follow this structure:
```python
from evotorch import Problem
class MyNewAlgorithm(SearchAlgorithm):
def __init__(self, problem: Problem):
SearchAlgorithm.__init__(
self, problem, status1=self._get_status1, status2=self._get_status2, ...
)
def _step(self):
# Code that defines how a step of this algorithm
# should work goes here.
...
def _get_status1(self):
# The value returned by this function will be shown
# in the status dictionary, associated with the key
# 'status1'.
return ...
def _get_status2(self):
# The value returned by this function will be shown
# in the status dictionary, associated with the key
# 'status2'.
return ...
```
"""
def __init__(self, problem: Problem, **kwargs):
"""
Initialize the SearchAlgorithm instance.
Args:
problem: Problem to work with.
kwargs: Any additional keyword argument, in the form of `k=f`,
is accepted in this manner: for each pair of `k` and `f`,
`k` is accepted as the status key (i.e. a status variable
name), and `f` is accepted as a function (probably a method
of the inheriting class) that will generate the value of that
status variable.
"""
super().__init__(**kwargs)
self._problem = problem
self._before_step_hook = Hook()
self._after_step_hook = Hook()
self._log_hook = Hook()
self._end_of_run_hook = Hook()
self._steps_count: int = 0
self._first_step_datetime: Optional[datetime] = None
@property
def problem(self) -> Problem:
"""
The problem object which is being worked on.
"""
return self._problem
@property
def before_step_hook(self) -> Hook:
"""
Use this Hook to add more behavior to the search algorithm
to be performed just before executing a step.
"""
return self._before_step_hook
@property
def after_step_hook(self) -> Hook:
"""
Use this Hook to add more behavior to the search algorithm
to be performed just after executing a step.
The dictionaries returned by the functions registered into
this Hook will be accumulated and added into the status
dictionary of the search algorithm.
"""
return self._after_step_hook
@property
def log_hook(self) -> Hook:
"""
Use this Hook to add more behavior to the search algorithm
at the moment of logging the constructed status dictionary.
This Hook is executed after the execution of `after_step_hook`
is complete.
The functions in this Hook are assumed to expect a single
argument, that is the status dictionary of the search algorithm.
"""
return self._log_hook
@property
def end_of_run_hook(self) -> Hook:
"""
Use this Hook to add more behavior to the search algorithm
at the end of a run.
This Hook is executed after all the generations of a run
are done.
The functions in this Hook are assumed to expect a single
argument, that is the status dictionary of the search algorithm.
"""
return self._end_of_run_hook
@property
def step_count(self) -> int:
"""
Number of search steps performed.
This is equivalent to the number of generations, or to the
number of iterations.
"""
return self._steps_count
@property
def steps_count(self) -> int:
"""
Deprecated alias for the `step_count` property.
It is recommended to use the `step_count` property instead.
"""
return self._steps_count
def step(self):
"""
Perform a step of the search algorithm.
"""
self._before_step_hook()
self.clear_status()
if self._first_step_datetime is None:
self._first_step_datetime = datetime.now()
self._step()
self._steps_count += 1
self.update_status({"iter": self._steps_count})
self.update_status(self._problem.status)
extra_status = self._after_step_hook.accumulate_dict()
self.update_status(extra_status)
if len(self._log_hook) >= 1:
self._log_hook(dict(self.status))
def _step(self):
"""
Algorithm developers are expected to override this method
in an inheriting subclass.
The code which defines how a step of the evolutionary algorithm
is performed goes here.
"""
raise NotImplementedError
def run(self, num_generations: int, *, reset_first_step_datetime: bool = True):
"""
Run the algorithm for the given number of generations
(i.e. iterations).
Args:
num_generations: Number of generations.
reset_first_step_datetime: If this argument is given as True,
then the datetime of the first search step will be forgotten.
Forgetting the first step's datetime means that the first step
taken by this new run will be the new first step datetime.
"""
if reset_first_step_datetime:
self.reset_first_step_datetime()
for _ in range(int(num_generations)):
self.step()
if len(self._end_of_run_hook) >= 1:
self._end_of_run_hook(dict(self.status))
@property
def first_step_datetime(self) -> Optional[datetime]:
"""
Get the datetime when the algorithm took the first search step.
If a step is not taken at all, then the result will be None.
"""
return self._first_step_datetime
def reset_first_step_datetime(self):
"""
Reset (or forget) the first step's datetime.
"""
self._first_step_datetime = None
@property
def is_terminated(self) -> bool:
"""Whether the algorithm is in a terminal state"""
return False
after_step_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm to be performed just after executing a step.
The dictionaries returned by the functions registered into this Hook will be accumulated and added into the status dictionary of the search algorithm.
before_step_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm to be performed just before executing a step.
end_of_run_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm at the end of a run.
This Hook is executed after all the generations of a run are done.
The functions in this Hook are assumed to expect a single argument, that is the status dictionary of the search algorithm.
first_step_datetime: Optional[datetime.datetime]
property
readonly
¶
Get the datetime when the algorithm took the first search step. If a step is not taken at all, then the result will be None.
is_terminated: bool
property
readonly
¶
Whether the algorithm is in a terminal state
log_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm at the moment of logging the constructed status dictionary.
This Hook is executed after the execution of `after_step_hook` is complete.
The functions in this Hook are assumed to expect a single argument, that is the status dictionary of the search algorithm.
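For example, a plain function appended to this Hook receives the status dictionary once per generation. The sketch below assumes a `searcher` constructed as in the earlier examples; EvoTorch's logger classes attach themselves to `log_hook` in the same list-like way.
```python
def print_progress(status: dict):
    # Receives the full status dictionary once per generation.
    print("generation:", status["iter"], "mean_eval:", status.get("mean_eval"))


searcher.log_hook.append(print_progress)
searcher.run(10)
```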
problem: Problem
property
readonly
¶
The problem object which is being worked on.
step_count: int
property
readonly
¶
Number of search steps performed.
This is equivalent to the number of generations, or to the number of iterations.
steps_count: int
property
readonly
¶
Deprecated alias for the `step_count` property.
It is recommended to use the `step_count` property instead.
__init__(self, problem, **kwargs)
special
¶
Initialize the SearchAlgorithm instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem | Problem | Problem to work with. | required |
kwargs |  | Any additional keyword argument, in the form of `k=f`, is accepted in this manner: for each pair of `k` and `f`, `k` is accepted as the status key (i.e. a status variable name), and `f` is accepted as a function (probably a method of the inheriting class) that will generate the value of that status variable. | {} |
Source code in evotorch/algorithms/searchalgorithm.py
def __init__(self, problem: Problem, **kwargs):
"""
Initialize the SearchAlgorithm instance.
Args:
problem: Problem to work with.
kwargs: Any additional keyword argument, in the form of `k=f`,
is accepted in this manner: for each pair of `k` and `f`,
`k` is accepted as the status key (i.e. a status variable
name), and `f` is accepted as a function (probably a method
of the inheriting class) that will generate the value of that
status variable.
"""
super().__init__(**kwargs)
self._problem = problem
self._before_step_hook = Hook()
self._after_step_hook = Hook()
self._log_hook = Hook()
self._end_of_run_hook = Hook()
self._steps_count: int = 0
self._first_step_datetime: Optional[datetime] = None
reset_first_step_datetime(self)
¶
Reset (or forget) the first step's datetime.
Source code in evotorch/algorithms/searchalgorithm.py
def reset_first_step_datetime(self):
    """
    Reset (or forget) the first step's datetime.
    """
    self._first_step_datetime = None
run(self, num_generations, *, reset_first_step_datetime=True)
¶
Run the algorithm for the given number of generations (i.e. iterations).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_generations | int | Number of generations. | required |
reset_first_step_datetime | bool | If this argument is given as True, then the datetime of the first search step will be forgotten. Forgetting the first step's datetime means that the first step taken by this new run will be the new first step datetime. | True |
Source code in evotorch/algorithms/searchalgorithm.py
def run(self, num_generations: int, *, reset_first_step_datetime: bool = True):
"""
Run the algorithm for the given number of generations
(i.e. iterations).
Args:
num_generations: Number of generations.
reset_first_step_datetime: If this argument is given as True,
then the datetime of the first search step will be forgotten.
Forgetting the first step's datetime means that the first step
taken by this new run will be the new first step datetime.
"""
if reset_first_step_datetime:
self.reset_first_step_datetime()
for _ in range(int(num_generations)):
self.step()
if len(self._end_of_run_hook) >= 1:
self._end_of_run_hook(dict(self.status))
step(self)
¶
Perform a step of the search algorithm.
Source code in evotorch/algorithms/searchalgorithm.py
def step(self):
"""
Perform a step of the search algorithm.
"""
self._before_step_hook()
self.clear_status()
if self._first_step_datetime is None:
self._first_step_datetime = datetime.now()
self._step()
self._steps_count += 1
self.update_status({"iter": self._steps_count})
self.update_status(self._problem.status)
extra_status = self._after_step_hook.accumulate_dict()
self.update_status(extra_status)
if len(self._log_hook) >= 1:
self._log_hook(dict(self.status))
SinglePopulationAlgorithmMixin
¶
A mixin class that can be inherited by a SearchAlgorithm subclass.
This mixin class assumes that the inheriting class has the following members:
- `problem`: The problem object that is associated with the search algorithm. This attribute is already provided by the SearchAlgorithm base class.
- `population`: An attribute or a (possibly read-only) property which stores the population of the search algorithm as a `SolutionBatch` instance.
This mixin class also assumes that the inheriting class might contain an attribute (or a property) named `obj_index`. If there is such an attribute and its value is not None, then this mixin class assumes that `obj_index` represents the index of the objective that is being focused on.
Upon initialization, this mixin class first determines whether or not the algorithm is a single-objective one. In more detail, if there is an attribute named `obj_index` (and its value is not None), or if the associated problem has only one objective, then this mixin class assumes that the inheriting SearchAlgorithm is a single-objective algorithm. Otherwise, it is assumed that the underlying algorithm works (or might work) on multiple objectives.
In the single-objective case, this mixin class brings the inheriting SearchAlgorithm the ability to report the following:
- `pop_best` (best solution of the population),
- `pop_best_eval` (evaluation result of the population's best solution),
- `mean_eval` (mean evaluation result of the population),
- `median_eval` (median evaluation result of the population).
In the multi-objective case, for each objective `i`, this mixin class brings the inheriting SearchAlgorithm the ability to report the following:
- `obj<i>_pop_best` (best solution of the population according to objective `i`),
- `obj<i>_pop_best_eval` (evaluation result of the population's best solution according to objective `i`),
- `obj<i>_mean_eval` (mean evaluation result of the population),
- `obj<i>_median_eval` (median evaluation result of the population).
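In practice, these reportables are read through the status dictionary of the inheriting algorithm. A brief single-objective sketch, assuming the CMAES class documented earlier and an illustrative sphere fitness function:
```python
import torch

from evotorch import Problem
from evotorch.algorithms import CMAES


def sphere(x: torch.Tensor) -> torch.Tensor:
    return torch.sum(x**2.0)


problem = Problem("min", sphere, solution_length=10, initial_bounds=(-1.0, 1.0))
searcher = CMAES(problem, stdev_init=0.5)
searcher.run(50)

# Reportables added by SinglePopulationAlgorithmMixin (single-objective case):
print(searcher.status["pop_best_eval"])  # evaluation of the population's best solution
print(searcher.status["mean_eval"])  # mean evaluation over the population
print(searcher.status["median_eval"])  # median evaluation over the population
```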
Source code in evotorch/algorithms/searchalgorithm.py
class SinglePopulationAlgorithmMixin:
"""
A mixin class that can be inherited by a SearchAlgorithm subclass.
This mixin class assumes that the inheriting class has the following
members:
- `problem`: The problem object that is associated with the search
algorithm. This attribute is already provided by the SearchAlgorithm
base class.
- `population`: An attribute or a (possibly read-only) property which
stores the population of the search algorithm as a `SolutionBatch`
instance.
This mixin class also assumes that the inheriting class _might_
contain an attribute (or a property) named `obj_index`.
If there is such an attribute and its value is not None, then this
mixin class assumes that `obj_index` represents the index of the
objective that is being focused on.
Upon initialization, this mixin class first determines whether or not
the algorithm is a single-objective one.
In more detail, if there is an attribute named `obj_index` (and its
value is not None), or if the associated problem has only one objective,
then this mixin class assumes that the inheriting SearchAlgorithm is a
single objective algorithm.
Otherwise, it is assumed that the underlying algorithm works (or might
work) on multiple objectives.
In the single-objective case, this mixin class brings the inheriting
SearchAlgorithm the ability to report the following:
`pop_best` (best solution of the population),
`pop_best_eval` (evaluation result of the population's best solution),
`mean_eval` (mean evaluation result of the population),
`median_eval` (median evaluation result of the population).
In the multi-objective case, for each objective `i`, this mixin class
brings the inheriting SearchAlgorithm the ability to report the following:
`obj<i>_pop_best` (best solution of the population according to objective `i`),
`obj<i>_pop_best_eval` (evaluation result of the population's best
solution according to objective `i`),
`obj<i>_mean_eval` (mean evaluation result of the population),
`obj<i>_median_eval` (median evaluation result of the population).
"""
class ObjectiveStatusReporter:
REPORTABLES = {"pop_best", "pop_best_eval", "mean_eval", "median_eval"}
def __init__(
self,
algorithm: SearchAlgorithm,
*,
obj_index: int,
to_report: str,
):
self.__algorithm = algorithm
self.__obj_index = int(obj_index)
if to_report not in self.REPORTABLES:
raise ValueError(f"Unrecognized report request: {to_report}")
self.__to_report = to_report
@property
def population(self) -> SolutionBatch:
return self.__algorithm.population
@property
def obj_index(self) -> int:
return self.__obj_index
def get_status_value(self, status_key: str) -> Any:
return self.__algorithm.get_status_value(status_key)
def has_status_key(self, status_key: str) -> bool:
return self.__algorithm.has_status_key(status_key)
def _get_pop_best(self):
i = self.population.argbest(self.obj_index)
return clone(self.population[i])
def _get_pop_best_eval(self):
pop_best = None
pop_best_keys = ("pop_best", f"obj{self.obj_index}_pop_best")
for pop_best_key in pop_best_keys:
if self.has_status_key(pop_best_key):
pop_best = self.get_status_value(pop_best_key)
break
if (pop_best is not None) and pop_best.is_evaluated:
return float(pop_best.evals[self.obj_index])
else:
return None
@torch.no_grad()
def _get_mean_eval(self):
return float(torch.mean(self.population.access_evals(self.obj_index)))
@torch.no_grad()
def _get_median_eval(self):
return float(torch.median(self.population.access_evals(self.obj_index)))
def __call__(self):
return getattr(self, "_get_" + self.__to_report)()
def __init__(self, *, exclude: Optional[Iterable] = None, enable: bool = True):
if not enable:
return
ObjectiveStatusReporter = self.ObjectiveStatusReporter
reportables = ObjectiveStatusReporter.REPORTABLES
single_obj: Optional[int] = None
self.__exclude = set() if exclude is None else set(exclude)
if hasattr(self, "obj_index") and (self.obj_index is not None):
single_obj = self.obj_index
elif len(self.problem.senses) == 1:
single_obj = 0
if single_obj is not None:
for reportable in reportables:
if reportable not in self.__exclude:
self.add_status_getters(
{reportable: ObjectiveStatusReporter(self, obj_index=single_obj, to_report=reportable)}
)
else:
for i_obj in range(len(self.problem.senses)):
for reportable in reportables:
if reportable not in self.__exclude:
self.add_status_getters(
{
f"obj{i_obj}_{reportable}": ObjectiveStatusReporter(
self, obj_index=i_obj, to_report=reportable
),
}
)