Index
This namespace contains the implementations of various evolutionary algorithms.
cmaes
¶
This namespace contains the CMAES class, which is a wrapper
for the CMA-ES implementation of the cma
package.
CMAES (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
This is an interface class between the CMAES implementation
within the cma
package developed within the GitHub repository
CMA-ES/pycma.
References:
Nikolaus Hansen, Youhei Akimoto, and Petr Baudis.
CMA-ES/pycma on Github. Zenodo, DOI:10.5281/zenodo.2559634,
February 2019.
<https://github.com/CMA-ES/pycma>
Nikolaus Hansen, Andreas Ostermeier (2001).
Completely Derandomized Self-Adaptation in Evolution Strategies.
Source code in evotorch/algorithms/cmaes.py
class CMAES(SearchAlgorithm, SinglePopulationAlgorithmMixin):
    """
    CMAES: Covariance Matrix Adaptation Evolution Strategy.

    This is an interface class between the CMAES implementation
    within the `cma` package developed within the GitHub repository
    CMA-ES/pycma.

    References:

        Nikolaus Hansen, Youhei Akimoto, and Petr Baudis.
        CMA-ES/pycma on Github. Zenodo, DOI:10.5281/zenodo.2559634,
        February 2019.
        <https://github.com/CMA-ES/pycma>

        Nikolaus Hansen, Andreas Ostermeier (2001).
        Completely Derandomized Self-Adaptation in Evolution Strategies.
    """

    def __init__(
        self,
        problem: Problem,
        *,
        stdev_init: RealOrVector,  # sigma0
        popsize: Optional[int] = None,  # popsize
        center_init: Optional[Vector] = None,  # x0
        center_learning_rate: Optional[float] = None,  # CMA_cmean
        cov_learning_rate: Optional[float] = None,  # CMA_on
        rankmu_learning_rate: Optional[float] = None,  # CMA_rankmu
        rankone_learning_rate: Optional[float] = None,  # CMA_rankone
        stdev_min: Optional[Union[float, np.ndarray]] = None,  # minstd
        stdev_max: Optional[Union[float, np.ndarray]] = None,  # maxstd
        separable: bool = False,  # CMA_diagonal
        obj_index: Optional[int] = None,
        cma_options: dict = {},  # only read, never mutated, so the shared default is harmless
    ):
        """
        `__init__(...)`: Initialize the CMAES solver.

        Args:
            problem: The problem object which is being worked on.
            stdev_init: Initial standard deviation as a scalar or
                as a 1-dimensional array.
            popsize: Population size. Can be specified as an int,
                or can be left as None to let the CMAES solver
                decide the population size according to the length
                of a solution. Whichever way it is determined
                (this argument, `cma_options["popsize"]`, or the
                solver's own default), the population batch of this
                class is sized after the solver's resulting `popsize`.
            center_init: Initial center point of the search distribution.
                Can be given as a SolutionVector or as a 1-D array.
                If left as None, an initial center point is generated
                with the help of the problem object's `generate_values(...)`
                method.
            center_learning_rate: Learning rate for updating the mean
                of the search distribution. Leaving this as None
                means that the CMAES solver is to use its own default,
                which is documented as 1.0.
            cov_learning_rate: Learning rate for updating the covariance
                matrix of the search distribution. This hyperparameter
                acts as a common multiplier for rank_one update and rank_mu
                update of the covariance matrix. Leaving this as None
                means that the CMAES solver is to use its own default,
                which is documented as 1.0.
            rankmu_learning_rate: Learning rate for the rank_mu update
                of the covariance matrix of the search distribution.
                Leaving this as None means that the CMAES solver is to use
                its own default, which is documented as 1.0.
            rankone_learning_rate: Learning rate for the rank_one update
                of the covariance matrix of the search distribution.
                Leaving this as None means that the CMAES solver is to use
                its own default, which is documented as 1.0.
            stdev_min: Minimum allowed standard deviation of the search
                distribution. Leaving this as None means that no such
                boundary is to be used.
                Can be given as None, as a scalar, or as a 1-dimensional
                array.
            stdev_max: Maximum allowed standard deviation of the search
                distribution. Leaving this as None means that no such
                boundary is to be used.
                Can be given as None, as a scalar, or as a 1-dimensional
                array.
            separable: Provide this as True if you would like the problem
                to be treated as a separable one. Treating a problem
                as separable means to adapt only the diagonal parts
                of the covariance matrix and to keep the non-diagonal
                parts 0. High dimensional problems result in large
                covariance matrices on which operating is computationally
                expensive. Therefore, for such high dimensional problems,
                setting `separable` as True might be useful.
                If, instead, you would like to configure on which
                iterations the diagonal parts of the covariance matrix
                are to be adapted, then it is recommended to leave
                `separable` as False and set a new value for the key
                "CMA_diagonal" via `cma_options` (see the official
                documentation of pycma for details regarding the
                "CMA_diagonal" setting).
            obj_index: Objective index according to which evaluation
                of the solution will be done.
            cma_options: Any other configuration for the CMAES solver
                can be passed via the cma_options dictionary.

        Raises:
            ImportError: If the `cma` package is not installed.
            ValueError: If a setting is given both as a keyword argument
                and as a key of `cma_options`.
        """

        # Make sure that the cma module is installed
        if cma is None:
            raise ImportError(f"The class {type(self).__name__} is only available if the package `cma` is installed.")

        # Initialize the base class
        SearchAlgorithm.__init__(self, problem, center=self._get_center)

        # Ensure that the problem is numeric
        problem.ensure_numeric()

        # Store the objective index
        self._obj_index = problem.normalize_obj_index(obj_index)

        # If `center_init` is not given, generate an initial solution
        # with the help of the problem object.
        # Otherwise, use the given initial solution as the starting
        # point in the search space.
        if center_init is None:
            x0 = self._problem.generate_values(1).to("cpu").view(-1).numpy().astype(dtype=float)
        else:
            x0 = numpy_copy(center_init, dtype=float)

        # Store the initial standard deviations
        sigma0 = numpy_copy(stdev_init, dtype=float)

        # Generate an options dictionary to pass to the cma solver.
        # Tensors are converted to numpy arrays; everything else is passed as it is.
        inopts = {}
        for k, v in cma_options.items():
            if isinstance(v, torch.Tensor):
                v = numpy_copy(v, dtype=float)
            inopts[k] = v

        # Remove the number of iterations boundary
        if "maxiter" not in inopts:
            inopts["maxiter"] = np.inf

        # Below is a temporary helper function for safely storing the configuration items.
        # This inner function updates the `inopts` dictionary in-place.
        def store_opt(key: str, long_name: str, value: Any, converter: Callable):
            # Here, `key` represents the configuration key used by pycma
            # `long_name` represents the configuration's long name used by this class
            # `value` is the configuration value associated with `key`.

            if value is None:
                # If the provided `value` is None, then there is no configuration to store.
                return

            if key in inopts:
                # If the given `key` already exists within `inopts`, this means that the configuration was specified
                # twice: via the keyword argument `cma_options` AND via a keyword argument.
                # We raise an error and inform the user about this redundancy.
                raise ValueError(
                    f"The configuration {repr(key)} was redundantly provided"
                    f" both via the initialization argument {long_name}"
                    f" and via the cma_options dictionary."
                    f" {long_name}={repr(value)};"
                    f" cma_options[{repr(key)}]={repr(inopts[key])}."
                )

            inopts[key] = converter(value)

        # Temporary helper function which makes sure that `x` is a numpy array or a float.
        def array_or_float(x):
            if is_sequence(x):
                return numpy_copy(x)
            else:
                return float(x)

        # Store the cma configuration received through the initialization arguments (and raise error if there is
        # redundancy with the cma_options dictionary).
        store_opt("popsize", "popsize", popsize, int)
        store_opt("CMA_cmean", "center_learning_rate", center_learning_rate, float)
        store_opt("CMA_on", "cov_learning_rate", cov_learning_rate, float)
        store_opt("CMA_rankmu", "rankmu_learning_rate", rankmu_learning_rate, float)
        store_opt("CMA_rankone", "rankone_learning_rate", rankone_learning_rate, float)
        store_opt("minstd", "stdev_min", stdev_min, array_or_float)
        store_opt("maxstd", "stdev_max", stdev_max, array_or_float)
        if separable:
            store_opt("CMA_diagonal", "separable", separable, bool)

        # If the problem defines lower and upper bounds, pass these into the options dict.
        def process_bounds(bounds: Optional[RealOrVector]) -> Optional[np.ndarray]:
            if bounds is None:
                return None
            if is_sequence(bounds):
                return numpy_copy(bounds)
            # A scalar bound is expanded so that it covers every decision variable.
            return np.array(float(bounds)).repeat(self._problem.solution_length)

        lb = process_bounds(self._problem.lower_bounds)
        ub = process_bounds(self._problem.upper_bounds)

        if (lb is not None) or (ub is not None):
            # When only one side is bounded, the other side is filled with infinities.
            if lb is None:
                lb = np.array(-(np.inf)).repeat(self._problem.solution_length)
            if ub is None:
                ub = np.array(np.inf).repeat(self._problem.solution_length)
            inopts["bounds"] = [lb, ub]

        # Generate a random seed using the problem object for the sake of reproducibility.
        if "seed" not in inopts:
            inopts["seed"] = int(self._problem.make_randint(tuple(), n=(2**32) - 100) + 100)

        # Instantiate the CMAEvolutionStrategy with the prepared configuration items.
        self._es = cma.CMAEvolutionStrategy(x0, sigma0, inopts)

        # Initialize the population.
        # The batch is created only now, and is sized after the solver's own `popsize`,
        # so that it matches the number of solutions returned by `ask()` even when
        # `popsize` was left as None or was configured through `cma_options`.
        self._population: SolutionBatch = self._problem.generate_batch(int(self._es.popsize), empty=True)

        # Use the SinglePopulationAlgorithmMixin to enable additional status reports regarding the population.
        SinglePopulationAlgorithmMixin.__init__(self)

    @property
    def population(self) -> SolutionBatch:
        """Population generated by the CMA-ES algorithm"""
        return self._population

    def _step(self):
        """Perform a step of the CMA-ES solver"""
        # Sample the candidate solutions of this iteration from pycma.
        asked = self._es.ask()

        # Copy the candidates into the population batch (in the problem's dtype,
        # on the population's device) and evaluate them.
        self._population.access_values()[:] = torch.as_tensor(
            np.asarray(asked), dtype=self._problem.dtype, device=self._population.device
        )
        self._problem.evaluate(self._population)

        # The utilities are negated before being reported to pycma
        # (presumably because utilities are higher-is-better while pycma minimizes).
        scores = numpy_copy(self._population.utility(self._obj_index), dtype=float)
        self._es.tell(asked, -1.0 * scores)

    def _get_center(self) -> torch.Tensor:
        """Return the item at index 5 of the solver's `result` as a tensor on the population's dtype and device"""
        return torch.as_tensor(self._es.result[5], dtype=self._population.dtype, device=self._population.device)

    @property
    def obj_index(self) -> int:
        """Index of the objective being focused on"""
        return self._obj_index
obj_index: int
property
readonly
¶
Index of the objective being focused on
population: SolutionBatch
property
readonly
¶
Population generated by the CMA-ES algorithm
__init__(self, problem, *, stdev_init, popsize=None, center_init=None, center_learning_rate=None, cov_learning_rate=None, rankmu_learning_rate=None, rankone_learning_rate=None, stdev_min=None, stdev_max=None, separable=False, obj_index=None, cma_options={})
special
¶
__init__(...)
: Initialize the CMAES solver.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem object which is being worked on. |
required |
stdev_init |
Union[float, Iterable[float], torch.Tensor] |
Initial standard deviation as a scalar or as a 1-dimensional array. |
required |
popsize |
Optional[int] |
Population size. Can be specified as an int, or can be left as None to let the CMAES solver decide the population size according to the length of a solution. |
None |
center_init |
Union[Iterable[float], torch.Tensor] |
Initial center point of the search distribution.
Can be given as a SolutionVector or as a 1-D array.
If left as None, an initial center point is generated
with the help of the problem object's `generate_values(...)` method. |
None |
center_learning_rate |
Optional[float] |
Learning rate for updating the mean of the search distribution. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. |
None |
cov_learning_rate |
Optional[float] |
Learning rate for updating the covariance matrix of the search distribution. This hyperparameter acts as a common multiplier for rank_one update and rank_mu update of the covariance matrix. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. |
None |
rankmu_learning_rate |
Optional[float] |
Learning rate for the rank_mu update of the covariance matrix of the search distribution. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. |
None |
rankone_learning_rate |
Optional[float] |
Learning rate for the rank_one update of the covariance matrix of the search distribution. Leaving this as None means that the CMAES solver is to use its own default, which is documented as 1.0. |
None |
stdev_min |
Union[float, numpy.ndarray] |
Minimum allowed standard deviation of the search distribution. Leaving this as None means that no such boundary is to be used. Can be given as None, as a scalar, or as a 1-dimensional array. |
None |
stdev_max |
Union[float, numpy.ndarray] |
Maximum allowed standard deviation of the search distribution. Leaving this as None means that no such boundary is to be used. Can be given as None, as a scalar, or as a 1-dimensional array. |
None |
separable |
bool |
Provide this as True if you would like the problem
to be treated as a separable one. Treating a problem
as separable means to adapt only the diagonal parts
of the covariance matrix and to keep the non-diagonal
parts 0. High dimensional problems result in large
covariance matrices on which operating is computationally
expensive. Therefore, for such high dimensional problems,
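setting `separable` as True might be useful. If, instead, you would like to configure on which iterations the diagonal parts of the covariance matrix are to be adapted, then it is recommended to leave `separable` as False and set a new value for the key "CMA_diagonal" via `cma_options` (see the official documentation of pycma for details regarding the "CMA_diagonal" setting). |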
False |
obj_index |
Optional[int] |
Objective index according to which evaluation of the solution will be done. |
None |
cma_options |
dict |
Any other configuration for the CMAES solver can be passed via the cma_options dictionary. |
{} |
Source code in evotorch/algorithms/cmaes.py
def __init__(
    self,
    problem: Problem,
    *,
    stdev_init: RealOrVector,  # sigma0
    popsize: Optional[int] = None,  # popsize
    center_init: Optional[Vector] = None,  # x0
    center_learning_rate: Optional[float] = None,  # CMA_cmean
    cov_learning_rate: Optional[float] = None,  # CMA_on
    rankmu_learning_rate: Optional[float] = None,  # CMA_rankmu
    rankone_learning_rate: Optional[float] = None,  # CMA_rankone
    stdev_min: Optional[Union[float, np.ndarray]] = None,  # minstd
    stdev_max: Optional[Union[float, np.ndarray]] = None,  # maxstd
    separable: bool = False,  # CMA_diagonal
    obj_index: Optional[int] = None,
    cma_options: dict = {},  # only read, never mutated, so the shared default is harmless
):
    """
    `__init__(...)`: Initialize the CMAES solver.

    Args:
        problem: The problem object which is being worked on.
        stdev_init: Initial standard deviation as a scalar or
            as a 1-dimensional array.
        popsize: Population size. Can be specified as an int,
            or can be left as None to let the CMAES solver
            decide the population size according to the length
            of a solution. Whichever way it is determined
            (this argument, `cma_options["popsize"]`, or the
            solver's own default), the population batch of this
            class is sized after the solver's resulting `popsize`.
        center_init: Initial center point of the search distribution.
            Can be given as a SolutionVector or as a 1-D array.
            If left as None, an initial center point is generated
            with the help of the problem object's `generate_values(...)`
            method.
        center_learning_rate: Learning rate for updating the mean
            of the search distribution. Leaving this as None
            means that the CMAES solver is to use its own default,
            which is documented as 1.0.
        cov_learning_rate: Learning rate for updating the covariance
            matrix of the search distribution. This hyperparameter
            acts as a common multiplier for rank_one update and rank_mu
            update of the covariance matrix. Leaving this as None
            means that the CMAES solver is to use its own default,
            which is documented as 1.0.
        rankmu_learning_rate: Learning rate for the rank_mu update
            of the covariance matrix of the search distribution.
            Leaving this as None means that the CMAES solver is to use
            its own default, which is documented as 1.0.
        rankone_learning_rate: Learning rate for the rank_one update
            of the covariance matrix of the search distribution.
            Leaving this as None means that the CMAES solver is to use
            its own default, which is documented as 1.0.
        stdev_min: Minimum allowed standard deviation of the search
            distribution. Leaving this as None means that no such
            boundary is to be used.
            Can be given as None, as a scalar, or as a 1-dimensional
            array.
        stdev_max: Maximum allowed standard deviation of the search
            distribution. Leaving this as None means that no such
            boundary is to be used.
            Can be given as None, as a scalar, or as a 1-dimensional
            array.
        separable: Provide this as True if you would like the problem
            to be treated as a separable one. Treating a problem
            as separable means to adapt only the diagonal parts
            of the covariance matrix and to keep the non-diagonal
            parts 0. High dimensional problems result in large
            covariance matrices on which operating is computationally
            expensive. Therefore, for such high dimensional problems,
            setting `separable` as True might be useful.
            If, instead, you would like to configure on which
            iterations the diagonal parts of the covariance matrix
            are to be adapted, then it is recommended to leave
            `separable` as False and set a new value for the key
            "CMA_diagonal" via `cma_options` (see the official
            documentation of pycma for details regarding the
            "CMA_diagonal" setting).
        obj_index: Objective index according to which evaluation
            of the solution will be done.
        cma_options: Any other configuration for the CMAES solver
            can be passed via the cma_options dictionary.

    Raises:
        ImportError: If the `cma` package is not installed.
        ValueError: If a setting is given both as a keyword argument
            and as a key of `cma_options`.
    """

    # Make sure that the cma module is installed
    if cma is None:
        raise ImportError(f"The class {type(self).__name__} is only available if the package `cma` is installed.")

    # Initialize the base class
    SearchAlgorithm.__init__(self, problem, center=self._get_center)

    # Ensure that the problem is numeric
    problem.ensure_numeric()

    # Store the objective index
    self._obj_index = problem.normalize_obj_index(obj_index)

    # If `center_init` is not given, generate an initial solution
    # with the help of the problem object.
    # Otherwise, use the given initial solution as the starting
    # point in the search space.
    if center_init is None:
        x0 = self._problem.generate_values(1).to("cpu").view(-1).numpy().astype(dtype=float)
    else:
        x0 = numpy_copy(center_init, dtype=float)

    # Store the initial standard deviations
    sigma0 = numpy_copy(stdev_init, dtype=float)

    # Generate an options dictionary to pass to the cma solver.
    # Tensors are converted to numpy arrays; everything else is passed as it is.
    inopts = {}
    for k, v in cma_options.items():
        if isinstance(v, torch.Tensor):
            v = numpy_copy(v, dtype=float)
        inopts[k] = v

    # Remove the number of iterations boundary
    if "maxiter" not in inopts:
        inopts["maxiter"] = np.inf

    # Below is a temporary helper function for safely storing the configuration items.
    # This inner function updates the `inopts` dictionary in-place.
    def store_opt(key: str, long_name: str, value: Any, converter: Callable):
        # Here, `key` represents the configuration key used by pycma
        # `long_name` represents the configuration's long name used by this class
        # `value` is the configuration value associated with `key`.

        if value is None:
            # If the provided `value` is None, then there is no configuration to store.
            return

        if key in inopts:
            # If the given `key` already exists within `inopts`, this means that the configuration was specified
            # twice: via the keyword argument `cma_options` AND via a keyword argument.
            # We raise an error and inform the user about this redundancy.
            raise ValueError(
                f"The configuration {repr(key)} was redundantly provided"
                f" both via the initialization argument {long_name}"
                f" and via the cma_options dictionary."
                f" {long_name}={repr(value)};"
                f" cma_options[{repr(key)}]={repr(inopts[key])}."
            )

        inopts[key] = converter(value)

    # Temporary helper function which makes sure that `x` is a numpy array or a float.
    def array_or_float(x):
        if is_sequence(x):
            return numpy_copy(x)
        else:
            return float(x)

    # Store the cma configuration received through the initialization arguments (and raise error if there is
    # redundancy with the cma_options dictionary).
    store_opt("popsize", "popsize", popsize, int)
    store_opt("CMA_cmean", "center_learning_rate", center_learning_rate, float)
    store_opt("CMA_on", "cov_learning_rate", cov_learning_rate, float)
    store_opt("CMA_rankmu", "rankmu_learning_rate", rankmu_learning_rate, float)
    store_opt("CMA_rankone", "rankone_learning_rate", rankone_learning_rate, float)
    store_opt("minstd", "stdev_min", stdev_min, array_or_float)
    store_opt("maxstd", "stdev_max", stdev_max, array_or_float)
    if separable:
        store_opt("CMA_diagonal", "separable", separable, bool)

    # If the problem defines lower and upper bounds, pass these into the options dict.
    def process_bounds(bounds: Optional[RealOrVector]) -> Optional[np.ndarray]:
        if bounds is None:
            return None
        if is_sequence(bounds):
            return numpy_copy(bounds)
        # A scalar bound is expanded so that it covers every decision variable.
        return np.array(float(bounds)).repeat(self._problem.solution_length)

    lb = process_bounds(self._problem.lower_bounds)
    ub = process_bounds(self._problem.upper_bounds)

    if (lb is not None) or (ub is not None):
        # When only one side is bounded, the other side is filled with infinities.
        if lb is None:
            lb = np.array(-(np.inf)).repeat(self._problem.solution_length)
        if ub is None:
            ub = np.array(np.inf).repeat(self._problem.solution_length)
        inopts["bounds"] = [lb, ub]

    # Generate a random seed using the problem object for the sake of reproducibility.
    if "seed" not in inopts:
        inopts["seed"] = int(self._problem.make_randint(tuple(), n=(2**32) - 100) + 100)

    # Instantiate the CMAEvolutionStrategy with the prepared configuration items.
    self._es = cma.CMAEvolutionStrategy(x0, sigma0, inopts)

    # Initialize the population.
    # The batch is created only now, and is sized after the solver's own `popsize`,
    # so that it matches the number of solutions returned by `ask()` even when
    # `popsize` was left as None or was configured through `cma_options`.
    self._population: SolutionBatch = self._problem.generate_batch(int(self._es.popsize), empty=True)

    # Use the SinglePopulationAlgorithmMixin to enable additional status reports regarding the population.
    SinglePopulationAlgorithmMixin.__init__(self)
distributed
special
¶
gaussian
¶
CEM (GaussianSearchAlgorithm)
¶
The cross-entropy method (CEM) (Rubinstein, 1999).
This CEM implementation is focused on continuous optimization, and follows the variant explained in Duan et al. (2016).
The adaptive population size mechanism explained in Toklu et al. (2020)
(and previously used in the accompanying source code of the study
Salimans et al. (2017)) is supported, where the population size in an
iteration keeps increasing until a certain number of interactions with
the simulator of the reinforcement learning environment is made.
See the initialization arguments num_interactions
, popsize_max
.
References:
Rubinstein, R. (1999). The cross-entropy method for combinatorial
and continuous optimization.
Methodology and computing in applied probability, 1(2), 127-190.
Duan, Y., Chen, X., Houthooft, R., Schulman, J., Abbeel, P. (2016).
Benchmarking deep reinforcement learning for continuous control.
International conference on machine learning. PMLR, 2016.
Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
Evolution Strategies as a Scalable Alternative to
Reinforcement Learning.
Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
ClipUp: A Simple and Powerful Optimizer
for Distribution-based Policy Evolution.
Parallel Problem Solving from Nature (PPSN 2020).
Source code in evotorch/algorithms/distributed/gaussian.py
class CEM(GaussianSearchAlgorithm):
    """
    The cross-entropy method (CEM) (Rubinstein, 1999).

    This CEM implementation targets continuous optimization and follows
    the variant described by Duan et al. (2016).

    The adaptive population size mechanism of Toklu et al. (2020)
    (used earlier in the source code accompanying Salimans et al. (2017))
    is supported: within an iteration, the population keeps growing until
    a certain number of interactions with the simulator of the
    reinforcement learning environment has been made.
    See the initialization arguments `num_interactions` and `popsize_max`.

    References:

        Rubinstein, R. (1999). The cross-entropy method for combinatorial
        and continuous optimization.
        Methodology and computing in applied probability, 1(2), 127-190.

        Duan, Y., Chen, X., Houthooft, R., Schulman, J., Abbeel, P. (2016).
        Benchmarking deep reinforcement learning for continuous control.
        International conference on machine learning. PMLR, 2016.

        Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
        Evolution Strategies as a Scalable Alternative to
        Reinforcement Learning.

        Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
        ClipUp: A Simple and Powerful Optimizer
        for Distribution-based Policy Evolution.
        Parallel Problem Solving from Nature (PPSN 2020).
    """

    DISTRIBUTION_TYPE = SeparableGaussian
    DISTRIBUTION_PARAMS = NotImplemented  # Each CEM instance overrides this in its __init__

    def __init__(
        self,
        problem: Problem,
        *,
        popsize: int,
        parenthood_ratio: float,
        stdev_init: Optional[RealOrVector] = None,
        radius_init: Optional[RealOrVector] = None,
        num_interactions: Optional[int] = None,
        popsize_max: Optional[int] = None,
        center_init: Optional[RealOrVector] = None,
        stdev_min: Optional[RealOrVector] = None,
        stdev_max: Optional[RealOrVector] = None,
        stdev_max_change: Optional[Union[float, RealOrVector]] = None,
        obj_index: Optional[int] = None,
        distributed: bool = False,
        popsize_weighted_grad_avg: Optional[bool] = None,
    ):
        """
        `__init__(...)`: Initialize the search algorithm.

        Args:
            problem: The problem object to work on.
            popsize: The population size.
            parenthood_ratio: Expected as a float larger than 0 and smaller
                than 1. For example, 0.1 means that the top 10% of the
                population are declared as the parents, and those parents
                are used for updating the population.
                The number of parents is always computed from the
                specified `popsize`; neither the adapted population size
                nor `popsize_max` is taken into account.
            stdev_init: The initial standard deviation of the search
                distribution, as a scalar or as an array.
                It determines the initial coverage area of the search
                distribution.
                Leave this as None if the coverage area is to be
                configured via `radius_init` instead.
            radius_init: The initial radius of the search distribution,
                as a scalar.
                It determines the initial coverage area of the search
                distribution.
                Here, "radius" is defined as the norm of the search
                distribution.
                Leave this as None if the coverage area is to be
                configured via `stdev_init` instead.
            num_interactions: When given as an integer n, it is ensured
                that a population has interacted with the GymProblem's
                environment n times. As long as this target has not been
                reached, the population is considered too small and is
                extended with more samples, until n interactions are
                reached.
                When given as None, `popsize` is the only setting
                affecting the size of a population.
            popsize_max: With `num_interactions` set as an integer,
                the effective population size might jump to
                unnecessarily large numbers. To prevent this,
                `popsize_max` can be set as an upper bound
                for the effective population size.
            center_init: The initial center solution.
                Can be left as None.
            stdev_min: The minimum value for the standard deviation
                values of the Gaussian search distribution.
                Can be left as None (which is the default),
                or can be given as a scalar or as a 1-dimensional array.
            stdev_max: The maximum value for the standard deviation
                values of the Gaussian search distribution.
                Can be left as None (which is the default),
                or can be given as a scalar or as a 1-dimensional array.
            stdev_max_change: The maximum update ratio allowed on the
                standard deviation. Expected as None if no such limiter
                is needed, or as a real number within 0.0 and 1.0 otherwise.
                In the PGPE implementation of Ha (2017, 2018), a value of
                0.2 (20%) was used.
                For this CEM implementation, the default is None.
            obj_index: Index of the objective according to which the
                gradient estimations will be done.
                For single-objective problems, this can be left as None.
            distributed: Whether or not the gradient computation will
                be distributed.
                With `distributed` as False and a non-parallelized
                problem, everything is centralized (i.e. the entire
                computation happens in the main process).
                With `distributed` as False and a parallelized problem,
                the population is created in the main process, sent to
                remote workers for parallelized evaluation, and the
                remote fitnesses are then collected by the main process
                for computing the search gradients.
                With `distributed` as True and a parallelized problem,
                the search algorithm itself is distributed: each remote
                actor generates its own population (such that the total
                population size across all these actors becomes equal to
                `popsize`) and computes its own gradient; the main
                process then collects these gradients, computes the
                averaged gradients and updates the main search
                distribution.
                Non-distributed mode keeps the population in the main
                process, which is good for detailed monitoring during
                the evolutionary process, but it requires passing the
                solutions to the remote actors and collecting the
                fitnesses, which might increase the interprocess
                communication traffic.
                In distributed mode, on the other hand, the population
                cannot be monitored, but the interprocess communication
                traffic is significantly reduced, since only the search
                distributions (not the solutions) and the gradients are
                communicated with the remote actors.
            popsize_weighted_grad_avg: Only to be used in distributed mode
                (i.e. when `distributed` is given as True).
                In distributed mode, each actor remotely samples
                its own solution batches and computes its own gradients.
                These gradients are then collected, and a final average
                gradient is computed.
                If `popsize_weighted_grad_avg` is True, then, while
                averaging over the gradients, each gradient has its own
                weight, computed according to how many solutions were
                sampled by the actor that produced the gradient.
                If `popsize_weighted_grad_avg` is False, then there is no
                weighted averaging (or, each gradient has equal weight).
                If `popsize_weighted_grad_avg` is None, then the gradient
                weights are equal if a value for `num_interactions` is
                given (because `num_interactions` affects the number of
                solutions according to the episode lengths, and
                popsize-weighting the gradients could be misleading);
                and the gradients are weighted according to the
                sub-population (i.e. sub-batch) sizes if
                `num_interactions` is left as None.
                The default value for `popsize_weighted_grad_avg` is None.
                When the distributed mode is disabled (i.e. when
                `distributed` is False), `popsize_weighted_grad_avg` is
                expected as None.
        """

        # The parenthood ratio is stored on the instance as an extra
        # distribution parameter, shadowing the class-level placeholder.
        parenthood = float(parenthood_ratio)
        self.DISTRIBUTION_PARAMS = {"parenthood_ratio": parenthood}

        super().__init__(
            problem,
            # Population sizing
            popsize=popsize,
            popsize_max=popsize_max,
            num_interactions=num_interactions,
            # Initial state of the search distribution
            center_init=center_init,
            stdev_init=stdev_init,
            radius_init=radius_init,
            # Limits on the standard deviation
            stdev_min=stdev_min,
            stdev_max=stdev_max,
            stdev_max_change=stdev_max_change,
            # Settings fixed by CEM: full-step updates, no optimizer, no ranking
            center_learning_rate=1.0,
            stdev_learning_rate=1.0,
            optimizer=None,
            optimizer_config=None,
            ranking_method=None,
            # Objective and distribution of the computation
            obj_index=obj_index,
            distributed=distributed,
            popsize_weighted_grad_avg=popsize_weighted_grad_avg,
        )
DISTRIBUTION_TYPE (Distribution)
¶
Separable Multivariate Gaussian, as used by PGPE
Source code in evotorch/algorithms/distributed/gaussian.py
class SeparableGaussian(Distribution):
"""Separable Multivariate Gaussian, as used by PGPE"""
MANDATORY_PARAMETERS = {"mu", "sigma"}
OPTIONAL_PARAMETERS = {"divide_mu_grad_by", "divide_sigma_grad_by", "parenthood_ratio"}
def __init__(
    self,
    parameters: dict,
    *,
    solution_length: Optional[int] = None,
    device: Optional[Device] = None,
    dtype: Optional[DType] = None,
):
    """
    Initialize the distribution after checking the lengths of `mu` and `sigma`.

    Args:
        parameters: Dictionary containing at least the tensors "mu" and
            "sigma". Both are unpacked as having a single dimension.
        solution_length: Expected solution length. When left as None,
            the length of `mu` is used.
        device: Passed on to the base class initializer.
        dtype: Passed on to the base class initializer.

    Raises:
        ValueError: If `solution_length` is given and differs from the
            length of `mu`, or if `mu` and `sigma` differ in length.
    """
    # Single-element unpacking fails unless each shape has exactly one dimension.
    (mu_length,) = parameters["mu"].shape
    (sigma_length,) = parameters["sigma"].shape

    if solution_length is None:
        solution_length = mu_length
    elif solution_length != mu_length:
        raise ValueError(
            f"The argument `solution_length` does not match the length of `mu` provided in `parameters`."
            f" solution_length={solution_length},"
            f' parameters["mu"]={mu_length}.'
        )

    if sigma_length != mu_length:
        raise ValueError(
            f"The tensors `mu` and `sigma` provided within `parameters` have mismatching lengths."
            f' parameters["mu"]={mu_length},'
            f' parameters["sigma"]={sigma_length}.'
        )

    super().__init__(
        solution_length=solution_length,
        parameters=parameters,
        device=device,
        dtype=dtype,
    )
@property
def mu(self) -> torch.Tensor:
return self.parameters["mu"]
@mu.setter
def mu(self, new_mu: Iterable):
self.parameters["mu"] = torch.as_tensor(new_mu, dtype=self.dtype, device=self.device)
@property
def sigma(self) -> torch.Tensor:
return self.parameters["sigma"]
@sigma.setter
def sigma(self, new_sigma: Iterable):
self.parameters["sigma"] = torch.as_tensor(new_sigma, dtype=self.dtype, device=self.device)
def _fill(self, out: torch.Tensor, *, generator: Optional[torch.Generator] = None):
self.make_gaussian(out=out, center=self.mu, stdev=self.sigma, generator=generator)
def _divide_grad(self, param_name: str, grad: torch.Tensor, weights: torch.Tensor) -> torch.Tensor:
option = f"divide_{param_name}_grad_by"
if option in self.parameters:
div_by_what = self.parameters[option]
if div_by_what == "num_solutions":
[num_solutions] = weights.shape
grad = grad / num_solutions
elif div_by_what == "num_directions":
[num_solutions] = weights.shape
num_directions = num_solutions // 2
grad = grad / num_directions
elif div_by_what == "total_weight":
total_weight = torch.sum(torch.abs(weights))
grad = grad / total_weight
elif div_by_what == "weight_stdev":
weight_stdev = torch.std(weights)
grad = grad / weight_stdev
else:
raise ValueError(f"The parameter {option} has an unrecognized value: {div_by_what}")
return grad
def _compute_gradients_via_parenthood_ratio(self, samples: torch.Tensor, weights: torch.Tensor) -> dict:
[num_samples, _] = samples.shape
num_elites = math.floor(num_samples * self.parameters["parenthood_ratio"])
elite_indices = weights.argsort(descending=True)[:num_elites]
elites = samples[elite_indices, :]
return {
"mu": torch.mean(elites, dim=0) - self.parameters["mu"],
"sigma": torch.std(elites, dim=0) - self.parameters["sigma"],
}
def _compute_gradients(self, samples: torch.Tensor, weights: torch.Tensor, ranking_used: Optional[str]) -> dict:
if "parenthood_ratio" in self.parameters:
return self._compute_gradients_via_parenthood_ratio(samples, weights)
else:
mu = self.mu
sigma = self.sigma
# Compute the scaled noises, that is, the noise vectors which
# were used for generating the solutions
# (solution = scaled_noise + center)
scaled_noises = samples - mu
# Make sure that the weights (utilities) are 0-centered
# (Otherwise the formulations would have to consider a bias term)
if ranking_used not in ("centered", "normalized"):
weights = weights - torch.mean(weights)
mu_grad = self._divide_grad(
"mu",
total(dot(weights, scaled_noises)),
weights,
)
sigma_grad = self._divide_grad(
"sigma",
total(dot(weights, ((scaled_noises**2) - (sigma**2)) / sigma)),
weights,
)
return {
"mu": mu_grad,
"sigma": sigma_grad,
}
def update_parameters(
self,
gradients: dict,
*,
learning_rates: Optional[dict] = None,
optimizers: Optional[dict] = None,
) -> "SeparableGaussian":
mu_grad = gradients["mu"]
sigma_grad = gradients["sigma"]
new_mu = self.mu + self._follow_gradient("mu", mu_grad, learning_rates=learning_rates, optimizers=optimizers)
new_sigma = self.sigma + self._follow_gradient(
"sigma", sigma_grad, learning_rates=learning_rates, optimizers=optimizers
)
return self.modified_copy(mu=new_mu, sigma=new_sigma)
def relative_entropy(dist_0: "SeparableGaussian", dist_1: "SeparableGaussian") -> float:
mu_0 = dist_0.parameters["mu"]
mu_1 = dist_1.parameters["mu"]
sigma_0 = dist_0.parameters["sigma"]
sigma_1 = dist_1.parameters["sigma"]
cov_0 = sigma_0.pow(2.0)
cov_1 = sigma_1.pow(2.0)
mu_delta = mu_1 - mu_0
trace_cov = torch.sum(cov_0 / cov_1)
k = dist_0.solution_length
scaled_mu = torch.sum(mu_delta.pow(2.0) / cov_1)
log_det = torch.sum(torch.log(cov_1)) - torch.sum(torch.log(cov_0))
return 0.5 * (trace_cov - k + scaled_mu + log_det)
update_parameters(self, gradients, *, learning_rates=None, optimizers=None)
¶
Do an update on the distribution by following the given gradients.
It is expected that the inheriting class has its own implementation for this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gradients |
dict |
Gradients, as a dictionary, which will be used for computing the necessary updates. |
required |
learning_rates |
Optional[dict] |
A dictionary which contains learning rates for parameters that will be updated using a learning rate coefficient. |
None |
optimizers |
Optional[dict] |
A dictionary which contains optimizer objects for parameters that will be updated using an adaptive optimizer. |
None |
Returns:
Type | Description |
---|---|
SeparableGaussian |
The updated copy of the distribution. |
Source code in evotorch/algorithms/distributed/gaussian.py
def update_parameters(
    self,
    gradients: dict,
    *,
    learning_rates: Optional[dict] = None,
    optimizers: Optional[dict] = None,
) -> "SeparableGaussian":
    """
    Do an update on the distribution by following the given gradients.

    Args:
        gradients: Dictionary with the gradients for "mu" and "sigma".
        learning_rates: Learning rates for the parameters which are
            updated using a learning rate coefficient.
        optimizers: Optimizer objects for the parameters which are
            updated using an adaptive optimizer.
    Returns:
        The updated copy of the distribution.
    """
    # The step for the center is computed first, then the step for the
    # standard deviation.
    mu_step = self._follow_gradient(
        "mu", gradients["mu"], learning_rates=learning_rates, optimizers=optimizers
    )
    sigma_step = self._follow_gradient(
        "sigma", gradients["sigma"], learning_rates=learning_rates, optimizers=optimizers
    )
    return self.modified_copy(mu=self.mu + mu_step, sigma=self.sigma + sigma_step)
__init__(self, problem, *, popsize, parenthood_ratio, stdev_init=None, radius_init=None, num_interactions=None, popsize_max=None, center_init=None, stdev_min=None, stdev_max=None, stdev_max_change=None, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the search algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem object to work on. |
required |
popsize |
int |
The population size. |
required |
parenthood_ratio |
float |
Expected as a float larger than 0 and smaller
than 1. For example, setting this value to 0.1 means that
the top 10% of the population will be declared as the parents,
and those parents will be used for updating the population.
The amount of parents is always computed according to the
specified `popsize`, not according to the adapted population
size, and not according to `popsize_max`. |
required |
stdev_init |
Union[float, Iterable[float], torch.Tensor] |
The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None. |
None |
radius_init |
Union[float, Iterable[float], torch.Tensor] |
The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None. |
None |
num_interactions |
Optional[int] |
When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n amount of interactions is reached. When given as None, popsize is the only configuration affecting the size of a population. |
None |
popsize_max |
Optional[int] |
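Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. |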
None |
center_init |
Union[float, Iterable[float], torch.Tensor] |
The initial center solution. Can be left as None. |
None |
stdev_min |
Union[float, Iterable[float], torch.Tensor] |
The minimum value for the standard deviation values of the Gaussian search distribution. Can be left as None (which is the default), or can be given as a scalar or as a 1-dimensional array. |
None |
stdev_max |
Union[float, Iterable[float], torch.Tensor] |
The maximum value for the standard deviation values of the Gaussian search distribution. Can be left as None (which is the default), or can be given as a scalar or as a 1-dimensional array. |
None |
stdev_max_change |
Union[float, Iterable[float], torch.Tensor] |
The maximum update ratio allowed on the standard deviation. Expected as None if no such limiter is needed, or as a real number within 0.0 and 1.0 otherwise. In the PGPE implementation of Ha (2017, 2018), a value of 0.2 (20%) was used. For this CEM implementation, the default is None. |
None |
obj_index |
Optional[int] |
Index of the objective according to which the gradient estimations will be done. For single-objective problems, this can be left as None. |
None |
distributed |
bool |
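Whether or not the gradient computation will
be distributed. If `distributed` is given as False and the problem is not parallelized, then everything will be centralized (i.e. the entire computation will happen in the main process). If `distributed` is given as False, and the problem is parallelized, then the population will be created in the main process and then sent to remote workers for parallelized evaluation, and then the remote fitnesses will be collected by the main process again for computing the search gradients. If `distributed` is given as True, and the problem is parallelized, then the search algorithm itself will be distributed, in the sense that each remote actor will generate its own population (such that the total population size across all these actors becomes equal to `popsize`) and will compute its own gradient, and then the main process will collect these gradients, compute the averaged gradients and update the main search distribution. Non-distributed mode has the advantage of keeping the population in the main process, which is good when one wishes to do detailed monitoring during the evolutionary process, but has the disadvantage of having to pass the solutions to the remote actors and having to collect fitnesses, which might result in increased interprocess communication traffic. On the other hand, while it is not possible to monitor the population in distributed mode, the distributed mode has the advantage of significantly reducing the interprocess communication traffic, since the only things communicated with the remote actors are the search distributions (not the solutions) and the gradients. |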
False |
popsize_weighted_grad_avg |
Optional[bool] |
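Only to be used in distributed mode.
(where being in distributed mode means `distributed` is given as True). In distributed mode, each actor remotely samples its own solution batches and computes its own gradients. These gradients are then collected, and a final average gradient is computed. If `popsize_weighted_grad_avg` is True, then, while averaging over the gradients, each gradient will have its own weight that is computed according to how many solutions were sampled by the actor that produced the gradient. If `popsize_weighted_grad_avg` is False, then, there will not be weighted averaging (or, each gradient will have equal weight). If `popsize_weighted_grad_avg` is None, then, the gradient weights will be equal if a value for `num_interactions` is given (because `num_interactions` affects the number of solutions according to the episode lengths, and popsize-weighting the gradients could be misleading); and the gradient weights will be weighted according to the sub-population (i.e. sub-batch) sizes if `num_interactions` is left as None. The default value for `popsize_weighted_grad_avg` is None. When the distributed mode is disabled (i.e. when `distributed` is False), then the argument `popsize_weighted_grad_avg` is expected as None. |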
None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
    self,
    problem: Problem,
    *,
    popsize: int,
    parenthood_ratio: float,
    stdev_init: Optional[RealOrVector] = None,
    radius_init: Optional[RealOrVector] = None,
    num_interactions: Optional[int] = None,
    popsize_max: Optional[int] = None,
    center_init: Optional[RealOrVector] = None,
    stdev_min: Optional[RealOrVector] = None,
    stdev_max: Optional[RealOrVector] = None,
    stdev_max_change: Optional[Union[float, RealOrVector]] = None,
    obj_index: Optional[int] = None,
    distributed: bool = False,
    popsize_weighted_grad_avg: Optional[bool] = None,
):
    """
    `__init__(...)`: Initialize the search algorithm.

    Args:
        problem: The problem object to work on.
        popsize: The population size.
        parenthood_ratio: Expected as a float larger than 0 and smaller
            than 1. For example, setting this value to 0.1 means that
            the top 10% of the population will be declared as the
            parents, and those parents will be used for updating the
            population.
            The amount of parents is always computed according to the
            specified `popsize`, not according to the adapted population
            size, and not according to `popsize_max`.
            NOTE(review): the visible
            `SeparableGaussian._compute_gradients_via_parenthood_ratio`
            derives the parent count from the number of samples it
            receives; confirm that the statement above still holds.
        stdev_init: The initial standard deviation of the search
            distribution, expressed as a scalar or as an array.
            Determines the initial coverage area of the search
            distribution.
            If one wishes to configure the coverage area via the
            argument `radius_init` instead, then `stdev_init` is
            expected as None.
        radius_init: The initial radius of the search distribution,
            expressed as a scalar.
            Determines the initial coverage area of the search
            distribution.
            Here, "radius" is defined as the norm of the search
            distribution.
            If one wishes to configure the coverage area via the
            argument `stdev_init` instead, then `radius_init` is
            expected as None.
        num_interactions: When given as an integer n, it is ensured
            that a population has interacted with the GymProblem's
            environment n times. If this target has not been reached
            yet, then the population is declared too small, and gets
            extended with more samples, until n amount of interactions
            is reached.
            When given as None, popsize is the only configuration
            affecting the size of a population.
        popsize_max: Having `num_interactions` set as an integer might
            cause the effective population size to jump to unnecessarily
            large numbers. To prevent this, one can set `popsize_max`
            to specify an upper bound for the effective population
            size.
        center_init: The initial center solution.
            Can be left as None.
        stdev_min: The minimum value for the standard deviation values
            of the Gaussian search distribution.
            Can be left as None (which is the default), or can be given
            as a scalar or as a 1-dimensional array.
        stdev_max: The maximum value for the standard deviation values
            of the Gaussian search distribution.
            Can be left as None (which is the default), or can be given
            as a scalar or as a 1-dimensional array.
        stdev_max_change: The maximum update ratio allowed on the
            standard deviation. Expected as None if no such limiter is
            needed, or as a real number within 0.0 and 1.0 otherwise.
            In the PGPE implementation of Ha (2017, 2018), a value of
            0.2 (20%) was used.
            For this CEM implementation, the default is None.
        obj_index: Index of the objective according to which the
            gradient estimations will be done.
            For single-objective problems, this can be left as None.
        distributed: Whether or not the gradient computation will be
            distributed.
            If `distributed` is given as False and the problem is not
            parallelized, then everything will be centralized (i.e. the
            entire computation will happen in the main process).
            If `distributed` is given as False, and the problem is
            parallelized, then the population will be created in the
            main process and then sent to remote workers for
            parallelized evaluation, and then the remote fitnesses will
            be collected by the main process again for computing the
            search gradients.
            If `distributed` is given as True, and the problem is
            parallelized, then the search algorithm itself will be
            distributed, in the sense that each remote actor will
            generate its own population (such that the total population
            size across all these actors becomes equal to `popsize`)
            and will compute its own gradient, and then the main
            process will collect these gradients, compute the averaged
            gradients and update the main search distribution.
            Non-distributed mode has the advantage of keeping the
            population in the main process, which is good when one
            wishes to do detailed monitoring during the evolutionary
            process, but has the disadvantage of having to pass the
            solutions to the remote actors and having to collect
            fitnesses, which might result in increased interprocess
            communication traffic.
            On the other hand, while it is not possible to monitor the
            population in distributed mode, the distributed mode has
            the advantage of significantly reducing the interprocess
            communication traffic, since the only things communicated
            with the remote actors are the search distributions (not
            the solutions) and the gradients.
        popsize_weighted_grad_avg: Only to be used in distributed mode
            (where being in distributed mode means `distributed` is
            given as True). In distributed mode, each actor remotely
            samples its own solution batches and computes its own
            gradients. These gradients are then collected, and a final
            average gradient is computed.
            If `popsize_weighted_grad_avg` is True, then, while
            averaging over the gradients, each gradient will have its
            own weight that is computed according to how many solutions
            were sampled by the actor that produced the gradient.
            If `popsize_weighted_grad_avg` is False, then, there will
            not be weighted averaging (or, each gradient will have
            equal weight).
            If `popsize_weighted_grad_avg` is None, then, the gradient
            weights will be equal if a value for `num_interactions` is
            given (because `num_interactions` affects the number of
            solutions according to the episode lengths, and
            popsize-weighting the gradients could be misleading); and
            the gradient weights will be weighted according to the
            sub-population (i.e. sub-batch) sizes if `num_interactions`
            is left as None.
            The default value for `popsize_weighted_grad_avg` is None.
            When the distributed mode is disabled (i.e. when
            `distributed` is False), then the argument
            `popsize_weighted_grad_avg` is expected as None.
    """
    # The parenthood ratio is handed over as a distribution parameter.
    self.DISTRIBUTION_PARAMS = {"parenthood_ratio": float(parenthood_ratio)}

    super().__init__(
        problem,
        # Population sizing
        popsize=popsize,
        popsize_max=popsize_max,
        num_interactions=num_interactions,
        # Initial search distribution
        center_init=center_init,
        stdev_init=stdev_init,
        radius_init=radius_init,
        # Fixed settings: full-step learning rates, no adaptive optimizer,
        # no ranking method
        center_learning_rate=1.0,
        stdev_learning_rate=1.0,
        optimizer=None,
        optimizer_config=None,
        ranking_method=None,
        # Limits on the standard deviation
        stdev_min=stdev_min,
        stdev_max=stdev_max,
        stdev_max_change=stdev_max_change,
        # Objective selection and distributed mode
        obj_index=obj_index,
        distributed=distributed,
        popsize_weighted_grad_avg=popsize_weighted_grad_avg,
    )
GaussianSearchAlgorithm (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
Base class for search algorithms based on Gaussian distribution.
Source code in evotorch/algorithms/distributed/gaussian.py
class GaussianSearchAlgorithm(SearchAlgorithm, SinglePopulationAlgorithmMixin):
    """
    Base class for search algorithms based on Gaussian distribution.

    The class attribute `DISTRIBUTION_TYPE` is the callable used for
    creating the search distribution (it is called with the parameter
    dictionary, `dtype` and `device`). The attribute `DISTRIBUTION_PARAMS`
    is deep-copied (unless it is None) and extended with the initial
    `mu` and `sigma` to form that parameter dictionary.
    Both attributes are `NotImplemented` here.
    """

    DISTRIBUTION_TYPE = NotImplemented
    DISTRIBUTION_PARAMS = NotImplemented

    def __init__(
        self,
        problem: Problem,
        *,
        popsize: int,
        center_learning_rate: float,
        stdev_learning_rate: float,
        stdev_init: Optional[RealOrVector] = None,
        radius_init: Optional[RealOrVector] = None,
        num_interactions: Optional[int] = None,
        popsize_max: Optional[int] = None,
        optimizer=None,
        optimizer_config: Optional[dict] = None,
        ranking_method: Optional[str] = None,
        center_init: Optional[RealOrVector] = None,
        stdev_min: Optional[RealOrVector] = None,
        stdev_max: Optional[RealOrVector] = None,
        stdev_max_change: Optional[RealOrVector] = None,
        obj_index: Optional[int] = None,
        distributed: bool = False,
        popsize_weighted_grad_avg: Optional[bool] = None,
        ensure_even_popsize: bool = False,
    ):
        """
        `__init__(...)`: Initialize the Gaussian search algorithm.

        Args:
            problem: The problem object to work on. It is required to be
                numeric and unbounded.
            popsize: The population size (the size of each sampled batch
                when `num_interactions` is given).
            center_learning_rate: Learning rate for "mu" when no optimizer
                is used; step size of the optimizer when `optimizer` is
                given as a string.
            stdev_learning_rate: Learning rate for "sigma".
            stdev_init: Initial standard deviation; forwarded to
                `to_stdev_init` together with `radius_init`.
            radius_init: Initial radius; forwarded to `to_stdev_init`
                together with `stdev_init`.
            num_interactions: If not None, in non-distributed mode new
                batches keep being sampled within a step until the
                interaction count made during that step exceeds this
                value (or until `popsize_max` is reached).
            popsize_max: Optional bound on the total population size
                of the adaptive sampling loop.
            optimizer: None, the name of an optimizer (str), or an
                already prepared optimizer object (used as it is).
            optimizer_config: Passed to `get_optimizer_class` when
                `optimizer` is given as a string.
            ranking_method: Name of the ranking method, forwarded to the
                gradient computation.
            center_init: Initial center. If None, it is generated by the
                problem object.
            stdev_min: Lower bound for the standard deviation.
            stdev_max: Upper bound for the standard deviation.
            stdev_max_change: Limiter for the change of the standard
                deviation in a single update.
            obj_index: Index of the objective to focus on; normalized via
                `problem.normalize_obj_index`.
            distributed: If True and the problem has actors, the
                distributed step is used.
            popsize_weighted_grad_avg: Whether to weigh the gradients of
                the actors by their sample sizes. If None, this becomes
                True only when `num_interactions` is None.
            ensure_even_popsize: If True, `popsize` must be even.
        Raises:
            ValueError: if `ensure_even_popsize` is True and `popsize` is
                odd, or if `popsize_weighted_grad_avg` is given while
                `distributed` is False.
        """
        # Ensure that the problem is numeric
        problem.ensure_numeric()

        # The distribution-based algorithms we consider here cannot handle strict lower and upper bound constraints.
        # Therefore, we ensure that the given problem is unbounded.
        problem.ensure_unbounded()

        # Initialize the SearchAlgorithm, which is the parent class.
        # Note that the status getters are bound methods which read `self._distribution` and
        # `self._population`; those attributes are created further below.
        SearchAlgorithm.__init__(
            self,
            problem,
            center=self._get_mu,
            stdev=self._get_sigma,
            mean_eval=self._get_mean_eval,
        )

        self._ensure_even_popsize = bool(ensure_even_popsize)

        if not distributed:
            # self.add_status_getters({"median_eval": self._get_median_eval})
            if num_interactions is not None:
                # With an adaptive population size, the actual population size is reported as well.
                self.add_status_getters({"popsize": self._get_popsize})

        if self._ensure_even_popsize:
            if (popsize % 2) != 0:
                raise ValueError(
                    f"`popsize` was expected as an even number. However, the received `popsize` is {popsize}."
                )

        if center_init is None:
            # If a starting point for the search distribution is not given,
            # then we use the problem object to generate us one.
            mu = problem.generate_values(1).reshape(-1)
        else:
            # If a starting point for the search distribution is given,
            # then we make sure that its length, dtype, and device
            # are correct.
            mu = problem.ensure_tensor_length_and_dtype(center_init, allow_scalar=False, about="center_init")

        # Get the standard deviation or the radius configuration from the arguments
        stdev_init = to_stdev_init(
            solution_length=problem.solution_length, stdev_init=stdev_init, radius_init=radius_init
        )

        # Make sure that the provided initial standard deviation is
        # of correct length, dtype, and device.
        sigma = problem.ensure_tensor_length_and_dtype(stdev_init, about="stdev_init", allow_scalar=False)

        # Create the distribution
        dist_cls = self.DISTRIBUTION_TYPE
        dist_params = deepcopy(self.DISTRIBUTION_PARAMS) if self.DISTRIBUTION_PARAMS is not None else {}
        dist_params.update({"mu": mu, "sigma": sigma})
        self._distribution: Distribution = dist_cls(dist_params, dtype=problem.dtype, device=problem.device)

        # Store the following keyword arguments to use later
        self._popsize = int(popsize)
        self._popsize_max = None if popsize_max is None else int(popsize_max)
        self._num_interactions = None if num_interactions is None else int(num_interactions)
        self._center_learning_rate = float(center_learning_rate)
        self._stdev_learning_rate = float(stdev_learning_rate)
        # The optimizer is created after the distribution, because `_initialize_optimizer` reads
        # the dtype, the solution length, and the device from `self._distribution`.
        self._optimizer = self._initialize_optimizer(self._center_learning_rate, optimizer, optimizer_config)
        self._ranking_method = None if ranking_method is None else str(ranking_method)
        self._stdev_min = (
            None
            if stdev_min is None
            else problem.ensure_tensor_length_and_dtype(stdev_min, about="stdev_min", allow_scalar=True)
        )
        self._stdev_max = (
            None
            if stdev_max is None
            else problem.ensure_tensor_length_and_dtype(stdev_max, about="stdev_max", allow_scalar=True)
        )
        self._stdev_max_change = (
            None
            if stdev_max_change is None
            else problem.ensure_tensor_length_and_dtype(stdev_max_change, about="stdev_max_change", allow_scalar=True)
        )
        self._obj_index = problem.normalize_obj_index(obj_index)

        if distributed and (problem.num_actors > 0):
            # If the algorithm is initialized in distributed mode, and also if the problem is configured
            # for parallelization, then the _step method becomes an alias for _step_distributed
            self._step = self._step_distributed
        else:
            # Otherwise, the _step method becomes an alias for _step_non_distributed
            self._step = self._step_non_distributed

        if popsize_weighted_grad_avg is None:
            # Default: weigh by population size only when the population size is not adaptive.
            self._popsize_weighted_grad_avg = num_interactions is None
        else:
            if not distributed:
                raise ValueError(
                    "The argument `popsize_weighted_grad_avg` can only be used in distributed mode."
                    " (i.e. when the argument `distributed` is given as True)."
                    " When `distributed` is False, please leave `popsize_weighted_grad_avg` as None."
                )
            self._popsize_weighted_grad_avg = bool(popsize_weighted_grad_avg)

        self._mean_eval: Optional[float] = None
        self._population: Optional[SolutionBatch] = None
        self._first_iter: bool = True

        # We would like to add the reporting capabilities of the mixin class `SinglePopulationAlgorithmMixin`.
        # However, we exclude "mean_eval" from the reporting services requested from `SinglePopulationAlgorithmMixin`
        # because this class has its own reporting mechanism for `mean_eval`.
        # Additionally, we enable the reporting services of `SinglePopulationAlgorithmMixin` only when we are
        # in the non-distributed mode. This is because we do not have a centrally stored population at all in the
        # distributed mode.
        SinglePopulationAlgorithmMixin.__init__(self, exclude="mean_eval", enable=(not distributed))

    def _initialize_optimizer(
        self, learning_rate: float, optimizer=None, optimizer_config: Optional[dict] = None
    ) -> object:
        """
        Prepare the optimizer to be used for the center of the distribution.

        Args:
            learning_rate: Step size given to the optimizer when it is
                instantiated here.
            optimizer: None, an optimizer name (str), or an optimizer object.
            optimizer_config: Passed to `get_optimizer_class` together
                with the optimizer name.
        Returns:
            None if `optimizer` is None; a newly instantiated optimizer if
            `optimizer` is a string; otherwise `optimizer` itself.
        """
        if optimizer is None:
            return None
        elif isinstance(optimizer, str):
            center_optim_cls = get_optimizer_class(optimizer, optimizer_config)
            return center_optim_cls(
                stepsize=float(learning_rate),
                dtype=self._distribution.dtype,
                solution_length=self._distribution.solution_length,
                device=self._distribution.device,
            )
        else:
            return optimizer

    def _step(self):
        """
        Placeholder for the step of the algorithm.

        During initialization, the instance attribute `_step` is set to
        either `_step_distributed` or `_step_non_distributed`, which
        shadows this method.
        """
        raise NotImplementedError

    def _step_distributed(self):
        """
        Perform a step in distributed mode.

        The gradients are obtained via the problem object's
        `sample_and_compute_gradients(...)`, averaged (optionally weighted
        by the sample sizes), and then followed. The sample-size-weighted
        average of the reported mean evaluations is stored as `_mean_eval`.
        """
        # Use the problem object's `sample_and_compute_gradients` method
        # to do parallelized and distributed gradient computation
        fetched = self.problem.sample_and_compute_gradients(
            self._distribution,
            self._popsize,
            popsize_max=self._popsize_max,
            obj_index=self._obj_index,
            num_interactions=self._num_interactions,
            ranking_method=self._ranking_method,
            ensure_even_popsize=self._ensure_even_popsize,
        )

        # The method `sample_and_compute_gradients(...)` returns a list of dictionaries, each dictionary being
        # the result of a different remote computation.
        # For each remote computation, the list will contain a dictionary that looks like this:
        # {"gradients": <gradients dictionary here>, "num_solutions": ..., "mean_eval": ...}

        # We will now accumulate all the gradients, num_solutions, and mean_evals in their own lists.
        # So, in the end, we will have a list of gradients, a list of num_solutions, and a list of
        # mean_eval.
        # These lists will be stored by the following temporary class
        # (which is re-created, with fresh empty lists, at each call of this method):
        class list_of:
            gradients = []
            num_solutions = []
            mean_eval = []

        # We are now filling the lists declared above
        n = len(fetched)
        for i in range(n):
            list_of.gradients.append(fetched[i]["gradients"])
            list_of.num_solutions.append(fetched[i]["num_solutions"])
            list_of.mean_eval.append(fetched[i]["mean_eval"])

        # Here, we get the keys of our gradient dictionaries.
        # For most simple Gaussian distributions, grad_keys should be {"mu", "sigma"}.
        grad_keys = set(list_of.gradients[0].keys())

        # We now find the total number of solutions and the overall average mean_eval.
        # The overall average mean will be reported to the user.
        total_num_solutions = 0
        total_weighted_eval = 0
        for i in range(n):
            total_num_solutions += list_of.num_solutions[i]
            total_weighted_eval += float(list_of.num_solutions[i] * list_of.mean_eval[i])
        avg_mean_eval = total_weighted_eval / total_num_solutions

        # For each gradient (in most cases among 'mu' and 'sigma'), we allocate a new 0-filled tensor.
        avg_gradients = {}
        for key in grad_keys:
            avg_gradients[key] = self._distribution.make_zeros(num_solutions=1).reshape(-1)

        # Below, we iterate over all collected results and add their gradients, in a weighted manner, onto the
        # `avg_gradients` we allocated above.
        # At the end, `avg_gradients` will store the weighted-averaged gradients to be followed by the algorithm.
        for i in range(n):
            # For each collected result, we compute a weight for the gradient, which is the number of solutions
            # sampled divided by the total number of sampled solutions.
            num_solutions = list_of.num_solutions[i]
            if self._popsize_weighted_grad_avg:
                # If we are to weigh each gradient by its popsize (i.e. by its sample size)
                # then its weight is computed as its number of solutions divided by the
                # total number of solutions
                weight = num_solutions / total_num_solutions
            else:
                # If we are NOT to weigh each gradient by its popsize (i.e. by its sample size)
                # then the weight of this gradient simply becomes 1 divided by the number of gradients.
                weight = 1 / n

            for key in grad_keys:
                grad = list_of.gradients[i][key]
                avg_gradients[key] += weight * grad

        self._update_distribution(avg_gradients)
        self._mean_eval = avg_mean_eval

    def _step_non_distributed(self):
        """
        Perform a step in non-distributed mode.

        At the first call, a population is sampled and evaluated.
        At later calls, the gradients are first computed from the
        previously evaluated population and followed, and then a new
        population is sampled and evaluated.
        """

        # First, we define an inner function which fills the current population by sampling from the distribution.
        def fill_and_eval_pop():
            # This inner function is responsible for filling the main population with samples
            # and evaluate them.
            if self._num_interactions is None:
                # If num_interactions is configured as None, this means that we are not going to adapt
                # the population size according to the number of simulation interactions reported
                # by the problem object.

                # We first make sure that the population (which is to be of constant size, since we are
                # not in the adaptive population size mode) is allocated.
                if self._population is None:
                    self._population = SolutionBatch(
                        self.problem, popsize=self._popsize, device=self._distribution.device, empty=True
                    )

                # Now, we do in-place sampling on the population.
                self._distribution.sample(out=self._population.access_values(), generator=self.problem)

                # Finally, here, the solutions are evaluated.
                self.problem.evaluate(self._population)
            else:
                # If num_interactions is not None, then this means that we have a threshold for the number
                # of simulator interactions to reach before declaring the phase of sampling complete.
                # In other words, we have to adapt our population size according to the number of simulator
                # interactions reported by the problem object.

                # The 'total_interaction_count' status reported by the problem object shows the global interaction count.
                # Therefore, to properly count the simulator interactions we made during this generation, we need
                # to get the interaction count before starting our sampling and evaluation operations.
                first_num_interactions = self.problem.status.get("total_interaction_count", 0)

                # We will keep allocating and evaluating new populations until the interaction count threshold is reached.
                # These newly allocated populations will eventually be concatenated into one.
                # The not-yet-concatenated populations and the total allocated population size will be stored below:
                populations = []
                total_popsize = 0

                # Below, we repeatedly allocate, sample, and evaluate, until our thresholds are reached.
                while True:
                    # Allocate a new population
                    newpop = SolutionBatch(
                        self.problem,
                        popsize=self._popsize,
                        like=self._population,
                        empty=True,
                    )

                    # Update the total population size
                    total_popsize += len(newpop)

                    # Sample new solutions within the newly allocated population
                    self._distribution.sample(out=newpop.access_values(), generator=self.problem)

                    # Evaluate the new population
                    self.problem.evaluate(newpop)

                    # Add the newly allocated and evaluated population into the populations list
                    populations.append(newpop)

                    # In addition to the num_interactions threshold, we might also have a popsize_max threshold.
                    # We now check this threshold.
                    if (self._popsize_max is not None) and (total_popsize >= self._popsize_max):
                        # If the popsize_max threshold is reached, we leave the loop.
                        break

                    # We now compute the number of interactions we have made during this while loop.
                    # NOTE(review): unlike the `.get(..., 0)` above, this lookup requires the status key to exist.
                    interactions_made = self.problem.status["total_interaction_count"] - first_num_interactions

                    if interactions_made > self._num_interactions:
                        # If the number of interactions exceeds our threshold, we leave the loop.
                        break

                # Finally, we concatenate all our populations into one.
                self._population = SolutionBatch.cat(populations)

        if self._first_iter:
            # If we are computing the first generation, we just sample from our distribution and evaluate
            # the solutions.
            fill_and_eval_pop()
            self._first_iter = False
        else:
            # If we are computing next generations, then we need to compute the gradients of the last
            # generation, sample a new population, and evaluate the new population's solutions.
            samples = self._population.access_values(keep_evals=True)
            fitnesses = self._population.access_evals()[:, self._obj_index]
            obj_sense = self.problem.senses[self._obj_index]
            ranking_method = self._ranking_method
            gradients = self._distribution.compute_gradients(
                samples, fitnesses, objective_sense=obj_sense, ranking_method=ranking_method
            )
            self._update_distribution(gradients)
            fill_and_eval_pop()

    def _update_distribution(self, gradients: dict):
        """
        Follow the given gradients and replace the stored distribution.

        If any of `stdev_min`, `stdev_max`, `stdev_max_change` was
        configured, the updated standard deviation is passed through
        `modify_tensor` together with the pre-update standard deviation.

        Args:
            gradients: Dictionary of gradients, as expected by the
                distribution's `update_parameters(...)` method.
        """
        # This is where we follow the gradients with the help of the stored Distribution object.

        # First, we check whether or not we will need to do a controlled update on the
        # standard deviation (do we have imposed lower and upper bounds for the standard deviation,
        # and do we have a maximum change limiter?)
        controlled_stdev_update = (
            (self._stdev_min is not None) or (self._stdev_max is not None) or (self._stdev_max_change is not None)
        )

        if controlled_stdev_update:
            # If the standard deviation update needs to be controlled, we store the standard deviation just before
            # the update. We will use this later.
            old_sigma = self._distribution.sigma

        # Here, we determine for which distribution parameter we have a learning rate and for which distribution
        # parameter we have an optimizer.
        learning_rates = {}
        optimizers = {}

        if self._optimizer is not None:
            # If there is an optimizer, then we declare that "mu" has an optimizer
            optimizers["mu"] = self._optimizer
        else:
            # If we do not have an optimizer, then we declare that "mu" has a raw learning rate coefficient
            learning_rates["mu"] = self._center_learning_rate

        # Here, we declare that "sigma" has a learning rate
        learning_rates["sigma"] = self._stdev_learning_rate

        # With the help of the Distribution object's `update_parameters(...)` method, we follow the gradients
        updated_dist = self._distribution.update_parameters(
            gradients, learning_rates=learning_rates, optimizers=optimizers
        )

        if controlled_stdev_update:
            # If our standard deviation update needs to be controlled, then, considering the pre-update
            # standard deviation, we ensure that the update constraints (lower and upper bounds and maximum change)
            # are not violated.
            updated_dist = updated_dist.modified_copy(
                sigma=modify_tensor(
                    old_sigma,
                    updated_dist.sigma,
                    lb=self._stdev_min,
                    ub=self._stdev_max,
                    max_change=self._stdev_max_change,
                )
            )

        # Now we can declare that our main distribution is the updated one
        self._distribution = updated_dist

    def _get_mu(self) -> torch.Tensor:
        """Status getter: the `mu` parameter of the current distribution."""
        return self._distribution.parameters["mu"]

    def _get_sigma(self) -> torch.Tensor:
        """Status getter: the `sigma` parameter of the current distribution."""
        return self._distribution.parameters["sigma"]

    def _get_mean_eval(self) -> Optional[float]:
        """
        Status getter: the mean evaluation result.

        Without a stored population, the value stored in `_mean_eval` is
        returned (which is None until `_step_distributed` assigns it).
        Otherwise, the mean is computed over the focused objective's
        evaluations of the stored population.
        """
        if self._population is None:
            return self._mean_eval
        else:
            return float(torch.mean(self._population.evals[:, self._obj_index]))

    # def _get_median_eval(self) -> Optional[float]:
    #    if self._population is None:
    #        return None
    #    else:
    #        return float(torch.median(self._population.evals[:, self._obj_index]))

    def _get_popsize(self) -> int:
        """Status getter: size of the stored population (0 if there is none)."""
        return 0 if self._population is None else len(self._population)

    @property
    def population(self) -> Optional[SolutionBatch]:
        """
        The population, represented by a SolutionBatch.

        If the population is not initialized yet, the retrieved value will
        be None.
        Also note that, if this algorithm is in distributed mode, the
        retrieved value will be None, since the distributed mode causes the
        population to be generated in the remote actors, and not in the main
        process.
        """
        return self._population

    @property
    def obj_index(self) -> int:
        """
        Index of the focused objective
        """
        return self._obj_index
obj_index: int
property
readonly
¶
Index of the focused objective
population: Optional[evotorch.core.SolutionBatch]
property
readonly
¶
The population, represented by a SolutionBatch.
If the population is not initialized yet, the retrieved value will be None. Also note that, if this algorithm is in distributed mode, the retrieved value will be None, since the distributed mode causes the population to be generated in the remote actors, and not in the main process.
PGPE (GaussianSearchAlgorithm)
¶
This implementation is the symmetric-sampling variant proposed in the paper Sehnke et al. (2010).
Inspired by the PGPE implementations used in the studies of Ha (2017, 2019), and by the evolution strategy variant of Salimans et al. (2017), this PGPE implementation uses 0-centered ranking by default. The default optimizer for this PGPE implementation is ClipUp (Toklu et al., 2020).
References:
Frank Sehnke, Christian Osendorfer, Thomas Ruckstiess,
Alex Graves, Jan Peters, Jurgen Schmidhuber (2010).
Parameter-exploring Policy Gradients.
Neural Networks 23(4), 551-559.
David Ha (2017). Evolving Stable Strategies.
<http://blog.otoro.net/2017/11/12/evolving-stable-strategies/>
Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
Evolution Strategies as a Scalable Alternative to
Reinforcement Learning.
David Ha (2019). Reinforcement Learning for Improving Agent Design.
Artificial life 25 (4), 352-365.
Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
ClipUp: A Simple and Powerful Optimizer
for Distribution-based Policy Evolution.
Parallel Problem Solving from Nature (PPSN 2020).
Source code in evotorch/algorithms/distributed/gaussian.py
class PGPE(GaussianSearchAlgorithm):
    """
    PGPE: Policy gradient with parameter-based exploration.

    This implementation is the symmetric-sampling variant proposed
    in the paper Sehnke et al. (2010).

    Inspired by the PGPE implementations used in the studies
    of Ha (2017, 2019), and by the evolution strategy variant of
    Salimans et al. (2017), this PGPE implementation uses 0-centered
    ranking by default.
    The default optimizer for this PGPE implementation is ClipUp
    (Toklu et al., 2020).

    References:

        Frank Sehnke, Christian Osendorfer, Thomas Ruckstiess,
        Alex Graves, Jan Peters, Jurgen Schmidhuber (2010).
        Parameter-exploring Policy Gradients.
        Neural Networks 23(4), 551-559.

        David Ha (2017). Evolving Stable Strategies.
        <http://blog.otoro.net/2017/11/12/evolving-stable-strategies/>

        Salimans, T., Ho, J., Chen, X., Sidor, S. and Sutskever, I. (2017).
        Evolution Strategies as a Scalable Alternative to
        Reinforcement Learning.

        David Ha (2019). Reinforcement Learning for Improving Agent Design.
        Artificial life 25 (4), 352-365.

        Toklu, N.E., Liskowski, P., Srivastava, R.K. (2020).
        ClipUp: A Simple and Powerful Optimizer
        for Distribution-based Policy Evolution.
        Parallel Problem Solving from Nature (PPSN 2020).
    """

    DISTRIBUTION_TYPE = NotImplemented  # To be filled by the PGPE instance
    DISTRIBUTION_PARAMS = NotImplemented  # To be filled by the PGPE instance

    def __init__(
        self,
        problem: Problem,
        *,
        popsize: int,
        center_learning_rate: float,
        stdev_learning_rate: float,
        stdev_init: Optional[RealOrVector] = None,
        radius_init: Optional[RealOrVector] = None,
        num_interactions: Optional[int] = None,
        popsize_max: Optional[int] = None,
        optimizer="clipup",
        optimizer_config: Optional[dict] = None,
        ranking_method: Optional[str] = "centered",
        center_init: Optional[RealOrVector] = None,
        stdev_min: Optional[RealOrVector] = None,
        stdev_max: Optional[RealOrVector] = None,
        stdev_max_change: Optional[RealOrVector] = 0.2,
        symmetric: bool = True,
        obj_index: Optional[int] = None,
        distributed: bool = False,
        popsize_weighted_grad_avg: Optional[bool] = None,
    ):
        """
        `__init__(...)`: Initialize the PGPE algorithm.

        Args:
            problem: The problem object which is being worked on.
                The problem must have its dtype defined
                (which means it works on Solution objects,
                not with custom Solution objects).
                Also, the problem must be single-objective.
            popsize: The population size.
                In the case of PGPE, `popsize` is expected as an even number
                in non-distributed mode. In distributed mode, PGPE will
                ensure that each sub-population size assigned to a remote
                actor is an even number.
                This behavior is because PGPE does symmetric sampling
                (i.e. solutions are sampled in pairs).
            center_learning_rate: The learning rate for the center
                of the search distribution.
            stdev_learning_rate: The learning rate for the standard
                deviation values of the search distribution.
            stdev_init: The initial standard deviation of the search
                distribution, expressed as a scalar or as an array.
                Determines the initial coverage area of the search
                distribution.
                If one wishes to configure the coverage area via the
                argument `radius_init` instead, then `stdev_init` is expected
                as None.
            radius_init: The initial radius of the search distribution,
                expressed as a scalar.
                Determines the initial coverage area of the search
                distribution.
                Here, "radius" is defined as the norm of the search
                distribution.
                If one wishes to configure the coverage area via the
                argument `stdev_init` instead, then `radius_init` is expected
                as None.
            num_interactions: When given as an integer n,
                it is ensured that a population has interacted with
                the GymProblem's environment n times. If this target
                has not been reached yet, then the population is declared
                too small, and gets extended with more samples,
                until n amount of interactions is reached.
                When given as None, popsize is the only configuration
                affecting the size of a population.
            popsize_max: Having `num_interactions` set as an integer
                might cause the effective population size to jump to
                unnecessarily large numbers. To prevent this,
                one can set `popsize_max` to specify an upper
                bound for the effective population size.
            optimizer: The optimizer to be used while following the
                estimated gradients.
                Can be given as None if a momentum-based optimizer
                is not required.
                Otherwise, can be given as a str containing the name
                of the optimizer (e.g. 'adam', 'clipup');
                or as an instance of evotorch.optimizers.TorchOptimizer
                or evotorch.optimizers.ClipUp.
                The default is 'clipup' (Toklu et al., 2020).
                Note that, for ClipUp, the default maximum speed is set
                as twice the given `center_learning_rate`.
                This maximum speed can be configured by passing
                `{"max_speed": ...}` to `optimizer_config`.
            optimizer_config: Configuration which will be passed
                to the optimizer as keyword arguments.
                See `evotorch.optimizers` for details about
                which optimizer accepts which keyword arguments.
            ranking_method: Which ranking method will be used for
                fitness shaping. See the documentation of
                `evotorch.ranking.rank(...)` for details.
                As in the study of Salimans et al. (2017),
                the default is 'centered'.
                Can be given as None if no such ranking is required.
            center_init: The initial center solution.
                Can be left as None.
            stdev_min: Lower bound for the standard deviation value/array.
                Can be given as a real number, or as an array of real numbers.
            stdev_max: Upper bound for the standard deviation value/array.
                Can be given as a real number, or as an array of real numbers.
            stdev_max_change: The maximum update ratio allowed on the
                standard deviation. Expected as None if no such limiter
                is needed, or as a real number within 0.0 and 1.0 otherwise.
                Like in the implementation of Ha (2017, 2019),
                the default value for this setting is 0.2, meaning that
                the update on the standard deviation values can not be
                more than 20% of their original values.
            symmetric: Whether or not the solutions will be sampled
                in a symmetric/mirrored/antithetic manner.
                The default is True.
            obj_index: Index of the objective according to which the
                gradient estimations will be done.
                For single-objective problems, this can be left as None.
            distributed: Whether or not the gradient computation will
                be distributed. If `distributed` is given as False and
                the problem is not parallelized, then everything will
                be centralized (i.e. the entire computation will happen
                in the main process).
                If `distributed` is given as False, and the problem
                is parallelized, then the population will be created
                in the main process and then sent to remote workers
                for parallelized evaluation, and then the remote fitnesses
                will be collected by the main process again for computing
                the search gradients.
                If `distributed` is given as True, and the problem
                is parallelized, then the search algorithm itself will
                be distributed, in the sense that each remote actor will
                generate its own population (such that the total population
                size across all these actors becomes equal to `popsize`)
                and will compute its own gradient, and then the main process
                will collect these gradients, compute the averaged gradients
                and update the main search distribution.
                Non-distributed mode has the advantage of keeping the
                population in the main process, which is good when one wishes
                to do detailed monitoring during the evolutionary process,
                but has the disadvantage of having to pass the solutions to
                the remote actors and having to collect fitnesses, which
                might result in increased interprocess communication traffic.
                On the other hand, while it is not possible to monitor the
                population in distributed mode, the distributed mode has the
                advantage of significantly reducing the interprocess
                communication traffic, since the only things communicated
                with the remote actors are the search distributions (not the
                solutions) and the gradients.
            popsize_weighted_grad_avg: Only to be used in distributed mode
                (where being in distributed mode means `distributed` is given
                as True). In distributed mode, each actor remotely samples
                its own solution batches and computes its own gradients.
                These gradients are then collected, and a final average
                gradient is computed.
                If `popsize_weighted_grad_avg` is True, then, while averaging
                over the gradients, each gradient will have its own weight
                that is computed according to how many solutions were sampled
                by the actor that produced the gradient.
                If `popsize_weighted_grad_avg` is False, then, there will not
                be weighted averaging (or, each gradient will have equal
                weight).
                If `popsize_weighted_grad_avg` is None, then, the gradient
                weights will be equal if a value for `num_interactions` is
                given (because `num_interactions` affects the number of
                solutions according to the episode lengths, and
                popsize-weighting the gradients could be misleading); and the
                gradient weights will be weighted according to the
                sub-population (i.e. sub-batch) sizes if `num_interactions`
                is left as None.
                The default value for `popsize_weighted_grad_avg` is None.
                When the distributed mode is disabled (i.e. when `distributed`
                is False), then the argument `popsize_weighted_grad_avg` is
                expected as None.
        """
        # The distribution type and the gradient divisor are chosen per
        # instance, depending on whether sampling is symmetric.
        if symmetric:
            self.DISTRIBUTION_TYPE = SymmetricSeparableGaussian
            divide_by = "num_directions"
        else:
            self.DISTRIBUTION_TYPE = SeparableGaussian
            divide_by = "num_solutions"

        # The same divisor is used for both the mu and the sigma gradients.
        self.DISTRIBUTION_PARAMS = {"divide_mu_grad_by": divide_by, "divide_sigma_grad_by": divide_by}

        super().__init__(
            problem,
            popsize=popsize,
            center_learning_rate=center_learning_rate,
            stdev_learning_rate=stdev_learning_rate,
            stdev_init=stdev_init,
            radius_init=radius_init,
            popsize_max=popsize_max,
            num_interactions=num_interactions,
            optimizer=optimizer,
            optimizer_config=optimizer_config,
            ranking_method=ranking_method,
            center_init=center_init,
            stdev_min=stdev_min,
            stdev_max=stdev_max,
            stdev_max_change=stdev_max_change,
            obj_index=obj_index,
            distributed=distributed,
            popsize_weighted_grad_avg=popsize_weighted_grad_avg,
            # An even population size is requested only for symmetric sampling.
            ensure_even_popsize=symmetric,
        )
__init__(self, problem, *, popsize, center_learning_rate, stdev_learning_rate, stdev_init=None, radius_init=None, num_interactions=None, popsize_max=None, optimizer='clipup', optimizer_config=None, ranking_method='centered', center_init=None, stdev_min=None, stdev_max=None, stdev_max_change=0.2, symmetric=True, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the PGPE algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem object which is being worked on. The problem must have its dtype defined (which means it works on Solution objects, not with custom Solution objects). Also, the problem must be single-objective. |
required |
popsize |
int |
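The population size. In the case of PGPE, `popsize` is expected as an even number in non-distributed mode. In distributed mode, PGPE will ensure that each sub-population size assigned to a remote actor is an even number. This behavior is because PGPE does symmetric sampling (i.e. solutions are sampled in pairs). |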
required |
center_learning_rate |
float |
The learning rate for the center of the search distribution. |
required |
stdev_learning_rate |
float |
The learning rate for the standard deviation values of the search distribution. |
required |
stdev_init |
Union[float, Iterable[float], torch.Tensor] |
The initial standard deviation of the search distribution, expressed as a scalar or as an array. Determines the initial coverage area of the search distribution. If one wishes to configure the coverage area via the argument `radius_init` instead, then `stdev_init` is expected as None. |
None |
radius_init |
Union[float, Iterable[float], torch.Tensor] |
The initial radius of the search distribution, expressed as a scalar. Determines the initial coverage area of the search distribution. Here, "radius" is defined as the norm of the search distribution. If one wishes to configure the coverage area via the argument `stdev_init` instead, then `radius_init` is expected as None. |
None |
num_interactions |
Optional[int] |
When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n amount of interactions is reached. When given as None, popsize is the only configuration affecting the size of a population. |
None |
popsize_max |
Optional[int] |
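Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. |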
None |
optimizer |
The optimizer to be used while following the estimated gradients. Can be given as None if a momentum-based optimizer is not required. Otherwise, can be given as a str containing the name of the optimizer (e.g. 'adam', 'clipup'); or as an instance of evotorch.optimizers.TorchOptimizer or evotorch.optimizers.ClipUp. As in the study of Salimans et al. (2017), the default is 'clipup'. Note that, for ClipUp, the default maximum speed is set as twice the given `center_learning_rate`. This maximum speed can be configured by passing `{"max_speed": ...}` to `optimizer_config`. |
'clipup' |
|
optimizer_config |
Optional[dict] |
Configuration which will be passed to the optimizer as keyword arguments. See `evotorch.optimizers` for details about which optimizer accepts which keyword arguments. |
None |
ranking_method |
Optional[str] |
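Which ranking method will be used for fitness shaping. See the documentation of `evotorch.ranking.rank(...)` for details. As in the study of Salimans et al. (2017), the default is 'centered'. Can be given as None if no such ranking is required. |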
'centered' |
center_init |
Union[float, Iterable[float], torch.Tensor] |
The initial center solution. Can be left as None. |
None |
stdev_min |
Union[float, Iterable[float], torch.Tensor] |
Lower bound for the standard deviation value/array. Can be given as a real number, or as an array of real numbers. |
None |
stdev_max |
Union[float, Iterable[float], torch.Tensor] |
Upper bound for the standard deviation value/array. Can be given as a real number, or as an array of real numbers. |
None |
stdev_max_change |
Union[float, Iterable[float], torch.Tensor] |
The maximum update ratio allowed on the standard deviation. Expected as None if no such limiter is needed, or as a real number within 0.0 and 1.0 otherwise. Like in the implementation of Ha (2017, 2018), the default value for this setting is 0.2, meaning that the update on the standard deviation values can not be more than 20% of their original values. |
0.2 |
device |
The device in which the population is to be stored. The default value is 'cpu'. |
required | |
symmetric |
bool |
Whether or not the solutions will be sampled in a symmetric/mirrored/antithetic manner. The default is True. |
True |
obj_index |
Optional[int] |
Index of the objective according to which the gradient estimations will be done. For single-objective problems, this can be left as None. |
None |
distributed |
bool |
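Whether or not the gradient computation will be distributed. If `distributed` is given as False and the problem is not parallelized, then everything will be centralized (i.e. the entire computation will happen in the main process). If `distributed` is given as False, and the problem is parallelized, then the population will be created in the main process and then sent to remote workers for parallelized evaluation, and then the remote fitnesses will be collected by the main process again for computing the search gradients. If `distributed` is given as True, and the problem is parallelized, then the search algorithm itself will be distributed, in the sense that each remote actor will generate its own population (such that the total population size across all these actors becomes equal to `popsize`) and will compute its own gradient, and then the main process will collect these gradients, compute the averaged gradients and update the main search distribution. Non-distributed mode has the advantage of keeping the population in the main process, which is good when one wishes to do detailed monitoring during the evolutionary process, but has the disadvantage of having to pass the solutions to the remote actors and having to collect fitnesses, which might result in increased interprocess communication traffic. On the other hand, while it is not possible to monitor the population in distributed mode, the distributed mode has the advantage of significantly reducing the interprocess communication traffic, since the only things communicated with the remote actors are the search distributions (not the solutions) and the gradients. |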
False |
popsize_weighted_grad_avg |
Optional[bool] |
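Only to be used in distributed mode (where being in distributed mode means `distributed` is given as True). In distributed mode, each actor remotely samples its own solution batches and computes its own gradients. These gradients are then collected, and a final average gradient is computed. If `popsize_weighted_grad_avg` is True, then, while averaging over the gradients, each gradient will have its own weight that is computed according to how many solutions were sampled by the actor that produced the gradient. If `popsize_weighted_grad_avg` is False, then, there will not be weighted averaging (or, each gradient will have equal weight). If `popsize_weighted_grad_avg` is None, then, the gradient weights will be equal if a value for `num_interactions` is given (because `num_interactions` affects the number of solutions according to the episode lengths, and popsize-weighting the gradients could be misleading); and the gradient weights will be weighted according to the sub-population (i.e. sub-batch) sizes if `num_interactions` is left as None. The default value for `popsize_weighted_grad_avg` is None. When the distributed mode is disabled (i.e. when `distributed` is False), then the argument `popsize_weighted_grad_avg` is expected as None. |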
None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
    self,
    problem: Problem,
    *,
    popsize: int,
    center_learning_rate: float,
    stdev_learning_rate: float,
    stdev_init: Optional[RealOrVector] = None,
    radius_init: Optional[RealOrVector] = None,
    num_interactions: Optional[int] = None,
    popsize_max: Optional[int] = None,
    optimizer="clipup",
    optimizer_config: Optional[dict] = None,
    ranking_method: Optional[str] = "centered",
    center_init: Optional[RealOrVector] = None,
    stdev_min: Optional[RealOrVector] = None,
    stdev_max: Optional[RealOrVector] = None,
    stdev_max_change: Optional[RealOrVector] = 0.2,
    symmetric: bool = True,
    obj_index: Optional[int] = None,
    distributed: bool = False,
    popsize_weighted_grad_avg: Optional[bool] = None,
):
    """
    `__init__(...)`: Initialize the PGPE algorithm.

    Args:
        problem: The problem object which is being worked on.
            The problem must have its dtype defined
            (which means it works on Solution objects,
            not with custom Solution objects).
            Also, the problem must be single-objective.
        popsize: The population size.
            In the case of PGPE, `popsize` is expected as an even number
            in non-distributed mode. In distributed mode, PGPE will
            ensure that each sub-population size assigned to a remote
            actor is an even number.
            This behavior is because PGPE does symmetric sampling
            (i.e. solutions are sampled in pairs).
        center_learning_rate: The learning rate for the center
            of the search distribution.
        stdev_learning_rate: The learning rate for the standard
            deviation values of the search distribution.
        stdev_init: The initial standard deviation of the search
            distribution, expressed as a scalar or as an array.
            Determines the initial coverage area of the search
            distribution.
            If one wishes to configure the coverage area via the
            argument `radius_init` instead, then `stdev_init` is expected
            as None.
        radius_init: The initial radius of the search distribution,
            expressed as a scalar.
            Determines the initial coverage area of the search
            distribution.
            Here, "radius" is defined as the norm of the search
            distribution.
            If one wishes to configure the coverage area via the
            argument `stdev_init` instead, then `radius_init` is expected
            as None.
        num_interactions: When given as an integer n,
            it is ensured that a population has interacted with
            the GymProblem's environment n times. If this target
            has not been reached yet, then the population is declared
            too small, and gets extended with more samples,
            until n amount of interactions is reached.
            When given as None, popsize is the only configuration
            affecting the size of a population.
        popsize_max: Having `num_interactions` set as an integer
            might cause the effective population size to jump to
            unnecessarily large numbers. To prevent this,
            one can set `popsize_max` to specify an upper
            bound for the effective population size.
        optimizer: The optimizer to be used while following the
            estimated gradients.
            Can be given as None if a momentum-based optimizer
            is not required.
            Otherwise, can be given as a str containing the name
            of the optimizer (e.g. 'adam', 'clipup');
            or as an instance of evotorch.optimizers.TorchOptimizer
            or evotorch.optimizers.ClipUp.
            The default is 'clipup'.
            Note that, for ClipUp, the default maximum speed is set
            as twice the given `center_learning_rate`.
            This maximum speed can be configured by passing
            `{"max_speed": ...}` to `optimizer_config`.
        optimizer_config: Configuration which will be passed
            to the optimizer as keyword arguments.
            See `evotorch.optimizers` for details about
            which optimizer accepts which keyword arguments.
        ranking_method: Which ranking method will be used for
            fitness shaping. See the documentation of
            `evotorch.ranking.rank(...)` for details.
            As in the study of Salimans et al. (2017),
            the default is 'centered'.
            Can be given as None if no such ranking is required.
        center_init: The initial center solution.
            Can be left as None.
        stdev_min: Lower bound for the standard deviation value/array.
            Can be given as a real number, or as an array of real numbers.
        stdev_max: Upper bound for the standard deviation value/array.
            Can be given as a real number, or as an array of real numbers.
        stdev_max_change: The maximum update ratio allowed on the
            standard deviation. Expected as None if no such limiter
            is needed, or as a real number within 0.0 and 1.0 otherwise.
            Like in the implementation of Ha (2017, 2018),
            the default value for this setting is 0.2, meaning that
            the update on the standard deviation values can not be
            more than 20% of their original values.
        symmetric: Whether or not the solutions will be sampled
            in a symmetric/mirrored/antithetic manner.
            The default is True.
        obj_index: Index of the objective according to which the
            gradient estimations will be done.
            For single-objective problems, this can be left as None.
        distributed: Whether or not the gradient computation will
            be distributed. If `distributed` is given as False and
            the problem is not parallelized, then everything will
            be centralized (i.e. the entire computation will happen
            in the main process).
            If `distributed` is given as False, and the problem
            is parallelized, then the population will be created
            in the main process and then sent to remote workers
            for parallelized evaluation, and then the remote fitnesses
            will be collected by the main process again for computing
            the search gradients.
            If `distributed` is given as True, and the problem
            is parallelized, then the search algorithm itself will
            be distributed, in the sense that each remote actor will
            generate its own population (such that the total population
            size across all these actors becomes equal to `popsize`)
            and will compute its own gradient, and then the main process
            will collect these gradients, compute the averaged gradients
            and update the main search distribution.
            Non-distributed mode has the advantage of keeping the
            population in the main process, which is good when one wishes
            to do detailed monitoring during the evolutionary process,
            but has the disadvantage of having to pass the solutions to
            the remote actors and having to collect fitnesses, which
            might result in increased interprocess communication traffic.
            On the other hand, while it is not possible to monitor the
            population in distributed mode, the distributed mode has the
            advantage of significantly reducing the interprocess
            communication traffic, since the only things communicated
            with the remote actors are the search distributions (not the
            solutions) and the gradients.
        popsize_weighted_grad_avg: Only to be used in distributed mode
            (where being in distributed mode means `distributed` is given
            as True). In distributed mode, each actor remotely samples
            its own solution batches and computes its own gradients.
            These gradients are then collected, and a final average
            gradient is computed.
            If `popsize_weighted_grad_avg` is True, then, while averaging
            over the gradients, each gradient will have its own weight
            that is computed according to how many solutions were sampled
            by the actor that produced the gradient.
            If `popsize_weighted_grad_avg` is False, then, there will not
            be weighted averaging (or, each gradient will have equal
            weight).
            If `popsize_weighted_grad_avg` is None, then, the gradient
            weights will be equal if a value for `num_interactions` is
            given (because `num_interactions` affects the number of
            solutions according to the episode lengths, and
            popsize-weighting the gradients could be misleading); and the
            gradient weights will be weighted according to the
            sub-population (i.e. sub-batch) sizes if `num_interactions`
            is left as None.
            The default value for `popsize_weighted_grad_avg` is None.
            When the distributed mode is disabled (i.e. when `distributed`
            is False), then the argument `popsize_weighted_grad_avg` is
            expected as None.
    """
    # The distribution type and the gradient divisor are chosen per
    # instance, depending on whether sampling is symmetric.
    if symmetric:
        self.DISTRIBUTION_TYPE = SymmetricSeparableGaussian
        divide_by = "num_directions"
    else:
        self.DISTRIBUTION_TYPE = SeparableGaussian
        divide_by = "num_solutions"

    # The same divisor is used for both the mu and the sigma gradients.
    self.DISTRIBUTION_PARAMS = {"divide_mu_grad_by": divide_by, "divide_sigma_grad_by": divide_by}

    super().__init__(
        problem,
        popsize=popsize,
        center_learning_rate=center_learning_rate,
        stdev_learning_rate=stdev_learning_rate,
        stdev_init=stdev_init,
        radius_init=radius_init,
        popsize_max=popsize_max,
        num_interactions=num_interactions,
        optimizer=optimizer,
        optimizer_config=optimizer_config,
        ranking_method=ranking_method,
        center_init=center_init,
        stdev_min=stdev_min,
        stdev_max=stdev_max,
        stdev_max_change=stdev_max_change,
        obj_index=obj_index,
        distributed=distributed,
        popsize_weighted_grad_avg=popsize_weighted_grad_avg,
        # An even population size is requested only for symmetric sampling.
        ensure_even_popsize=symmetric,
    )
SNES (GaussianSearchAlgorithm)
¶
Inspired by the implementation at: http://schaul.site44.com/code/snes.py
Reference:
Schaul, T., Glasmachers, T., Schmidhuber, J. (2011).
High Dimensions and Heavy Tails for Natural Evolution Strategies.
Proceedings of the 13th annual conference on Genetic and evolutionary
computation (GECCO 2011).
Source code in evotorch/algorithms/distributed/gaussian.py
class SNES(GaussianSearchAlgorithm):
    """
    SNES: Separable Natural Evolution Strategies

    Inspired by the implementation at: http://schaul.site44.com/code/snes.py

    Reference:

        Schaul, T., Glasmachers, T., Schmidhuber, J. (2011).
        High Dimensions and Heavy Tails for Natural Evolution Strategies.
        Proceedings of the 13th annual conference on Genetic and evolutionary
        computation (GECCO 2011).
    """

    DISTRIBUTION_TYPE = ExpSeparableGaussian
    DISTRIBUTION_PARAMS = None

    def __init__(
        self,
        problem: Problem,
        *,
        stdev_init: Optional[RealOrVector] = None,
        radius_init: Optional[RealOrVector] = None,
        popsize: Optional[int] = None,
        center_learning_rate: Optional[float] = None,
        stdev_learning_rate: Optional[float] = None,
        scale_learning_rate: bool = True,
        num_interactions: Optional[int] = None,
        popsize_max: Optional[int] = None,
        optimizer=None,
        optimizer_config: Optional[dict] = None,
        ranking_method: Optional[str] = "nes",
        center_init: Optional[RealOrVector] = None,
        stdev_min: Optional[RealOrVector] = None,
        stdev_max: Optional[RealOrVector] = None,
        stdev_max_change: Optional[RealOrVector] = None,
        obj_index: Optional[int] = None,
        distributed: bool = False,
        popsize_weighted_grad_avg: Optional[bool] = None,
    ):
        """
        `__init__(...)`: Initialize the SNES algorithm.

        Args:
            problem: The problem object which is being worked on.
            stdev_init: The initial standard deviation of the search
                distribution, expressed as a scalar or as an array.
                Determines the initial coverage area of the search
                distribution.
                If one wishes to configure the coverage area via the
                argument `radius_init` instead, then `stdev_init` is expected
                as None.
            radius_init: The initial radius of the search distribution,
                expressed as a scalar.
                Determines the initial coverage area of the search
                distribution.
                Here, "radius" is defined as the norm of the search
                distribution.
                If one wishes to configure the coverage area via the
                argument `stdev_init` instead, then `radius_init` is expected
                as None.
            popsize: Population size. Can be specified as an int,
                or can be left as None to let the solver decide.
                In the case of SNES, `popsize` can be left as None,
                in which case the default `popsize` will be computed
                as `4 + floor(3 * log(n))` where `n` is the length
                of a solution.
            center_learning_rate: Learning rate for updating the mean
                of the search distribution. Default value is 1.0
            stdev_learning_rate: Learning rate for updating the standard
                deviation values of the search distribution.
                The default value is `0.2 * (3 + log(n)) / sqrt(n)`
                where `n` is the length of a solution.
            scale_learning_rate: For SNES, there is a default standard
                deviation learning rate value which is computed as
                `0.2 * (3 + log(n)) / sqrt(n)` (where `n` is the solution
                length).
                If scale_learning_rate is True (which is the default),
                then the effective learning rate for the standard deviation
                becomes the provided `stdev_learning_rate` multiplied by this
                default value. If `scale_learning_rate` is False, then the
                effective standard deviation learning rate becomes
                equal to the provided `stdev_learning_rate` value.
                This setting has no effect when `stdev_learning_rate` is
                left as None (the default value is then used as it is).
            num_interactions: When given as an integer n,
                it is ensured that a population has interacted with
                the GymProblem's environment n times. If this target
                has not been reached yet, then the population is declared
                too small, and gets extended with more samples,
                until n amount of interactions is reached.
                When given as None, popsize is the only configuration
                affecting the size of a population.
            popsize_max: Having `num_interactions` set as an integer
                might cause the effective population size to jump to
                unnecessarily large numbers. To prevent this,
                one can set `popsize_max` to specify an upper
                bound for the effective population size.
            optimizer: The optimizer to be used while following the
                estimated gradients.
                Can be given as None if a momentum-based optimizer
                is not required; None is the default for SNES.
                Otherwise, can be given as a str containing the name
                of the optimizer (e.g. 'adam', 'clipup');
                or as an instance of evotorch.optimizers.TorchOptimizer
                or evotorch.optimizers.ClipUp.
                Note that, for ClipUp, the default maximum speed is set
                as twice the given `center_learning_rate`.
                This maximum speed can be configured by passing
                `{"max_speed": ...}` to `optimizer_config`.
            optimizer_config: Configuration which will be passed
                to the optimizer as keyword arguments.
                See `evotorch.optimizers` for details about
                which optimizer accepts which keyword arguments.
            ranking_method: Which ranking method will be used for
                fitness shaping. See the documentation of
                `evotorch.ranking.rank(...)` for details.
                For SNES, the default is 'nes'.
                Can be given as None if no such ranking is required.
            center_init: The initial center solution.
                Can be left as None.
            stdev_min: Minimum values for the standard deviation.
                Expected as a 1-dimensional array to serve as a limiter
                to the diagonals of the covariance matrix's square root.
            stdev_max: Maximum values for the standard deviation.
                Expected as a 1-dimensional array to serve as a limiter
                to the diagonals of the covariance matrix's square root.
            stdev_max_change: Maximum change allowed for when updating
                the square root of the covariance matrix.
            obj_index: Index of the objective according to which the
                gradient estimations will be done.
                For single-objective problems, this can be left as None.
            distributed: Whether or not the gradient computation will
                be distributed. If `distributed` is given as False and
                the problem is not parallelized, then everything will
                be centralized (i.e. the entire computation will happen
                in the main process).
                If `distributed` is given as False, and the problem
                is parallelized, then the population will be created
                in the main process and then sent to remote workers
                for parallelized evaluation, and then the remote fitnesses
                will be collected by the main process again for computing
                the search gradients.
                If `distributed` is given as True, and the problem
                is parallelized, then the search algorithm itself will
                be distributed, in the sense that each remote actor will
                generate its own population (such that the total population
                size across all these actors becomes equal to `popsize`)
                and will compute its own gradient, and then the main process
                will collect these gradients, compute the averaged gradients
                and update the main search distribution.
                Non-distributed mode has the advantage of keeping the
                population in the main process, which is good when one wishes
                to do detailed monitoring during the evolutionary process,
                but has the disadvantage of having to pass the solutions to
                the remote actors and having to collect fitnesses, which
                might result in increased interprocess communication traffic.
                On the other hand, while it is not possible to monitor the
                population in distributed mode, the distributed mode has the
                advantage of significantly reducing the interprocess
                communication traffic, since the only things communicated
                with the remote actors are the search distributions (not the
                solutions) and the gradients.
            popsize_weighted_grad_avg: Only to be used in distributed mode
                (where being in distributed mode means `distributed` is given
                as True). In distributed mode, each actor remotely samples
                its own solution batches and computes its own gradients.
                These gradients are then collected, and a final average
                gradient is computed.
                If `popsize_weighted_grad_avg` is True, then, while averaging
                over the gradients, each gradient will have its own weight
                that is computed according to how many solutions were sampled
                by the actor that produced the gradient.
                If `popsize_weighted_grad_avg` is False, then, there will not
                be weighted averaging (or, each gradient will have equal
                weight).
                If `popsize_weighted_grad_avg` is None, then, the gradient
                weights will be equal if a value for `num_interactions` is
                given (because `num_interactions` affects the number of
                solutions according to the episode lengths, and
                popsize-weighting the gradients could be misleading); and the
                gradient weights will be weighted according to the
                sub-population (i.e. sub-batch) sizes if `num_interactions`
                is left as None.
                The default value for `popsize_weighted_grad_avg` is None.
                When the distributed mode is disabled (i.e. when `distributed`
                is False), then the argument `popsize_weighted_grad_avg` is
                expected as None.
        """
        # Fall back to the solution-length-based default population size.
        if popsize is None:
            popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))

        if center_learning_rate is None:
            center_learning_rate = 1.0

        def default_stdev_lr():
            # Default standard deviation learning rate: 0.2 * (3 + log(n)) / sqrt(n)
            n = problem.solution_length
            return 0.2 * (3 + math.log(n)) / math.sqrt(n)

        if stdev_learning_rate is None:
            # No value given: use the default as it is (no extra scaling).
            stdev_learning_rate = default_stdev_lr()
        else:
            stdev_learning_rate = float(stdev_learning_rate)
            if scale_learning_rate:
                # A user-provided value acts as a multiplier of the default.
                stdev_learning_rate *= default_stdev_lr()

        super().__init__(
            problem,
            popsize=popsize,
            center_learning_rate=center_learning_rate,
            stdev_learning_rate=stdev_learning_rate,
            stdev_init=stdev_init,
            radius_init=radius_init,
            popsize_max=popsize_max,
            num_interactions=num_interactions,
            optimizer=optimizer,
            optimizer_config=optimizer_config,
            ranking_method=ranking_method,
            center_init=center_init,
            stdev_min=stdev_min,
            stdev_max=stdev_max,
            stdev_max_change=stdev_max_change,
            obj_index=obj_index,
            distributed=distributed,
            popsize_weighted_grad_avg=popsize_weighted_grad_avg,
        )
DISTRIBUTION_TYPE (SeparableGaussian)
¶
Exponential separable multivariate Gaussian, as used by SNES
Source code in evotorch/algorithms/distributed/gaussian.py
class ExpSeparableGaussian(SeparableGaussian):
    """Exponential separable multivariate Gaussian, as used by SNES.

    The distribution is parameterized by a center `mu` and a per-dimension
    standard deviation `sigma`. The standard deviation is updated
    multiplicatively, through an exponential of the followed gradient.
    """

    MANDATORY_PARAMETERS = {"mu", "sigma"}
    OPTIONAL_PARAMETERS = set()

    def _compute_gradients(self, samples: torch.Tensor, weights: torch.Tensor, ranking_used: Optional[str]) -> dict:
        """Estimate the gradients for `mu` and `sigma` from weighted samples.

        Args:
            samples: The samples to be used for the gradient estimation.
            weights: The weights (e.g. utilities) of the samples.
            ranking_used: Name of the ranking method which produced
                `weights`. Unless this is "nes", the weights are divided
                by the sum of their absolute values before being used.
        Returns:
            A dictionary with the keys "mu" and "sigma".
        """
        if ranking_used != "nes":
            # Rescale so that the absolute values of the weights add up to 1
            weights = weights / torch.sum(torch.abs(weights))

        # Offsets from the center, first in the scaled space of the
        # solutions, then in units of the standard deviation
        center_offsets = samples - self.mu
        unit_offsets = center_offsets / self.sigma

        return {
            "mu": total(dot(weights, center_offsets)),
            "sigma": total(dot(weights, (unit_offsets**2) - 1)),
        }

    def update_parameters(
        self,
        gradients: dict,
        *,
        learning_rates: Optional[dict] = None,
        optimizers: Optional[dict] = None,
    ) -> "ExpSeparableGaussian":
        """Return an updated copy of the distribution by following the gradients.

        Args:
            gradients: Dictionary containing the gradients for "mu" and
                for "sigma".
            learning_rates: Optional dictionary of learning rates,
                forwarded to `_follow_gradient(...)`.
            optimizers: Optional dictionary of optimizer objects,
                forwarded to `_follow_gradient(...)`.
        Returns:
            The modified copy, with the new `mu` and the new `sigma`.
        """
        mu_grad, sigma_grad = gradients["mu"], gradients["sigma"]
        follow_kwargs = {"learning_rates": learning_rates, "optimizers": optimizers}

        # Additive step for the center
        updated_mu = self.mu + self._follow_gradient("mu", mu_grad, **follow_kwargs)

        # Multiplicative (exponential) step for the standard deviation
        half_sigma_step = 0.5 * self._follow_gradient("sigma", sigma_grad, **follow_kwargs)
        updated_sigma = self.sigma * torch.exp(half_sigma_step)

        return self.modified_copy(mu=updated_mu, sigma=updated_sigma)
update_parameters(self, gradients, *, learning_rates=None, optimizers=None)
¶
Do an update on the distribution by following the given gradients.
It is expected that the inheriting class has its own implementation for this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gradients |
dict |
Gradients, as a dictionary, which will be used for computing the necessary updates. |
required |
learning_rates |
Optional[dict] |
A dictionary which contains learning rates for parameters that will be updated using a learning rate coefficient. |
None |
optimizers |
Optional[dict] |
A dictionary which contains optimizer objects for parameters that will be updated using an adaptive optimizer. |
None |
Returns:
Type | Description |
---|---|
ExpSeparableGaussian |
The updated copy of the distribution. |
Source code in evotorch/algorithms/distributed/gaussian.py
def update_parameters(
    self,
    gradients: dict,
    *,
    learning_rates: Optional[dict] = None,
    optimizers: Optional[dict] = None,
) -> "ExpSeparableGaussian":
    """Return an updated copy of the distribution by following the gradients.

    Args:
        gradients: Dictionary containing the gradients for "mu" and
            for "sigma".
        learning_rates: Optional dictionary of learning rates,
            forwarded to `_follow_gradient(...)`.
        optimizers: Optional dictionary of optimizer objects,
            forwarded to `_follow_gradient(...)`.
    Returns:
        The modified copy, with the new `mu` and the new `sigma`.
    """
    mu_grad, sigma_grad = gradients["mu"], gradients["sigma"]
    follow_kwargs = {"learning_rates": learning_rates, "optimizers": optimizers}

    # Additive step for the center
    updated_mu = self.mu + self._follow_gradient("mu", mu_grad, **follow_kwargs)

    # Multiplicative (exponential) step for the standard deviation
    half_sigma_step = 0.5 * self._follow_gradient("sigma", sigma_grad, **follow_kwargs)
    updated_sigma = self.sigma * torch.exp(half_sigma_step)

    return self.modified_copy(mu=updated_mu, sigma=updated_sigma)
__init__(self, problem, *, stdev_init=None, radius_init=None, popsize=None, center_learning_rate=None, stdev_learning_rate=None, scale_learning_rate=True, num_interactions=None, popsize_max=None, optimizer=None, optimizer_config=None, ranking_method='nes', center_init=None, stdev_min=None, stdev_max=None, stdev_max_change=None, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the SNES algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem object which is being worked on. |
required |
stdev_init |
Union[float, Iterable[float], torch.Tensor] |
The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected
as None. |
None |
radius_init |
Union[float, Iterable[float], torch.Tensor] |
The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected
as None. |
None |
popsize |
Optional[int] |
Population size. Can be specified as an int,
or can be left as None to let the solver decide.
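In the case of SNES, `popsize` can be left as None,
in which case the default `popsize` will be computed
as `4 + floor(3 * log(n))` where `n` is the length
of a solution. |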
None |
center_learning_rate |
Optional[float] |
Learning rate for updating the mean of the search distribution. Default value is 1.0 |
None |
stdev_learning_rate |
Optional[float] |
Learning rate for updating the covariance
matrix of the search distribution.
The default value is `0.2 * (3 + log(n)) / sqrt(n)`
where `n` is the length of a solution. |
None |
scale_learning_rate |
bool |
For SNES, there is a default standard
deviation learning rate value which is computed as
|
True |
num_interactions |
Optional[int] |
When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n amount of interactions is reached. When given as None, popsize is the only configuration affecting the size of a population. |
None |
popsize_max |
Optional[int] |
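Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. |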
None |
num_interactions |
Optional[int] |
When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n amount of interactions is reached. When given as None, popsize is the only configuration affecting the size of a population. |
None |
popsize_max |
Optional[int] |
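Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. |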
None |
optimizer |
The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
As in the study of Salimans et al. (2017),
the default is 'clipup'.
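Note that, for ClipUp, the default maximum speed is set
as twice the given `center_learning_rate`.
This maximum speed can be configured by passing
`{"max_speed": ...}` to `optimizer_config`. |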
None |
|
optimizer_config |
Optional[dict] |
Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about
which optimizer accepts which keyword arguments. |
None |
ranking_method |
Optional[str] |
Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details.
Can be given as None if no such ranking is required. |
'nes' |
center_init |
Union[float, Iterable[float], torch.Tensor] |
The initial center solution. Can be left as None. |
None |
stdev_min |
Union[float, Iterable[float], torch.Tensor] |
Minimum values for the standard deviation. Expected as a 1-dimensional array to serve as a limiter to the diagonals of the covariance matrix's square root. |
None |
stdev_max |
Union[float, Iterable[float], torch.Tensor] |
Maximum values for the standard deviation. Expected as a 1-dimensional array to serve as a limiter to the diagonals of the covariance matrix's square root. |
None |
stdev_max_change |
Union[float, Iterable[float], torch.Tensor] |
Maximum change allowed when updating the square root of the covariance matrix. |
None |
obj_index |
Optional[int] |
Index of the objective according to which the gradient estimations will be done. For single-objective problems, this can be left as None. |
None |
distributed |
bool |
Whether or not the gradient computation will
be distributed. If |
False |
popsize_weighted_grad_avg |
Optional[bool] |
Only to be used in distributed mode.
(where being in distributed mode means |
None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
    self,
    problem: Problem,
    *,
    stdev_init: Optional[RealOrVector] = None,
    radius_init: Optional[RealOrVector] = None,
    popsize: Optional[int] = None,
    center_learning_rate: Optional[float] = None,
    stdev_learning_rate: Optional[float] = None,
    scale_learning_rate: bool = True,
    num_interactions: Optional[int] = None,
    popsize_max: Optional[int] = None,
    optimizer=None,
    optimizer_config: Optional[dict] = None,
    ranking_method: Optional[str] = "nes",
    center_init: Optional[RealOrVector] = None,
    stdev_min: Optional[RealOrVector] = None,
    stdev_max: Optional[RealOrVector] = None,
    stdev_max_change: Optional[RealOrVector] = None,
    obj_index: Optional[int] = None,
    distributed: bool = False,
    popsize_weighted_grad_avg: Optional[bool] = None,
):
    """
    `__init__(...)`: Initialize the SNES algorithm.

    Args:
        problem: The problem object which is being worked on.
        stdev_init: The initial standard deviation of the search
            distribution, expressed as a scalar or as an array.
            Determines the initial coverage area of the search
            distribution.
            If the coverage area is to be configured via
            `radius_init` instead, then `stdev_init` is expected
            as None.
        radius_init: The initial radius of the search distribution,
            expressed as a scalar.
            Determines the initial coverage area of the search
            distribution.
            Here, "radius" is defined as the norm of the search
            distribution.
            If the coverage area is to be configured via
            `stdev_init` instead, then `radius_init` is expected
            as None.
        popsize: Population size, as an int. When left as None,
            the population size is computed as
            `4 + floor(3 * log(n))` where `n` is the length
            of a solution.
        center_learning_rate: Learning rate for updating the mean
            of the search distribution. When left as None, 1.0
            is used.
        stdev_learning_rate: Learning rate for updating the standard
            deviation of the search distribution.
            When left as None, `0.2 * (3 + log(n)) / sqrt(n)` is used,
            where `n` is the length of a solution.
        scale_learning_rate: Only relevant when `stdev_learning_rate`
            is given explicitly. If True (which is the default),
            the effective standard deviation learning rate becomes
            the given `stdev_learning_rate` multiplied by the SNES
            default value `0.2 * (3 + log(n)) / sqrt(n)`.
            If False, the given `stdev_learning_rate` is used as it is.
        num_interactions: When given as an integer n,
            it is ensured that a population has interacted with
            the GymProblem's environment n times. If this target
            has not been reached yet, then the population is declared
            too small, and gets extended with more samples,
            until n amount of interactions is reached.
            When given as None, popsize is the only configuration
            affecting the size of a population.
        popsize_max: Having `num_interactions` set as an integer
            might cause the effective population size to jump to
            unnecessarily large numbers. To prevent this,
            one can set `popsize_max` to specify an upper
            bound for the effective population size.
        optimizer: The optimizer to be used while following the
            estimated gradients.
            Can be left as None (which is the default) if a
            momentum-based optimizer is not required.
            Otherwise, can be given as a str containing the name
            of the optimizer (e.g. 'adam', 'clipup');
            or as an instance of evotorch.optimizers.TorchOptimizer
            or evotorch.optimizers.ClipUp.
            Note that, for ClipUp, the default maximum speed is set
            as twice the given `center_learning_rate`.
            This maximum speed can be configured by passing
            `{"max_speed": ...}` to `optimizer_config`.
        optimizer_config: Configuration which will be passed
            to the optimizer as keyword arguments.
            See `evotorch.optimizers` for details about
            which optimizer accepts which keyword arguments.
        ranking_method: Which ranking method will be used for
            fitness shaping. See the documentation of
            `evotorch.ranking.rank(...)` for details.
            The default is 'nes'.
            Can be given as None if no such ranking is required.
        center_init: The initial center solution.
            Can be left as None.
        stdev_min: Minimum values for the standard deviation.
            Expected as a 1-dimensional array to serve as a limiter
            to the diagonals of the covariance matrix's square root.
        stdev_max: Maximum values for the standard deviation.
            Expected as a 1-dimensional array to serve as a limiter
            to the diagonals of the covariance matrix's square root.
        stdev_max_change: Maximum change allowed when updating
            the square root of the covariance matrix.
        obj_index: Index of the objective according to which the
            gradient estimations will be done.
            For single-objective problems, this can be left as None.
        distributed: Whether or not the gradient computation will
            be distributed. If `distributed` is given as False and
            the problem is not parallelized, then everything will
            be centralized (i.e. the entire computation will happen
            in the main process).
            If `distributed` is given as False, and the problem
            is parallelized, then the population will be created
            in the main process and then sent to remote workers
            for parallelized evaluation, and then the remote fitnesses
            will be collected by the main process again for computing
            the search gradients.
            If `distributed` is given as True, and the problem
            is parallelized, then the search algorithm itself will
            be distributed, in the sense that each remote actor will
            generate its own population (such that the total population
            size across all these actors becomes equal to `popsize`)
            and will compute its own gradient, and then the main process
            will collect these gradients, compute the averaged gradients
            and update the main search distribution.
            Non-distributed mode keeps the population in the main
            process, which is good for detailed monitoring during
            the evolutionary process, at the cost of passing the
            solutions to the remote actors and collecting the fitnesses
            (i.e. increased interprocess communication traffic).
            In distributed mode, the population cannot be monitored,
            but the interprocess communication traffic is reduced
            significantly, since only the search distributions
            (not the solutions) and the gradients are communicated.
        popsize_weighted_grad_avg: Only to be used in distributed mode
            (i.e. when `distributed` is given as True).
            In distributed mode, each actor remotely samples
            its own solution batches and computes its own gradients.
            These gradients are then collected, and a final average
            gradient is computed.
            If True, then, while averaging over the gradients, each
            gradient is weighted according to how many solutions were
            sampled by the actor that produced it.
            If False, each gradient has equal weight.
            If None (which is the default), the gradients have equal
            weights if a value for `num_interactions` is given
            (because `num_interactions` affects the number of solutions
            according to the episode lengths, and popsize-weighting the
            gradients could be misleading); otherwise they are weighted
            according to the sub-population (i.e. sub-batch) sizes.
            When the distributed mode is disabled (i.e. when
            `distributed` is False), this argument is expected as None.
    """

    def snes_default_stdev_lr() -> float:
        # Default standard deviation learning rate of SNES
        dim = problem.solution_length
        return 0.2 * (3 + math.log(dim)) / math.sqrt(dim)

    if popsize is None:
        popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))

    center_learning_rate = 1.0 if center_learning_rate is None else center_learning_rate

    if stdev_learning_rate is None:
        stdev_learning_rate = snes_default_stdev_lr()
    elif scale_learning_rate:
        # The user-given value acts as a multiplier of the default
        stdev_learning_rate = float(stdev_learning_rate) * snes_default_stdev_lr()
    else:
        stdev_learning_rate = float(stdev_learning_rate)

    super().__init__(
        problem,
        stdev_init=stdev_init,
        radius_init=radius_init,
        popsize=popsize,
        center_learning_rate=center_learning_rate,
        stdev_learning_rate=stdev_learning_rate,
        num_interactions=num_interactions,
        popsize_max=popsize_max,
        optimizer=optimizer,
        optimizer_config=optimizer_config,
        ranking_method=ranking_method,
        center_init=center_init,
        stdev_min=stdev_min,
        stdev_max=stdev_max,
        stdev_max_change=stdev_max_change,
        obj_index=obj_index,
        distributed=distributed,
        popsize_weighted_grad_avg=popsize_weighted_grad_avg,
    )
XNES (GaussianSearchAlgorithm)
¶
Inspired by the implementation at: http://schaul.site44.com/code/xnes.py https://github.com/pybrain/pybrain/blob/master/pybrain/optimization/distributionbased/xnes.py
Reference
Glasmachers, Tobias, et al. Exponential natural evolution strategies. Proceedings of the 12th annual conference on Genetic and evolutionary computation (GECCO 2010).
Source code in evotorch/algorithms/distributed/gaussian.py
class XNES(GaussianSearchAlgorithm):
    """
    XNES: Exponential Natural Evolution Strategies

    Inspired by the implementation at:
    http://schaul.site44.com/code/xnes.py
    https://github.com/pybrain/pybrain/blob/master/pybrain/optimization/distributionbased/xnes.py

    Reference:

        Glasmachers, Tobias, et al.
        Exponential natural evolution strategies.
        Proceedings of the 12th annual conference on Genetic and evolutionary
        computation (GECCO 2010).
    """

    DISTRIBUTION_TYPE = ExpGaussian
    DISTRIBUTION_PARAMS = None

    def __init__(
        self,
        problem: Problem,
        *,
        stdev_init: Optional[RealOrVector] = None,
        radius_init: Optional[RealOrVector] = None,
        popsize: Optional[int] = None,
        center_learning_rate: Optional[float] = None,
        stdev_learning_rate: Optional[float] = None,
        scale_learning_rate: bool = True,
        num_interactions: Optional[int] = None,
        popsize_max: Optional[int] = None,
        optimizer=None,
        optimizer_config: Optional[dict] = None,
        ranking_method: Optional[str] = "nes",
        center_init: Optional[RealOrVector] = None,
        obj_index: Optional[int] = None,
        distributed: bool = False,
        popsize_weighted_grad_avg: Optional[bool] = None,
    ):
        """
        `__init__(...)`: Initialize the XNES algorithm.

        Args:
            problem: The problem object which is being worked on.
            stdev_init: The initial standard deviation of the search
                distribution, expressed as a scalar or as an array.
                Determines the initial coverage area of the search
                distribution.
                If the coverage area is to be configured via
                `radius_init` instead, then `stdev_init` is expected
                as None.
            radius_init: The initial radius of the search distribution,
                expressed as a scalar.
                Determines the initial coverage area of the search
                distribution.
                Here, "radius" is defined as the norm of the search
                distribution.
                If the coverage area is to be configured via
                `stdev_init` instead, then `radius_init` is expected
                as None.
            popsize: Population size, as an int. When left as None,
                the population size is computed as
                `4 + floor(3 * log(n))` where `n` is the length
                of a solution.
            center_learning_rate: Learning rate for updating the mean
                of the search distribution. When left as None, 1.0
                is used.
            stdev_learning_rate: Learning rate for updating the covariance
                matrix of the search distribution.
                When left as None, `0.6 * (3 + log(n)) / (n * sqrt(n))`
                is used, where `n` is the length of a solution.
            scale_learning_rate: Only relevant when `stdev_learning_rate`
                is given explicitly. If True (which is the default),
                the effective learning rate becomes the given
                `stdev_learning_rate` multiplied by the XNES default
                value `0.6 * (3 + log(n)) / (n * sqrt(n))`.
                If False, the given `stdev_learning_rate` is used as
                it is.
            num_interactions: When given as an integer n,
                it is ensured that a population has interacted with
                the GymProblem's environment n times. If this target
                has not been reached yet, then the population is declared
                too small, and gets extended with more samples,
                until n amount of interactions is reached.
                When given as None, popsize is the only configuration
                affecting the size of a population.
            popsize_max: Having `num_interactions` set as an integer
                might cause the effective population size to jump to
                unnecessarily large numbers. To prevent this,
                one can set `popsize_max` to specify an upper
                bound for the effective population size.
            optimizer: The optimizer to be used while following the
                estimated gradients.
                Can be left as None (which is the default) if a
                momentum-based optimizer is not required.
                Otherwise, can be given as a str containing the name
                of the optimizer (e.g. 'adam', 'clipup');
                or as an instance of evotorch.optimizers.TorchOptimizer
                or evotorch.optimizers.ClipUp.
                Note that, for ClipUp, the default maximum speed is set
                as twice the given `center_learning_rate`.
                This maximum speed can be configured by passing
                `{"max_speed": ...}` to `optimizer_config`.
            optimizer_config: Configuration which will be passed
                to the optimizer as keyword arguments.
                See `evotorch.optimizers` for details about
                which optimizer accepts which keyword arguments.
            ranking_method: Which ranking method will be used for
                fitness shaping. See the documentation of
                `evotorch.ranking.rank(...)` for details.
                The default is 'nes'.
                Can be given as None if no such ranking is required.
            center_init: The initial center solution.
                Can be left as None.
            obj_index: Index of the objective according to which the
                gradient estimations will be done.
                For single-objective problems, this can be left as None.
            distributed: Whether or not the gradient computation will
                be distributed. If `distributed` is given as False and
                the problem is not parallelized, then everything will
                be centralized (i.e. the entire computation will happen
                in the main process).
                If `distributed` is given as False, and the problem
                is parallelized, then the population will be created
                in the main process and then sent to remote workers
                for parallelized evaluation, and then the remote fitnesses
                will be collected by the main process again for computing
                the search gradients.
                If `distributed` is given as True, and the problem
                is parallelized, then the search algorithm itself will
                be distributed, in the sense that each remote actor will
                generate its own population (such that the total population
                size across all these actors becomes equal to `popsize`)
                and will compute its own gradient, and then the main process
                will collect these gradients, compute the averaged gradients
                and update the main search distribution.
                Non-distributed mode keeps the population in the main
                process, which is good for detailed monitoring during
                the evolutionary process, at the cost of passing the
                solutions to the remote actors and collecting the fitnesses
                (i.e. increased interprocess communication traffic).
                In distributed mode, the population cannot be monitored,
                but the interprocess communication traffic is reduced
                significantly, since only the search distributions
                (not the solutions) and the gradients are communicated.
            popsize_weighted_grad_avg: Only to be used in distributed mode
                (i.e. when `distributed` is given as True).
                In distributed mode, each actor remotely samples
                its own solution batches and computes its own gradients.
                These gradients are then collected, and a final average
                gradient is computed.
                If True, then, while averaging over the gradients, each
                gradient is weighted according to how many solutions were
                sampled by the actor that produced it.
                If False, each gradient has equal weight.
                If None (which is the default), the gradients have equal
                weights if a value for `num_interactions` is given
                (because `num_interactions` affects the number of solutions
                according to the episode lengths, and popsize-weighting the
                gradients could be misleading); otherwise they are weighted
                according to the sub-population (i.e. sub-batch) sizes.
                When the distributed mode is disabled (i.e. when
                `distributed` is False), this argument is expected as None.
        """

        def xnes_default_stdev_lr() -> float:
            # Default covariance learning rate of XNES
            dim = problem.solution_length
            return 0.6 * (3 + math.log(dim)) / (dim * math.sqrt(dim))

        if popsize is None:
            popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))

        center_learning_rate = 1.0 if center_learning_rate is None else center_learning_rate

        if stdev_learning_rate is None:
            stdev_learning_rate = xnes_default_stdev_lr()
        elif scale_learning_rate:
            # The user-given value acts as a multiplier of the default
            stdev_learning_rate = float(stdev_learning_rate) * xnes_default_stdev_lr()
        else:
            stdev_learning_rate = float(stdev_learning_rate)

        super().__init__(
            problem,
            stdev_init=stdev_init,
            radius_init=radius_init,
            popsize=popsize,
            center_learning_rate=center_learning_rate,
            stdev_learning_rate=stdev_learning_rate,
            num_interactions=num_interactions,
            popsize_max=popsize_max,
            optimizer=optimizer,
            optimizer_config=optimizer_config,
            ranking_method=ranking_method,
            center_init=center_init,
            # XNES does not expose standard deviation limiters in its
            # signature; they are always passed to the parent as None.
            stdev_min=None,
            stdev_max=None,
            stdev_max_change=None,
            obj_index=obj_index,
            distributed=distributed,
            popsize_weighted_grad_avg=popsize_weighted_grad_avg,
        )
DISTRIBUTION_TYPE (Distribution)
¶
exponential Multivariate Gaussian, as used by XNES
Source code in evotorch/algorithms/distributed/gaussian.py
class ExpGaussian(Distribution):
"""exponential Multivariate Gaussian, as used by XNES"""
# Corresponding to mu and A in symbols used in xNES paper
MANDATORY_PARAMETERS = {"mu", "sigma"}
# Inverse of sigma, numerically more stable to track this independently to sigma
OPTIONAL_PARAMETERS = {"sigma_inv"}
def __init__(
    self,
    parameters: dict,
    *,
    solution_length: Optional[int] = None,
    device: Optional[Device] = None,
    dtype: Optional[DType] = None,
):
    """Initialize the distribution from `mu`, `sigma` and, optionally, `sigma_inv`.

    Args:
        parameters: Dictionary containing the tensors `mu` (1-dimensional)
            and `sigma`. A 1-dimensional `sigma` is expanded into a
            diagonal matrix. If `sigma_inv` is missing, it is computed
            via `torch.inverse(sigma)`. The dictionary given by the
            caller is not modified; a shallow copy is normalized and
            forwarded to the parent initializer.
        solution_length: Expected length of a solution. When given,
            it must be equal to the length of `mu`. When None, the
            length of `mu` is used.
        device: Forwarded to the parent initializer.
        dtype: Forwarded to the parent initializer.
    Raises:
        ValueError: If `solution_length` does not match the length
            of `mu`, or if the length of `mu` does not match the
            number of rows of `sigma`.
    """
    # Work on a shallow copy, so that the normalization steps below
    # (expanding sigma, adding sigma_inv) do not leak into the
    # dictionary owned by the caller.
    parameters = dict(parameters)

    [mu_length] = parameters["mu"].shape

    # Make sigma 2D
    if len(parameters["sigma"].shape) == 1:
        parameters["sigma"] = torch.diag(parameters["sigma"])

    # Automatically generate sigma_inv if not provided
    if "sigma_inv" not in parameters:
        parameters["sigma_inv"] = torch.inverse(parameters["sigma"])

    [sigma_length, _] = parameters["sigma"].shape

    if solution_length is None:
        solution_length = mu_length
    elif solution_length != mu_length:
        raise ValueError(
            "The argument `solution_length` does not match the length of `mu` provided in `parameters`."
            f" solution_length={solution_length},"
            f' parameters["mu"]={mu_length}.'
        )

    if mu_length != sigma_length:
        raise ValueError(
            "The tensors `mu` and `sigma` provided within `parameters` have mismatching lengths."
            f' parameters["mu"]={mu_length},'
            f' parameters["sigma"]={sigma_length}.'
        )

    super().__init__(
        solution_length=solution_length,
        parameters=parameters,
        device=device,
        dtype=dtype,
    )

    # Make identity matrix as this is used throughout in gradient computation.
    # It is built from `make_zeros(...)` and then its diagonal is set to 1.
    self.eye = self.make_zeros((solution_length, solution_length))
    self.eye[range(self.solution_length), range(self.solution_length)] = 1.0
@property
def mu(self) -> torch.Tensor:
    """The center of the search distribution.

    Returns:
        The tensor stored under the key "mu" of the parameters.
    """
    center = self.parameters["mu"]
    return center

@mu.setter
def mu(self, new_mu: Iterable):
    """Replace the center of the search distribution.

    Args:
        new_mu: The new value of mu. It is converted to a tensor
            with the dtype and the device of this distribution.
    """
    converted = torch.as_tensor(new_mu, dtype=self.dtype, device=self.device)
    self.parameters["mu"] = converted
@property
def cov(self) -> torch.Tensor:
    """The covariance matrix A^T A, where A is `sigma`."""
    root = self.sigma
    return torch.matmul(root.transpose(0, 1), self.sigma)
@property
def sigma(self) -> torch.Tensor:
    """The square root of the covariance matrix.

    Returns:
        The tensor stored under the key "sigma" of the parameters.
    """
    cov_root = self.parameters["sigma"]
    return cov_root
@property
def sigma_inv(self) -> torch.Tensor:
    """The inverse of the square root of the covariance matrix.

    Returns:
        The tensor stored under the key "sigma_inv" of the parameters
        if such an entry exists; otherwise, the inverse of the stored
        "sigma", computed via `torch.inverse(...)`.
    """
    params = self.parameters
    if "sigma_inv" not in params:
        # No tracked inverse: compute it from sigma on demand
        return torch.inverse(self.parameters["sigma"])
    return self.parameters["sigma_inv"]
@property
def A(self) -> torch.Tensor:
    """Same as `sigma`; named `A` to follow the notation of the paper."""
    cov_root = self.sigma
    return cov_root
@property
def A_inv(self) -> torch.Tensor:
    """Same as `sigma_inv`; named `A_inv` to follow the notation of the paper."""
    inverse_root = self.sigma_inv
    return inverse_root
@sigma.setter
def sigma(self, new_sigma: Iterable):
    """Replace the square root of the covariance matrix.

    Args:
        new_sigma: The new value of sigma. It is converted to a tensor
            with the dtype and the device of this distribution.
    """
    # NOTE(review): only the "sigma" entry is written here; an existing
    # "sigma_inv" entry in the parameters is left untouched, so it may no
    # longer be the inverse of the new sigma -- confirm that callers
    # refresh it when they use this setter.
    converted = torch.as_tensor(new_sigma, dtype=self.dtype, device=self.device)
    self.parameters["sigma"] = converted
def to_global_coordinates(self, local_coordinates: torch.Tensor) -> torch.Tensor:
    """Map samples from the local space N(0, I_d) to the global space N(mu, A^T A).

    This is the inverse operation of `to_local_coordinates`.

    Args:
        local_coordinates: Batch of local coordinates, one sample per row.
    Returns:
        The corresponding batch of global coordinates, one sample per row.
    """
    # Each global sample is x = mu + A z, where z is a local sample.
    # The batch is transposed so that A is applied to all samples at once,
    # and then transposed back to the one-sample-per-row layout.
    offsets = torch.matmul(self.A, local_coordinates.T).T
    return self.mu.unsqueeze(0) + offsets
def to_local_coordinates(self, global_coordinates: torch.Tensor) -> torch.Tensor:
    """Map samples from the global space N(mu, A^T A) to the local space N(0, I_d).

    This is the inverse operation of `to_global_coordinates`.

    Args:
        global_coordinates: Batch of global coordinates, one sample per row.
    Returns:
        The corresponding batch of local coordinates, one sample per row.
    """
    # Since x = mu + A z, the local sample is recovered as z = A_inv (x - mu).
    centered = global_coordinates - self.mu.unsqueeze(0)
    return torch.matmul(self.A_inv, centered.T).T
def _fill(self, out: torch.Tensor, *, generator: Optional[torch.Generator] = None):
    """Overwrite the given tensor, in place, with samples from N(mu, A^T A).

    Args:
        out (torch.Tensor): The tensor to fill
        generator (Optional[torch.Generator]): A generator to use to generate random values
    """
    # Step 1: draw the local coordinates from N(0, I_d) directly into `out`
    self.make_gaussian(out=out, generator=generator)
    # Step 2: express them in the global coordinate system and write the
    # result back into the same tensor
    global_samples = self.to_global_coordinates(out)
    out[:] = global_samples
def _compute_gradients(self, samples: torch.Tensor, weights: torch.Tensor, ranking_used: Optional[str]) -> dict:
    """Estimate the gradients from a set of samples and their weights.

    Args:
        samples (torch.Tensor): Samples drawn from N(mu, A^T A), ideally using self._fill
        weights (torch.Tensor): Weights e.g. fitnesses or utilities assigned to samples
        ranking_used (optional[str]): The ranking method used to compute weights
    Returns:
        grads (dict): A dictionary containing the approximated natural gradient on d and M
    """
    # Work in the local coordinate system of the distribution
    z = self.to_local_coordinates(samples)

    # The weights (utilities) have to be 0-centered, otherwise the
    # formulations would have to consider a bias term. The "centered" and
    # "normalized" rankings are left as they are; anything else is shifted
    # by its mean here.
    already_centered = ranking_used in ("centered", "normalized")
    if not already_centered:
        weights = weights - torch.mean(weights)

    grad_d = total(dot(weights, z))

    # Per-sample outer products of the local coordinates
    # (assumes z is 2-dimensional, one sample per row -- TODO confirm)
    outer_products = z.unsqueeze(1) * z.unsqueeze(2)
    # Two trailing axes so that each weight broadcasts over its own matrix
    matrix_weights = weights.unsqueeze(-1).unsqueeze(-1)
    grad_M = torch.sum(matrix_weights * (outer_products - self.eye.unsqueeze(0)), dim=0)

    return {"d": grad_d, "M": grad_M}
def update_parameters(
    self,
    gradients: dict,
    *,
    learning_rates: Optional[dict] = None,
    optimizers: Optional[dict] = None,
) -> "ExpGaussian":
    """Return an updated copy of the distribution by following the given gradients.

    Args:
        gradients: A dictionary holding the gradients under the keys
            "d" and "M".
        learning_rates: Optional dictionary of learning rates. When it
            has no "d" entry, its "mu" entry (if any) is used for "d";
            likewise its "sigma" entry (if any) is used for "M".
            The given dictionary itself is never modified.
        optimizers: Optional dictionary of optimizers. It is forwarded
            unchanged to `_follow_gradient`.
    Returns:
        The updated copy of the distribution.
    """
    d_grad = gradients["d"]
    M_grad = gradients["M"]

    # Work on a private copy: the default value None must not fail on the
    # membership tests below, and the caller's dictionary must not silently
    # gain the "d" and "M" keys.
    learning_rates = {} if learning_rates is None else dict(learning_rates)
    for local_name, global_name in (("d", "mu"), ("M", "sigma")):
        # Only borrow the global learning rate when there is one; a missing
        # entry is left for `_follow_gradient` to resolve (e.g. via `optimizers`).
        if local_name not in learning_rates and global_name in learning_rates:
            learning_rates[local_name] = learning_rates[global_name]

    # Follow gradients for d, and M
    update_d = self._follow_gradient("d", d_grad, learning_rates=learning_rates, optimizers=optimizers)
    update_M = self._follow_gradient("M", M_grad, learning_rates=learning_rates, optimizers=optimizers)

    # Fold into parameters mu, A and A inv
    new_mu = self.mu + torch.mv(self.A, update_d)
    new_A = self.A @ torch.matrix_exp(0.5 * update_M)
    # The inverse is updated with the negated exponent, so that no explicit
    # matrix inversion is needed
    new_A_inv = torch.matrix_exp(-0.5 * update_M) @ self.A_inv

    # Return modified distribution
    return self.modified_copy(mu=new_mu, sigma=new_A, sigma_inv=new_A_inv)
A: Tensor
property
readonly
¶
Alias for self.sigma, for notational consistency with paper
A_inv: Tensor
property
readonly
¶
Alias for self.sigma_inv, for notational consistency with paper
cov: Tensor
property
readonly
¶
The covariance matrix A^T A
mu: Tensor
property
writable
¶
Getter for mu
Returns:
Type | Description |
---|---|
mu (torch.Tensor) |
The center of the search distribution |
sigma: Tensor
property
writable
¶
Getter for sigma
Returns:
Type | Description |
---|---|
sigma (torch.Tensor) |
The square root of the covariance matrix |
sigma_inv: Tensor
property
readonly
¶
Getter for sigma_inv
Returns:
Type | Description |
---|---|
sigma_inv (torch.Tensor) |
The inverse square root of the covariance matrix |
to_global_coordinates(self, local_coordinates)
¶
Map samples from local coordinate space N(0, I_d) to global coordinate space N(mu, A^T A). This function is the inverse of to_local_coordinates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
local_coordinates |
torch.Tensor |
The local coordinates sampled from N(0, I_d) |
required |
Returns:
Type | Description |
---|---|
global_coordinates (torch.Tensor) |
The global coordinates sampled from N(mu, A^T A) |
Source code in evotorch/algorithms/distributed/gaussian.py
def to_global_coordinates(self, local_coordinates: torch.Tensor) -> torch.Tensor:
    """Transform local samples z into global samples x = mu + A z.

    This function is the inverse of to_local_coordinates.

    NOTE(review): with x = mu + A z the covariance of x is A A^T, which
    coincides with the documented A^T A only when A is symmetric -- confirm
    the intended convention.

    Args:
        local_coordinates (torch.Tensor): The local coordinates sampled from N(0, I_d)
    Returns:
        global_coordinates (torch.Tensor): The global coordinates sampled from N(mu, A^T A)
    """
    # Apply A to every sample in one go: transpose so that each sample
    # becomes a column, multiply by A, then transpose back.
    transformed = (self.A @ local_coordinates.T).T
    # The extra leading axis on mu lets it broadcast over the samples
    return self.mu.unsqueeze(0) + transformed
to_local_coordinates(self, global_coordinates)
¶
Map samples from global coordinate space N(mu, A^T A) to local coordinate space N(0, I_d). This function is the inverse of to_global_coordinates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
global_coordinates |
torch.Tensor |
The global coordinates sampled from N(mu, A^T A) |
required |
Returns:
Type | Description |
---|---|
local_coordinates (torch.Tensor) |
The local coordinates sampled from N(0, I_d) |
Source code in evotorch/algorithms/distributed/gaussian.py
def to_local_coordinates(self, global_coordinates: torch.Tensor) -> torch.Tensor:
    """Transform global samples x into local samples z = A_inv (x - mu).

    This function is the inverse of to_global_coordinates.

    Args:
        global_coordinates (torch.Tensor): The global coordinates sampled from N(mu, A^T A)
    Returns:
        local_coordinates (torch.Tensor): The local coordinates sampled from N(0, I_d)
    """
    # Since x = mu + A z, removing the center and applying A_inv recovers z.
    # The extra leading axis on mu lets it broadcast over the samples.
    centered = global_coordinates - self.mu.unsqueeze(0)
    # Transpose so that each sample is a column, apply A_inv, transpose back
    return (self.A_inv @ centered.T).T
update_parameters(self, gradients, *, learning_rates=None, optimizers=None)
¶
Do an update on the distribution by following the given gradients.
It is expected that the inheriting class has its own implementation for this method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
gradients |
dict |
Gradients, as a dictionary, which will be used for computing the necessary updates. |
required |
learning_rates |
Optional[dict] |
A dictionary which contains learning rates for parameters that will be updated using a learning rate coefficient. |
None |
optimizers |
Optional[dict] |
A dictionary which contains optimizer objects for parameters that will be updated using an adaptive optimizer. |
None |
Returns:
Type | Description |
---|---|
ExpGaussian |
The updated copy of the distribution. |
Source code in evotorch/algorithms/distributed/gaussian.py
def update_parameters(
    self,
    gradients: dict,
    *,
    learning_rates: Optional[dict] = None,
    optimizers: Optional[dict] = None,
) -> "ExpGaussian":
    """Return an updated copy of the distribution by following the given gradients.

    Args:
        gradients: A dictionary holding the gradients under the keys
            "d" and "M".
        learning_rates: Optional dictionary of learning rates. When it
            has no "d" entry, its "mu" entry (if any) is used for "d";
            likewise its "sigma" entry (if any) is used for "M".
            The given dictionary itself is never modified.
        optimizers: Optional dictionary of optimizers. It is forwarded
            unchanged to `_follow_gradient`.
    Returns:
        The updated copy of the distribution.
    """
    d_grad = gradients["d"]
    M_grad = gradients["M"]

    # Work on a private copy: the default value None must not fail on the
    # membership tests below, and the caller's dictionary must not silently
    # gain the "d" and "M" keys.
    learning_rates = {} if learning_rates is None else dict(learning_rates)
    for local_name, global_name in (("d", "mu"), ("M", "sigma")):
        # Only borrow the global learning rate when there is one; a missing
        # entry is left for `_follow_gradient` to resolve (e.g. via `optimizers`).
        if local_name not in learning_rates and global_name in learning_rates:
            learning_rates[local_name] = learning_rates[global_name]

    # Follow gradients for d, and M
    update_d = self._follow_gradient("d", d_grad, learning_rates=learning_rates, optimizers=optimizers)
    update_M = self._follow_gradient("M", M_grad, learning_rates=learning_rates, optimizers=optimizers)

    # Fold into parameters mu, A and A inv
    new_mu = self.mu + torch.mv(self.A, update_d)
    new_A = self.A @ torch.matrix_exp(0.5 * update_M)
    # The inverse is updated with the negated exponent, so that no explicit
    # matrix inversion is needed
    new_A_inv = torch.matrix_exp(-0.5 * update_M) @ self.A_inv

    # Return modified distribution
    return self.modified_copy(mu=new_mu, sigma=new_A, sigma_inv=new_A_inv)
__init__(self, problem, *, stdev_init=None, radius_init=None, popsize=None, center_learning_rate=None, stdev_learning_rate=None, scale_learning_rate=True, num_interactions=None, popsize_max=None, optimizer=None, optimizer_config=None, ranking_method='nes', center_init=None, obj_index=None, distributed=False, popsize_weighted_grad_avg=None)
special
¶
__init__(...)
: Initialize the XNES algorithm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem object which is being worked on. |
required |
stdev_init |
Union[float, Iterable[float], torch.Tensor] |
The initial standard deviation of the search
distribution, expressed as a scalar or as an array.
Determines the initial coverage area of the search
distribution.
If one wishes to configure the coverage area via the
argument `radius_init` instead, then `stdev_init` is expected as None. |
None |
radius_init |
Union[float, Iterable[float], torch.Tensor] |
The initial radius of the search distribution,
expressed as a scalar.
Determines the initial coverage area of the search
distribution.
Here, "radius" is defined as the norm of the search
distribution.
If one wishes to configure the coverage area via the
argument `stdev_init` instead, then `radius_init` is expected as None. |
None |
popsize |
Optional[int] |
Population size. Can be specified as an int,
or can be left as None to let the solver decide.
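In the case of XNES, `popsize` can be left as None, in which case the default `popsize` will be computed as `4 + floor(3 * log(n))` where `n` is the length of a solution. |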
None |
center_learning_rate |
Optional[float] |
Learning rate for updating the mean of the search distribution. Default value is 1.0 |
None |
stdev_learning_rate |
Optional[float] |
Learning rate for updating the covariance
matrix of the search distribution.
The default value is `0.6 * (3 + log(n)) / (n * sqrt(n))` where `n` is the length of a solution. |
None |
scale_learning_rate |
bool |
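For XNES, there is a default standard
deviation learning rate value which is computed as
`0.6 * (3 + log(n)) / (n * sqrt(n))` (where `n` is the solution length). If `scale_learning_rate` is True (which is the default), then the effective learning rate for the standard deviation becomes the provided `stdev_learning_rate` multiplied by this default value. If `scale_learning_rate` is False, then the effective standard deviation learning rate becomes equal to the provided `stdev_learning_rate` value. |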
True |
num_interactions |
Optional[int] |
When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n amount of interactions is reached. When given as None, popsize is the only configuration affecting the size of a population. |
None |
popsize_max |
Optional[int] |
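Having `num_interactions` set as an integer might cause the effective population size to jump to unnecessarily large numbers. To prevent this, one can set `popsize_max` to specify an upper bound for the effective population size. |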
None |
num_interactions |
Optional[int] |
When given as an integer n, it is ensured that a population has interacted with the GymProblem's environment n times. If this target has not been reached yet, then the population is declared too small, and gets extended with more samples, until n amount of interactions is reached. When given as None, popsize is the only configuration affecting the size of a population. |
None |
optimizer |
The optimizer to be used while following the
estimated gradients.
Can be given as None if a momentum-based optimizer
is not required.
Otherwise, can be given as a str containing the name
of the optimizer (e.g. 'adam', 'clipup');
or as an instance of evotorch.optimizers.TorchOptimizer
or evotorch.optimizers.ClipUp.
As in the study of Salimans et al. (2017),
the default is 'clipup'.
Note that, for ClipUp, the default maximum speed is set
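as twice the given `center_learning_rate`. This maximum speed can be configured by passing `{"max_speed": ...}` to `optimizer_config`. |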
None |
|
optimizer_config |
Optional[dict] |
Configuration which will be passed
to the optimizer as keyword arguments.
See `evotorch.optimizers` for details about which optimizer accepts which keyword arguments. |
None |
ranking_method |
Optional[str] |
Which ranking method will be used for
fitness shaping. See the documentation of
`evotorch.ranking.rank(...)` for details. Can be given as None if no such ranking is required. |
'nes' |
center_init |
Union[float, Iterable[float], torch.Tensor] |
The initial center solution. Can be left as None. |
None |
stdev_min |
Minimum values for the standard deviation. Expected as a 1-dimensional array to serve as a limiter to the diagonals of the covariance matrix's square root. |
required | |
stdev_max |
Maximum values for the standard deviation. Expected as a 1-dimensional array to serve as a limiter to the diagonals of the covariance matrix's square root. |
required | |
stdev_max_change |
Maximum change allowed when updating the square root of the covariance matrix. |
required | |
obj_index |
Optional[int] |
Index of the objective according to which the gradient estimations will be done. For single-objective problems, this can be left as None. |
None |
distributed |
bool |
Whether or not the gradient computation will
be distributed. If |
False |
popsize_weighted_grad_avg |
Optional[bool] |
Only to be used in distributed mode.
(where being in distributed mode means |
None |
Source code in evotorch/algorithms/distributed/gaussian.py
def __init__(
    self,
    problem: Problem,
    *,
    stdev_init: Optional[RealOrVector] = None,
    radius_init: Optional[RealOrVector] = None,
    popsize: Optional[int] = None,
    center_learning_rate: Optional[float] = None,
    stdev_learning_rate: Optional[float] = None,
    scale_learning_rate: bool = True,
    num_interactions: Optional[int] = None,
    popsize_max: Optional[int] = None,
    optimizer=None,
    optimizer_config: Optional[dict] = None,
    ranking_method: Optional[str] = "nes",
    center_init: Optional[RealOrVector] = None,
    obj_index: Optional[int] = None,
    distributed: bool = False,
    popsize_weighted_grad_avg: Optional[bool] = None,
):
    """
    `__init__(...)`: Initialize the XNES algorithm.

    The limiters `stdev_min`, `stdev_max` and `stdev_max_change` accepted
    by the parent initializer are not configurable here: they are always
    passed on as None.

    Args:
        problem: The problem object which is being worked on.
        stdev_init: The initial standard deviation of the search
            distribution, expressed as a scalar or as an array.
            Determines the initial coverage area of the search
            distribution.
            If one wishes to configure the coverage area via the
            argument `radius_init` instead, then `stdev_init` is expected
            as None.
        radius_init: The initial radius of the search distribution,
            expressed as a scalar.
            Determines the initial coverage area of the search
            distribution.
            Here, "radius" is defined as the norm of the search
            distribution.
            If one wishes to configure the coverage area via the
            argument `stdev_init` instead, then `radius_init` is expected
            as None.
        popsize: Population size. Can be specified as an int,
            or can be left as None to let the solver decide.
            When left as None, the default `popsize` is computed
            as `4 + floor(3 * log(n))` where `n` is the length
            of a solution.
        center_learning_rate: Learning rate for updating the mean
            of the search distribution. Default value is 1.0
        stdev_learning_rate: Learning rate for updating the covariance
            matrix of the search distribution.
            The default value is `0.6 * (3 + log(n)) / (n * sqrt(n))`
            where `n` is the length of a solution.
        scale_learning_rate: There is a default standard
            deviation learning rate value which is computed as
            `0.6 * (3 + log(n)) / (n * sqrt(n))` (where `n` is the solution
            length).
            If `scale_learning_rate` is True (which is the default),
            then the effective learning rate for the standard deviation
            becomes the provided `stdev_learning_rate` multiplied by this
            default value. If `scale_learning_rate` is False, then the
            effective standard deviation learning rate becomes
            equal to the provided `stdev_learning_rate` value.
            This setting has no effect when `stdev_learning_rate`
            is left as None.
        num_interactions: When given as an integer n,
            it is ensured that a population has interacted with
            the GymProblem's environment n times. If this target
            has not been reached yet, then the population is declared
            too small, and gets extended with more samples,
            until n amount of interactions is reached.
            When given as None, popsize is the only configuration
            affecting the size of a population.
        popsize_max: Having `num_interactions` set as an integer
            might cause the effective population size to jump to
            unnecessarily large numbers. To prevent this,
            one can set `popsize_max` to specify an upper
            bound for the effective population size.
        optimizer: The optimizer to be used while following the
            estimated gradients.
            Can be given as None (which is the default) if a
            momentum-based optimizer is not required.
            Otherwise, can be given as a str containing the name
            of the optimizer (e.g. 'adam', 'clipup');
            or as an instance of evotorch.optimizers.TorchOptimizer
            or evotorch.optimizers.ClipUp.
            Note that, for ClipUp, the default maximum speed is set
            as twice the given `center_learning_rate`.
            This maximum speed can be configured by passing
            `{"max_speed": ...}` to `optimizer_config`.
        optimizer_config: Configuration which will be passed
            to the optimizer as keyword arguments.
            See `evotorch.optimizers` for details about
            which optimizer accepts which keyword arguments.
        ranking_method: Which ranking method will be used for
            fitness shaping. See the documentation of
            `evotorch.ranking.rank(...)` for details.
            The default is 'nes'.
            Can be given as None if no such ranking is required.
        center_init: The initial center solution.
            Can be left as None.
        obj_index: Index of the objective according to which the
            gradient estimations will be done.
            For single-objective problems, this can be left as None.
        distributed: Whether or not the gradient computation will
            be distributed. If `distributed` is given as False and
            the problem is not parallelized, then everything will
            be centralized (i.e. the entire computation will happen
            in the main process).
            If `distributed` is given as False, and the problem
            is parallelized, then the population will be created
            in the main process and then sent to remote workers
            for parallelized evaluation, and then the remote fitnesses
            will be collected by the main process again for computing
            the search gradients.
            If `distributed` is given as True, and the problem
            is parallelized, then the search algorithm itself will
            be distributed, in the sense that each remote actor will
            generate its own population (such that the total population
            size across all these actors becomes equal to `popsize`)
            and will compute its own gradient, and then the main process
            will collect these gradients, compute the averaged gradients
            and update the main search distribution.
            Non-distributed mode has the advantage of keeping the
            population in the main process, which is good when one wishes
            to do detailed monitoring during the evolutionary process,
            but has the disadvantage of having to pass the solutions to
            the remote actors and having to collect fitnesses, which
            might result in increased interprocess communication traffic.
            On the other hand, while it is not possible to monitor the
            population in distributed mode, the distributed mode has the
            advantage of significantly reducing the interprocess
            communication traffic, since the only things communicated
            with the remote actors are the search distributions (not the
            solutions) and the gradients.
        popsize_weighted_grad_avg: Only to be used in distributed mode.
            (where being in distributed mode means `distributed` is given
            as True). In distributed mode, each actor remotely samples
            its own solution batches and computes its own gradients.
            These gradients are then collected, and a final average
            gradient is computed.
            If `popsize_weighted_grad_avg` is True, then, while averaging
            over the gradients, each gradient will have its own weight
            that is computed according to how many solutions were sampled
            by the actor that produced the gradient.
            If `popsize_weighted_grad_avg` is False, then, there will not
            be weighted averaging (or, each gradient will have equal
            weight).
            If `popsize_weighted_grad_avg` is None, then, the gradient
            weights will be equal if a value for `num_interactions` is
            given (because `num_interactions` affects the number of
            solutions according to the episode lengths, and
            popsize-weighting the gradients could be misleading); and the
            gradient weights will be weighted according to the
            sub-population (i.e. sub-batch) sizes if `num_interactions`
            is left as None.
            The default value for `popsize_weighted_grad_avg` is None.
            When the distributed mode is disabled (i.e. when `distributed`
            is False), then the argument `popsize_weighted_grad_avg` is
            expected as None.
    """
    if popsize is None:
        popsize = int(4 + math.floor(3 * math.log(problem.solution_length)))

    if center_learning_rate is None:
        center_learning_rate = 1.0

    def default_stdev_lr():
        # Default learning rate for the covariance matrix update
        n = problem.solution_length
        return 0.6 * (3 + math.log(n)) / (n * math.sqrt(n))

    if stdev_learning_rate is None:
        stdev_learning_rate = default_stdev_lr()
    else:
        stdev_learning_rate = float(stdev_learning_rate)
        if scale_learning_rate:
            # A user-provided value acts as a multiplier of the default
            stdev_learning_rate *= default_stdev_lr()

    super().__init__(
        problem,
        popsize=popsize,
        center_learning_rate=center_learning_rate,
        stdev_learning_rate=stdev_learning_rate,
        stdev_init=stdev_init,
        radius_init=radius_init,
        popsize_max=popsize_max,
        num_interactions=num_interactions,
        optimizer=optimizer,
        optimizer_config=optimizer_config,
        ranking_method=ranking_method,
        center_init=center_init,
        # The standard deviation limiters are not exposed by this initializer
        stdev_min=None,
        stdev_max=None,
        stdev_max_change=None,
        obj_index=obj_index,
        distributed=distributed,
        popsize_weighted_grad_avg=popsize_weighted_grad_avg,
    )
ga
¶
Genetic algorithm variants: SteadyStateGA, Cosyne.
Cosyne (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
Implementation of the CoSyNE algorithm.
References:
F.Gomez, J.Schmidhuber, R.Miikkulainen, M.Mitchell (2008).
Accelerated Neural Evolution through Cooperatively Coevolved Synapses.
Journal of Machine Learning Research 9 (5).
Source code in evotorch/algorithms/ga.py
class Cosyne(SearchAlgorithm, SinglePopulationAlgorithmMixin):
    """
    Implementation of the CoSyNE algorithm.

    References:

        F.Gomez, J.Schmidhuber, R.Miikkulainen, M.Mitchell (2008).
        Accelerated Neural Evolution through Cooperatively Coevolved Synapses.
        Journal of Machine Learning Research 9 (5).
    """

    def __init__(
        self,
        problem: Problem,
        *,
        popsize: int,
        tournament_size: int,
        mutation_stdev: Optional[float],
        mutation_probability: Optional[float],
        permute_all: bool = False,
        num_elites: Optional[int] = None,
        elitism_ratio: Optional[float] = None,
        eta: Optional[float] = None,
        num_children: Optional[int] = None,
    ):
        """
        `__init__(...)`: Initialize the Cosyne instance.

        Args:
            problem: The problem object to work on.
            popsize: Population size, as an integer.
            tournament_size: Tournament size, for tournament selection.
            mutation_stdev: Standard deviation of the Gaussian mutation.
                When both `mutation_stdev` and `mutation_probability`
                are None, no mutation operator is used. Otherwise both
                values are converted with `float(...)`, so leaving only
                one of them as None fails in that conversion.
            mutation_probability: Elementwise Gaussian mutation probability.
                See `mutation_stdev` for how None is handled.
            permute_all: If given as True, all solutions are subject to
                permutation. If given as False (which is the default),
                there will be a selection procedure for each decision
                variable.
            num_elites: Optionally expected as an integer, specifying the
                number of elites to pass to the next generation.
                Cannot be used together with the argument `elitism_ratio`.
            elitism_ratio: Optionally expected as a real number between
                0 and 1, specifying the amount of elites to pass to the
                next generation. For example, 0.1 means that the best 10%
                of the population are accepted as elites and passed onto
                the next generation.
                Cannot be used together with the argument `num_elites`.
            eta: Optionally expected as a number, specifying the eta
                hyperparameter for the simulated binary cross-over (SBX).
                If left as None, one-point cross-over will be used instead.
            num_children: Number of children to generate at each iteration.
                If left as None, the cross-over operator is configured
                with `cross_over_rate=2.0` instead (described by the
                original documentation as producing as many children as
                half of the population size).

        Raises:
            ValueError: If both `num_elites` and `elitism_ratio` are given.
        """
        problem.ensure_numeric()
        SearchAlgorithm.__init__(self, problem)

        # Mutation is disabled only when neither of its settings is given
        mutation_disabled = (mutation_stdev is None) and (mutation_probability is None)
        if mutation_disabled:
            self.mutation_op = None
        else:
            self.mutation_op = GaussianMutation(
                self._problem,
                mutation_probability=float(mutation_probability),
                stdev=float(mutation_stdev),
            )

        # The amount of children is expressed either as a rate or as an
        # explicit count, depending on whether `num_children` was given
        if num_children is None:
            amount_kwargs = {"cross_over_rate": 2.0}
        else:
            amount_kwargs = {"num_children": num_children}
        xover_kwargs = {"tournament_size": tournament_size, **amount_kwargs}

        if eta is None:
            self._cross_over_op = OnePointCrossOver(self._problem, **xover_kwargs)
        else:
            self._cross_over_op = SimulatedBinaryCrossOver(self._problem, eta=eta, **xover_kwargs)

        self._permutation_op = CosynePermutation(self._problem, permute_all=permute_all)

        self._popsize = int(popsize)

        # At most one of the two elitism settings may be provided
        if num_elites is not None and elitism_ratio is not None:
            raise ValueError(
                "Received both `num_elites` and `elitism_ratio`. Please provide only one of them, or none of them."
            )
        if num_elites is not None:
            self._num_elites = int(num_elites)
        elif elitism_ratio is not None:
            self._num_elites = int(self._popsize * elitism_ratio)
        else:
            self._num_elites = None

        self._population = SolutionBatch(problem, device=problem.device, popsize=self._popsize)
        self._first_generation: bool = True

        SinglePopulationAlgorithmMixin.__init__(self)

    @property
    def population(self) -> SolutionBatch:
        """The current population, as a SolutionBatch."""
        return self._population

    def _step(self):
        """Perform one generation: select, recombine, mutate, permute, and truncate."""
        # The initial population is evaluated only once, in the first generation
        if self._first_generation:
            self._first_generation = False
            self._problem.evaluate(self._population)

        elite_count = self._num_elites
        # A quarter of the population serves as the parents
        parent_count = int(self._popsize / 4)

        # Take as many of the best solutions as both the elites and the
        # parents require, so that both can be sliced from the same batch
        needed = max(0 if elite_count is None else elite_count, parent_count)
        best = self._population.take_best(needed)

        pieces = []
        if elite_count is not None and elite_count >= 1:
            pieces.append(best[:elite_count].clone())

        offspring = self._cross_over_op(best[:parent_count])
        if self.mutation_op is not None:
            offspring = self.mutation_op(offspring)
        pieces.append(offspring)

        # The permutation is applied on the entire current population
        pieces.append(self._permutation_op(self._population))

        merged = SolutionBatch(merging_of=pieces)
        self._problem.evaluate(merged)
        self._population = merged.take_best(self._popsize)
__init__(self, problem, *, popsize, tournament_size, mutation_stdev, mutation_probability, permute_all=False, num_elites=None, elitism_ratio=None, eta=None, num_children=None)
special
¶
__init__(...)
: Initialize the Cosyne instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem object to work on. |
required |
popsize |
int |
Population size, as an integer. |
required |
tournament_size |
int |
Tournament size, for tournament selection. |
required |
mutation_stdev |
Optional[float] |
Standard deviation of the Gaussian mutation. |
required |
mutation_probability |
Optional[float] |
Elementwise Gaussian mutation probability. |
required |
permute_all |
bool |
If given as True, all solutions are subject to permutation. If given as False (which is the default), there will be a selection procedure for each decision variable. |
False |
num_elites |
Optional[int] |
Optionally expected as an integer, specifying the
number of elites to pass to the next generation.
Cannot be used together with the argument `elitism_ratio`. |
None |
elitism_ratio |
Optional[float] |
Optionally expected as a real number between
0 and 1, specifying the amount of elites to pass to the
next generation. For example, 0.1 means that the best 10%
of the population are accepted as elites and passed onto
the next generation.
Cannot be used together with the argument `num_elites`. |
None |
eta |
Optional[float] |
Optionally expected as an integer, specifying the eta hyperparameter for the simulated binary cross-over (SBX). If left as None, one-point cross-over will be used instead. |
None |
num_children |
Optional[int] |
Number of children to generate at each iteration. If left as None, then this number is half of the population size. |
None |
Source code in evotorch/algorithms/ga.py
def __init__(
    self,
    problem: Problem,
    *,
    popsize: int,
    tournament_size: int,
    mutation_stdev: Optional[float],
    mutation_probability: Optional[float],
    permute_all: bool = False,
    num_elites: Optional[int] = None,
    elitism_ratio: Optional[float] = None,
    eta: Optional[float] = None,
    num_children: Optional[int] = None,
):
    """
    `__init__(...)`: Initialize the Cosyne instance.

    Args:
        problem: The problem object to work on.
        popsize: Population size, as an integer.
        tournament_size: Tournament size, for tournament selection.
        mutation_stdev: Standard deviation of the Gaussian mutation.
            When both `mutation_stdev` and `mutation_probability`
            are None, no mutation operator is used. Otherwise both
            values are converted with `float(...)`, so leaving only
            one of them as None fails in that conversion.
        mutation_probability: Elementwise Gaussian mutation probability.
            See `mutation_stdev` for how None is handled.
        permute_all: If given as True, all solutions are subject to
            permutation. If given as False (which is the default),
            there will be a selection procedure for each decision
            variable.
        num_elites: Optionally expected as an integer, specifying the
            number of elites to pass to the next generation.
            Cannot be used together with the argument `elitism_ratio`.
        elitism_ratio: Optionally expected as a real number between
            0 and 1, specifying the amount of elites to pass to the
            next generation. For example, 0.1 means that the best 10%
            of the population are accepted as elites and passed onto
            the next generation.
            Cannot be used together with the argument `num_elites`.
        eta: Optionally expected as a number, specifying the eta
            hyperparameter for the simulated binary cross-over (SBX).
            If left as None, one-point cross-over will be used instead.
        num_children: Number of children to generate at each iteration.
            If left as None, the cross-over operator is configured
            with `cross_over_rate=2.0` instead (described by the
            original documentation as producing as many children as
            half of the population size).

    Raises:
        ValueError: If both `num_elites` and `elitism_ratio` are given.
    """
    problem.ensure_numeric()
    SearchAlgorithm.__init__(self, problem)

    # Mutation is disabled only when neither of its settings is given
    mutation_disabled = (mutation_stdev is None) and (mutation_probability is None)
    if mutation_disabled:
        self.mutation_op = None
    else:
        self.mutation_op = GaussianMutation(
            self._problem,
            mutation_probability=float(mutation_probability),
            stdev=float(mutation_stdev),
        )

    # The amount of children is expressed either as a rate or as an
    # explicit count, depending on whether `num_children` was given
    if num_children is None:
        amount_kwargs = {"cross_over_rate": 2.0}
    else:
        amount_kwargs = {"num_children": num_children}
    xover_kwargs = {"tournament_size": tournament_size, **amount_kwargs}

    if eta is None:
        self._cross_over_op = OnePointCrossOver(self._problem, **xover_kwargs)
    else:
        self._cross_over_op = SimulatedBinaryCrossOver(self._problem, eta=eta, **xover_kwargs)

    self._permutation_op = CosynePermutation(self._problem, permute_all=permute_all)

    self._popsize = int(popsize)

    # At most one of the two elitism settings may be provided
    if num_elites is not None and elitism_ratio is not None:
        raise ValueError(
            "Received both `num_elites` and `elitism_ratio`. Please provide only one of them, or none of them."
        )
    if num_elites is not None:
        self._num_elites = int(num_elites)
    elif elitism_ratio is not None:
        self._num_elites = int(self._popsize * elitism_ratio)
    else:
        self._num_elites = None

    self._population = SolutionBatch(problem, device=problem.device, popsize=self._popsize)
    self._first_generation: bool = True

    SinglePopulationAlgorithmMixin.__init__(self)
SteadyStateGA (SearchAlgorithm, SinglePopulationAlgorithmMixin)
¶
A fully elitist genetic algorithm implementation.
For multi-objective problems, the instances of this class organize their populations into pareto-fronts, and do pareto-rank-based selections among the solutions, in a compatible way with the NSGA-II algorithm.
References:
Sean Luke, 2013, Essentials of Metaheuristics, Lulu, second edition
available for free at http://cs.gmu.edu/~sean/book/metaheuristics/
Kalyanmoy Deb, Amrit Pratap, Sameer Agarwal, T. Meyarivan (2002).
A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II.
Source code in evotorch/algorithms/ga.py
class SteadyStateGA(SearchAlgorithm, SinglePopulationAlgorithmMixin):
    """
    A fully elitist genetic algorithm implementation.

    For multi-objective problems, the instances of this class
    organize their populations into pareto-fronts, and
    do pareto-rank-based selections among the solutions,
    in a compatible way with the NSGA-II algorithm.

    References:

        Sean Luke, 2013, Essentials of Metaheuristics, Lulu, second edition
        available for free at http://cs.gmu.edu/~sean/book/metaheuristics/

        Kalyanmoy Deb, Amrit Pratap, Sameer Agarwal, T. Meyarivan (2002).
        A Fast and Elitist Multiobjective Genetic Algorithm: NSGA-II.
    """

    def __init__(self, problem: Problem, *, popsize: int, re_evaluate: bool = True):
        """
        `__init__(...)`: Initialize the SteadyStateGA.

        No operators are registered by the initializer; they are
        expected to be registered afterwards via `use(...)`.

        Args:
            problem: The problem to optimize.
            popsize: Population size.
            re_evaluate: Whether or not to evaluate the solutions
                that were already evaluated in the previous generations.
                By default, this is set as True.
                The reason behind this default setting is that,
                in problems where the evaluation procedure is noisy,
                by re-evaluating the already-evaluated solutions,
                we prevent the bad solutions that were luckily evaluated
                from hanging onto the population.
                Instead, at every generation, each solution must go through
                the evaluation procedure again and prove their worth.
                For problems whose evaluation procedures are NOT noisy,
                the user might consider turning re_evaluate to False
                for saving computational cycles.
        """
        SearchAlgorithm.__init__(self, problem)

        # Operators start out unset
        self._mutation_op: Optional[Callable] = None
        self._cross_over_op: Optional[Callable] = None

        self._popsize = int(popsize)
        self._re_eval = bool(re_evaluate)
        self._first_iter: bool = True

        self._population = problem.generate_batch(self._popsize)

        SinglePopulationAlgorithmMixin.__init__(self)

    @property
    def population(self) -> SolutionBatch:
        """The current population, as a SolutionBatch."""
        return self._population

    def use(self, operator: Callable):
        """
        Use the specified operator.

        If the specified operator is a CrossOver instance, then that operator
        is registered as the cross-over operator. Otherwise, the operator
        is registered as the mutation operator.

        Args:
            operator: The operator to use.
        """
        if not isinstance(operator, CrossOver):
            self._mutation_op = operator
            return
        self._cross_over_op = operator

    def _step(self):
        """
        Perform one generation.

        The cross-over operator is called unconditionally, so one has to
        be registered via `use(...)` before the first step.
        """
        # The population is evaluated in the first iteration, and in every
        # later iteration as well if re-evaluation is enabled
        needs_evaluation = self._first_iter or self._re_eval
        if needs_evaluation:
            self.problem.evaluate(self._population)
            self._first_iter = False

        offspring = self._cross_over_op(self._population)

        if self._mutation_op is not None:
            mutation_result = self._mutation_op(offspring)
            # A mutation operator returning None leaves the children
            # produced by the cross-over as the offspring
            if mutation_result is not None:
                offspring = mutation_result

        self.problem.evaluate(offspring)

        # Parents and offspring compete together for the next population
        combined = self._population.concat(offspring)
        self._population = combined.take_best(self._popsize)
__init__(self, problem, *, popsize, re_evaluate=True)
special
¶
__init__(...)
: Initialize the SteadyStateGA.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
The problem to optimize. |
required |
popsize |
int |
Population size. |
required |
re_evaluate |
bool |
Whether or not to evaluate the solutions that were already evaluated in the previous generations. By default, this is set as True. The reason behind this default setting is that, in problems where the evaluation procedure is noisy, by re-evaluating the already-evaluated solutions, we prevent the bad solutions that were luckily evaluated from hanging onto the population. Instead, at every generation, each solution must go through the evaluation procedure again and prove their worth. For problems whose evaluation procedures are NOT noisy, the user might consider turning re_evaluate to False for saving computational cycles. |
True |
Source code in evotorch/algorithms/ga.py
def __init__(self, problem: Problem, *, popsize: int, re_evaluate: bool = True):
    """
    `__init__(...)`: Initialize the SteadyStateGA.

    Args:
        problem: The problem to optimize.
        popsize: Population size.
        re_evaluate: If True (the default), the solutions which were
            already evaluated in earlier generations are evaluated
            again at every generation.
            This default is meant for noisy evaluation procedures:
            a bad solution which obtained a lucky evaluation result
            cannot hang onto the population, because it has to prove
            its worth again at each generation.
            When the evaluation procedure is NOT noisy, setting this
            to False saves computational cycles.
    """
    # Set up the SearchAlgorithm part of this object.
    SearchAlgorithm.__init__(self, problem)

    # No operator is known yet.
    self._cross_over_op: Optional[Callable] = None
    self._mutation_op: Optional[Callable] = None

    # Normalized configuration values and bookkeeping.
    self._popsize = int(popsize)
    self._re_eval = bool(re_evaluate)
    self._first_iter: bool = True

    # Ask the problem for the initial population.
    self._population = problem.generate_batch(self._popsize)

    # Set up the mixin part last, once the attributes above exist.
    SinglePopulationAlgorithmMixin.__init__(self)
use(self, operator)
¶
Use the specified operator.
If the specified operator is a CrossOver instance, then that operator is registered as the cross-over operator. Otherwise, the operator is registered as the mutation operator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
operator |
Callable |
The operator to use. |
required |
Source code in evotorch/algorithms/ga.py
def use(self, operator: Callable):
    """
    Register an operator to be used by this algorithm.

    A CrossOver instance becomes the cross-over operator of the
    algorithm. Any other operator becomes the mutation operator.

    Args:
        operator: The operator to register.
    """
    if not isinstance(operator, CrossOver):
        # Anything which is not a CrossOver is treated as a mutation.
        self._mutation_op = operator
        return

    self._cross_over_op = operator
searchalgorithm
¶
This namespace contains SearchAlgorithm
, the base class for all
evolutionary algorithms.
LazyReporter
¶
This class provides an interface of storing and reporting status. This class is designed to be inherited by other classes.
Let us assume that we have the following class inheriting from LazyReporter:
class Example(LazyReporter):
def __init__(self):
LazyReporter.__init__(self, a=self._get_a, b=self._get_b)
def _get_a(self):
return ... # return the status 'a'
def _get_b(self):
return ... # return the status 'b'
At its initialization phase, this Example class registers its methods
_get_a
and _get_b
as its status providers.
Having the LazyReporter interface, the Example class gains a status
property:
ex = Example()
print(ex.status["a"]) # Get the status 'a'
print(ex.status["b"]) # Get the status 'b'
Once a status is queried, its computation result is stored to be re-used later. After running the code above, if we query the status 'a' again:
print(ex.status["a"]) # Getting the status 'a' again
then the status 'a' is not computed again (i.e. _get_a
is not
called again). Instead, the stored status value of 'a' is re-used.
To force re-computation of the status values, one can execute:
ex.clear_status()
Or the Example instance can clear its status from within one of its methods:
class Example(LazyReporter):
    ...
    def some_method(self):
        ...
        self.clear_status()
Source code in evotorch/algorithms/searchalgorithm.py
class LazyReporter:
    """
    Interface for storing and lazily reporting status values.

    This class is meant to be used as a base class.
    As an example, consider the following subclass:

    ```python
    class Example(LazyReporter):
        def __init__(self):
            LazyReporter.__init__(self, a=self._get_a, b=self._get_b)

        def _get_a(self):
            return ...  # return the status 'a'

        def _get_b(self):
            return ...  # return the status 'b'
    ```

    During its initialization, Example registers its methods ``_get_a``
    and ``_get_b`` as the providers of the status values 'a' and 'b'.
    Through the LazyReporter interface, it gains a ``status`` property:

    ```python
    ex = Example()
    print(ex.status["a"])  # Get the status 'a'
    print(ex.status["b"])  # Get the status 'b'
    ```

    The result of a status computation is stored when it is first
    queried. Therefore, querying the status 'a' one more time:

    ```python
    print(ex.status["a"])  # Getting the status 'a' again
    ```

    does not call ``_get_a`` again; the stored value is returned instead.

    The stored values can be dropped, forcing re-computation, via:

    ```python
    ex.clear_status()
    ```

    The same can be done from within a method of the subclass:

    ```python
    class Example(LazyReporter):
        ...

        def some_method(self):
            ...
            self.clear_status()
    ```
    """

    @staticmethod
    def _missing_status_producer():
        # Placeholder getter for the keys which were introduced via
        # `update_status(...)` and therefore have no real provider.
        return None

    def __init__(self, **kwargs):
        """
        `__init__(...)`: Initialize the LazyReporter instance.

        Args:
            kwargs: Each keyword argument maps a status key to the
                method or function which provides its value.
        """
        # Status values which are computed (or injected) so far.
        self.__computed = {}
        # Status key -> function producing the value of that status.
        self.__getters = kwargs

    def get_status_value(self, key: Any) -> Any:
        """
        Get the value of a status variable, computing it if necessary.

        Args:
            key: The key (i.e. the name) of the status variable.
        Returns:
            The stored value if there is one; otherwise the value
            produced (and then stored) by the getter of the key.
        """
        try:
            return self.__computed[key]
        except KeyError:
            pass

        # Not stored yet: compute it now and remember the result.
        value = self.__getters[key]()
        self.__computed[key] = value
        return value

    def has_status_key(self, key: Any) -> bool:
        """
        Check whether or not a status variable is registered.

        Args:
            key: The key (i.e. the name) of the status variable whose
                existence is to be checked.
        Returns:
            True if there is such a key; False otherwise.
        """
        return key in self.__getters

    def iter_status_keys(self):
        """Get an iterable of the registered status keys."""
        return self.__getters.keys()

    def clear_status(self):
        """Drop every stored status value."""
        self.__computed.clear()

    def is_status_computed(self, key) -> bool:
        """
        Check whether or not a value is currently stored for a status.

        Args:
            key: The key (i.e. the name) of the status variable.
        Returns:
            True if the status of the given key is computed; False otherwise.
        """
        return key in self.__computed

    def update_status(self, additional_status: Mapping):
        """
        Merge an external dict-like object into the stored status.

        The values of already known keys are overridden, and unknown keys
        are added to the status.

        Args:
            additional_status: A dict-like object storing the status update.
        """
        placeholder = LazyReporter._missing_status_producer
        for name, value in additional_status.items():
            # A key without a getter receives the placeholder getter,
            # so that it stays queryable after `clear_status()`.
            self.__getters.setdefault(name, placeholder)
            self.__computed[name] = value

    def add_status_getters(self, getters: Mapping):
        """
        Register additional status-getting functions.

        Args:
            getters: A dictionary-like object whose keys are the names
                of the additional status variables, and whose values are
                the functions computing/retrieving those variables.
        """
        self.__getters.update(getters)

    @property
    def status(self) -> "LazyStatusDict":
        """A LazyStatusDict which is bound to this LazyReporter."""
        return LazyStatusDict(self)
status: LazyStatusDict
property
readonly
¶
Get a LazyStatusDict which is bound to this LazyReporter.
__init__(self, **kwargs)
special
¶
__init__(...)
: Initialize the LazyReporter instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |
Keyword arguments, mapping the status keys to the methods or functions providing the status values. |
{} |
Source code in evotorch/algorithms/searchalgorithm.py
add_status_getters(self, getters)
¶
Register additional status-getting functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
getters |
Mapping |
A dictionary-like object where the keys are the additional status variable names, and values are functions which are expected to compute/retrieve the values for those status variables. |
required |
Source code in evotorch/algorithms/searchalgorithm.py
def add_status_getters(self, getters: Mapping):
"""
Register additional status-getting functions.
Args:
getters: A dictionary-like object where the keys are the
additional status variable names, and values are functions
which are expected to compute/retrieve the values for those
status variables.
"""
self.__getters.update(getters)
clear_status(self)
¶
get_status_value(self, key)
¶
Get the specified status value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Any |
The key (i.e. the name) of the status variable. |
required |
Source code in evotorch/algorithms/searchalgorithm.py
has_status_key(self, key)
¶
Return True if there is a status variable with the specified key. Otherwise, return False.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Any |
The key (i.e. the name) of the status variable whose existence is to be checked. |
required |
Returns:
Type | Description |
---|---|
bool |
True if there is such a key; False otherwise. |
Source code in evotorch/algorithms/searchalgorithm.py
def has_status_key(self, key: Any) -> bool:
"""
Return True if there is a status variable with the specified key.
Otherwise, return False.
Args:
key: The key (i.e. the name) of the status variable whose
existence is to be checked.
Returns:
True if there is such a key; False otherwise.
"""
return key in self.__getters
is_status_computed(self, key)
¶
Return True if the specified status is computed yet. Return False otherwise.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
The key (i.e. the name) of the status variable. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the status of the given key is computed; False otherwise. |
Source code in evotorch/algorithms/searchalgorithm.py
iter_status_keys(self)
¶
update_status(self, additional_status)
¶
Update the stored status with an external dict-like object. The given dict-like object can override existing status keys with new values, and also bring new keys to the status.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
additional_status |
Mapping |
A dict-like object storing the status update. |
required |
Source code in evotorch/algorithms/searchalgorithm.py
def update_status(self, additional_status: Mapping):
"""
Update the stored status with an external dict-like object.
The given dict-like object can override existing status keys
with new values, and also bring new keys to the status.
Args:
additional_status: A dict-like object storing the status update.
"""
for k, v in additional_status.items():
if k not in self.__getters:
self.__getters[k] = LazyReporter._missing_status_producer
self.__computed[k] = v
LazyStatusDict (Mapping)
¶
A Mapping subclass used by the status
property of a LazyReporter
.
The interface of this object is similar to a read-only dictionary.
Source code in evotorch/algorithms/searchalgorithm.py
class LazyStatusDict(Mapping):
    """
    The Mapping type returned by the `status` property of a `LazyReporter`.

    An instance of this class can be used like a read-only dictionary.
    Each item access is forwarded to the bound `LazyReporter`.
    """

    def __init__(self, lazy_reporter: LazyReporter):
        """
        `__init__(...)`: Initialize the LazyStatusDict object.

        Args:
            lazy_reporter: The LazyReporter object whose status is to be
                accessed.
        """
        super().__init__()
        self.__lazy_reporter = lazy_reporter

    def __getitem__(self, key: Any) -> Any:
        value = self.__lazy_reporter.get_status_value(key)
        if not isinstance(value, (torch.Tensor, ObjectArray)):
            return value
        # Tensors and ObjectArrays are handed out via as_read_only_tensor.
        return as_read_only_tensor(value)

    def __len__(self) -> int:
        return sum(1 for _ in self.__lazy_reporter.iter_status_keys())

    def __iter__(self):
        yield from self.__lazy_reporter.iter_status_keys()

    def __contains__(self, key: Any) -> bool:
        return self.__lazy_reporter.has_status_key(key)

    def _to_string(self) -> str:
        # Only the already computed values are shown, so that making a
        # string out of this object does not trigger any computation.
        reporter = self.__lazy_reporter
        lines = ["<" + type(self).__name__]
        for key in reporter.iter_status_keys():
            if reporter.is_status_computed(key):
                shown = repr(reporter.get_status_value(key))
            else:
                shown = "<not yet computed>"
            lines.append(" ".join((" ", str(key), "=", shown)))
        lines.append(">")
        return "\n".join(lines)

    def __str__(self) -> str:
        return self._to_string()

    def __repr__(self) -> str:
        return self._to_string()
__init__(self, lazy_reporter)
special
¶
__init__(...)
: Initialize the LazyStatusDict object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lazy_reporter |
LazyReporter |
The LazyReporter object whose status is to be accessed. |
required |
SearchAlgorithm (LazyReporter)
¶
Base class for all evolutionary search algorithms.
An algorithm developer is expected to inherit from this base class,
and override the method named _step()
to define how a single
step of this new algorithm is performed.
For each core status dictionary element, a new method is expected
to exist within the inheriting class. These status reporting
methods are then registered via the keyword arguments of the
__init__(...)
method of SearchAlgorithm
.
To sum up, a newly developed algorithm inheriting from this base class is expected in this structure:
from evotorch import Problem
class MyNewAlgorithm(SearchAlgorithm):
def __init__(self, problem: Problem):
SearchAlgorithm.__init__(
self, problem, status1=self._get_status1, status2=self._get_status2, ...
)
def _step(self):
# Code that defines how a step of this algorithm
# should work goes here.
...
def _get_status1(self):
# The value returned by this function will be shown
# in the status dictionary, associated with the key
# 'status1'.
return ...
def _get_status2(self):
# The value returned by this function will be shown
# in the status dictionary, associated with the key
# 'status2'.
return ...
Source code in evotorch/algorithms/searchalgorithm.py
class SearchAlgorithm(LazyReporter):
    """
    Base class for all evolutionary search algorithms.

    To develop a new algorithm, one is expected to inherit from this
    base class and to override the method `_step()`, which defines how
    a single step of the new algorithm is performed.

    In addition, for each core element of the status dictionary, the
    inheriting class is expected to have a status reporting method.
    These methods are registered via the keyword arguments of the
    `__init__(...)` method of `SearchAlgorithm`.

    In summary, a new algorithm is expected to have this structure:

    ```python
    from evotorch import Problem


    class MyNewAlgorithm(SearchAlgorithm):
        def __init__(self, problem: Problem):
            SearchAlgorithm.__init__(
                self, problem, status1=self._get_status1, status2=self._get_status2, ...
            )

        def _step(self):
            # Code that defines how a step of this algorithm
            # should work goes here.
            ...

        def _get_status1(self):
            # The value returned by this function will be shown
            # in the status dictionary, associated with the key
            # 'status1'.
            return ...

        def _get_status2(self):
            # The value returned by this function will be shown
            # in the status dictionary, associated with the key
            # 'status2'.
            return ...
    ```
    """

    def __init__(self, problem: Problem, **kwargs):
        """
        Initialize the SearchAlgorithm instance.

        Args:
            problem: Problem to work with.
            kwargs: Additional keyword arguments, each in the form `k=f`,
                where `k` is a status key (i.e. the name of a status
                variable) and `f` is the function (probably a method of
                the inheriting class) which generates the value of that
                status variable.
        """
        super().__init__(**kwargs)

        self._problem = problem
        self._steps_count: int = 0

        # One Hook per extension point of the algorithm.
        self._before_step_hook = Hook()
        self._after_step_hook = Hook()
        self._log_hook = Hook()

    @property
    def problem(self) -> Problem:
        """
        The problem object which is being worked on.
        """
        return self._problem

    @property
    def before_step_hook(self) -> Hook:
        """
        The Hook which is executed just before a step of the search
        algorithm. Use it to add more behavior to that moment.
        """
        return self._before_step_hook

    @property
    def after_step_hook(self) -> Hook:
        """
        The Hook which is executed just after a step of the search
        algorithm. Use it to add more behavior to that moment.

        The dictionaries returned by the functions registered into
        this Hook are accumulated and added into the status
        dictionary of the search algorithm.
        """
        return self._after_step_hook

    @property
    def log_hook(self) -> Hook:
        """
        The Hook which is executed at the moment of logging the
        constructed status dictionary, after the execution of
        `after_step_hook` is complete.

        Each function in this Hook is assumed to expect a single
        argument, that is the status dictionary of the search algorithm.
        """
        return self._log_hook

    @property
    def steps_count(self) -> int:
        """
        Number of search steps performed.

        This is equivalent to the number of generations, or to the
        number of iterations.
        """
        return self._steps_count

    def step(self):
        """
        Perform a step of the search algorithm.
        """
        self._before_step_hook()

        # The status values belonging to the previous step are dropped.
        self.clear_status()

        self._step()
        self._steps_count = self._steps_count + 1

        # Build the status of this step: the iteration count first, then
        # the status of the problem, then the output of the after-step hook.
        self.update_status({"iter": self._steps_count})
        self.update_status(self._problem.status)
        self.update_status(self._after_step_hook.accumulate_dict())

        # The status dictionary is materialized only if somebody logs it.
        if len(self._log_hook) > 0:
            self._log_hook(dict(self.status))

    def _step(self):
        """
        The code defining how a single step of the evolutionary
        algorithm is performed.

        Algorithm developers are expected to override this method
        in an inheriting subclass.
        """
        raise NotImplementedError

    def run(self, num_generations: int):
        """
        Run the algorithm for the given number of generations
        (i.e. iterations).

        Args:
            num_generations: Number of generations.
        """
        remaining = int(num_generations)
        while remaining > 0:
            self.step()
            remaining -= 1
after_step_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm to be performed just after executing a step.
The dictionaries returned by the functions registered into this Hook will be accumulated and added into the status dictionary of the search algorithm.
before_step_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm to be performed just before executing a step.
log_hook: Hook
property
readonly
¶
Use this Hook to add more behavior to the search algorithm at the moment of logging the constructed status dictionary.
This Hook is executed after the execution of after_step_hook
is complete.
The functions in this Hook are assumed to expect a single argument, that is the status dictionary of the search algorithm.
problem: Problem
property
readonly
¶
The problem object which is being worked on.
steps_count: int
property
readonly
¶
Number of search steps performed.
This is equivalent to the number of generations, or to the number of iterations.
__init__(self, problem, **kwargs)
special
¶
Initialize the SearchAlgorithm instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
problem |
Problem |
Problem to work with. |
required |
kwargs |
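Any additional keyword argument, in the form of k=f, is accepted in this manner: for each pair of k and f, k is accepted as the status key (i.e. a status variable name), and f is accepted as a function (probably a method of the inheriting class) that will generate the value of that status variable. |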
{} |
Source code in evotorch/algorithms/searchalgorithm.py
def __init__(self, problem: Problem, **kwargs):
    """
    Initialize the SearchAlgorithm instance.

    Args:
        problem: Problem to work with.
        kwargs: Additional keyword arguments, each in the form `k=f`,
            where `k` is a status key (i.e. the name of a status
            variable) and `f` is the function (probably a method of
            the inheriting class) which generates the value of that
            status variable.
    """
    # The keyword arguments become the status getters of the base class.
    super().__init__(**kwargs)

    self._problem = problem
    self._steps_count: int = 0

    # One Hook per extension point of the algorithm.
    self._before_step_hook = Hook()
    self._after_step_hook = Hook()
    self._log_hook = Hook()
run(self, num_generations)
¶
Run the algorithm for the given number of generations (i.e. iterations).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
num_generations |
int |
Number of generations. |
required |
step(self)
¶
Perform a step of the search algorithm.
Source code in evotorch/algorithms/searchalgorithm.py
def step(self):
    """
    Perform a step of the search algorithm.
    """
    self._before_step_hook()

    # The status values belonging to the previous step are dropped.
    self.clear_status()

    self._step()
    self._steps_count = self._steps_count + 1

    # Build the status of this step: the iteration count first, then
    # the status of the problem, then the output of the after-step hook.
    self.update_status({"iter": self._steps_count})
    self.update_status(self._problem.status)
    self.update_status(self._after_step_hook.accumulate_dict())

    # The status dictionary is materialized only if somebody logs it.
    if len(self._log_hook) > 0:
        self._log_hook(dict(self.status))
SinglePopulationAlgorithmMixin
¶
A mixin class that can be inherited by a SearchAlgorithm subclass.
This mixin class assumes that the inheriting class has the following members:
problem
: The problem object that is associated with the search algorithm. This attribute is already provided by the SearchAlgorithm base class.
population
: An attribute or a (possibly read-only) property which stores the population of the search algorithm as a SolutionBatch
instance.
This mixin class also assumes that the inheriting class might
contain an attribute (or a property) named obj_index
.
If there is such an attribute and its value is not None, then this
mixin class assumes that obj_index
represents the index of the
objective that is being focused on.
Upon initialization, this mixin class first determines whether or not
the algorithm is a single-objective one.
In more details, if there is an attribute named obj_index
(and its
value is not None), or if the associated problem has only one objective,
then this mixin class assumes that the inheriting SearchAlgorithm is a
single objective algorithm.
Otherwise, it is assumed that the underlying algorithm works (or might
work) on multiple objectives.
In the single-objective case, this mixin class brings the inheriting
SearchAlgorithm the ability to report the following:
pop_best
(best solution of the population),
pop_best_eval
(evaluation result of the population's best solution),
mean_eval
(mean evaluation result of the population),
median_eval
(median evaluation result of the population).
In the multi-objective case, for each objective i
, this mixin class
brings the inheriting SearchAlgorithm the ability to report the following:
obj<i>_pop_best
(best solution of the population according to objective i),
obj<i>_pop_best_eval
(evaluation result of the population's best
solution),
obj<i>_mean_eval
(mean evaluation result of the population),
obj<i>_median_eval
(median evaluation result of the population).
Source code in evotorch/algorithms/searchalgorithm.py
class SinglePopulationAlgorithmMixin:
    """
    A mixin class that can be inherited by a SearchAlgorithm subclass.

    The inheriting class is assumed to have the following members:

    - `problem`: The problem object that is associated with the search
      algorithm. This attribute is already provided by the SearchAlgorithm
      base class.
    - `population`: An attribute or a (possibly read-only) property which
      stores the population of the search algorithm as a `SolutionBatch`
      instance.

    The inheriting class _might_ also contain an attribute (or a property)
    named `obj_index`. If there is such an attribute and its value is not
    None, then `obj_index` is taken as the index of the objective that is
    being focused on.

    Upon initialization, this mixin class first decides whether or not the
    algorithm is a single-objective one: if `obj_index` exists with a value
    other than None, or if the associated problem has only one objective,
    then the inheriting SearchAlgorithm is assumed to be a single-objective
    algorithm. Otherwise, it is assumed that the algorithm works (or might
    work) on multiple objectives.

    In the single-objective case, this mixin class brings the inheriting
    SearchAlgorithm the ability to report the following:
    `pop_best` (best solution of the population),
    `pop_best_eval` (evaluation result of the population's best solution),
    `mean_eval` (mean evaluation result of the population),
    `median_eval` (median evaluation result of the population).

    In the multi-objective case, for each objective `i`, this mixin class
    brings the inheriting SearchAlgorithm the ability to report the following:
    `obj<i>_pop_best` (best solution of the population according to
    objective `i`),
    `obj<i>_pop_best_eval` (evaluation result of the population's best
    solution),
    `obj<i>_mean_eval` (mean evaluation result of the population),
    `obj<i>_median_eval` (median evaluation result of the population).
    """

    class ObjectiveStatusReporter:
        # A callable which computes one reportable of one objective.
        # Instances of it are registered as status getters.

        REPORTABLES = {"pop_best", "pop_best_eval", "mean_eval", "median_eval"}

        def __init__(
            self,
            algorithm: SearchAlgorithm,
            *,
            obj_index: int,
            to_report: str,
        ):
            self.__algorithm = algorithm
            self.__obj_index = int(obj_index)
            if to_report in self.REPORTABLES:
                self.__to_report = to_report
            else:
                raise ValueError(f"Unrecognized report request: {to_report}")

        @property
        def population(self) -> SolutionBatch:
            return self.__algorithm.population

        @property
        def obj_index(self) -> int:
            return self.__obj_index

        def get_status_value(self, status_key: str) -> Any:
            return self.__algorithm.get_status_value(status_key)

        def has_status_key(self, status_key: str) -> bool:
            return self.__algorithm.has_status_key(status_key)

        def _get_pop_best(self):
            best_index = self.population.argbest(self.obj_index)
            return clone(self.population[best_index])

        def _get_pop_best_eval(self):
            # The best solution is looked up within the status of the
            # algorithm: first under the single-objective key, then under
            # the key specific to this objective.
            candidate_keys = ("pop_best", f"obj{self.obj_index}_pop_best")
            found_key = next((k for k in candidate_keys if self.has_status_key(k)), None)
            if found_key is None:
                return None

            best_solution = self.get_status_value(found_key)
            if (best_solution is None) or (not best_solution.is_evaluated):
                return None

            return float(best_solution.evals[self.obj_index])

        @torch.no_grad()
        def _get_mean_eval(self):
            evals = self.population.access_evals(self.obj_index)
            return float(torch.mean(evals))

        @torch.no_grad()
        def _get_median_eval(self):
            evals = self.population.access_evals(self.obj_index)
            return float(torch.median(evals))

        def __call__(self):
            # Dispatch to the `_get_<reportable>` method of this object.
            compute = getattr(self, "_get_" + self.__to_report)
            return compute()

    def __init__(self, *, exclude: Optional[Iterable] = None, enable: bool = True):
        # With `enable=False`, no status getter is registered at all.
        if not enable:
            return

        reporter_cls = self.ObjectiveStatusReporter
        reportables = reporter_cls.REPORTABLES

        self.__exclude = set(exclude) if exclude is not None else set()

        # Each target is a pair of status key prefix and objective index.
        # The single-objective case uses un-prefixed status keys.
        if hasattr(self, "obj_index") and (self.obj_index is not None):
            targets = [("", self.obj_index)]
        elif len(self.problem.senses) == 1:
            targets = [("", 0)]
        else:
            targets = [(f"obj{i_obj}_", i_obj) for i_obj in range(len(self.problem.senses))]

        for prefix, objective in targets:
            for reportable in reportables:
                if reportable in self.__exclude:
                    continue
                getter = reporter_cls(self, obj_index=objective, to_report=reportable)
                self.add_status_getters({prefix + reportable: getter})