Skip to content

Vecgymne

VecGymNE (BaseNEProblem)

An EvoTorch problem for solving vectorized gym environments

Source code in evotorch/neuroevolution/vecgymne.py
class VecGymNE(BaseNEProblem):
    """
    An EvoTorch problem for solving vectorized gym environments
    """

    def __init__(
        self,
        env: Union[str, Callable],
        network: Union[str, Callable, nn.Module],
        *,
        env_config: Optional[Mapping] = None,
        max_num_envs: Optional[int] = None,
        network_args: Optional[Mapping] = None,
        observation_normalization: bool = False,
        decrease_rewards_by: Optional[float] = None,
        alive_bonus_schedule: Optional[tuple] = None,
        action_noise_stdev: Optional[float] = None,
        num_episodes: int = 1,
        device: Optional[Device] = None,
        num_actors: Optional[Union[int, str]] = None,
        num_gpus_per_actor: Optional[int] = None,
        num_subbatches: Optional[int] = None,
        subbatch_size: Optional[int] = None,
        actor_config: Optional[Mapping] = None,
    ):
        """
        Initialize the VecGymNE.

        Args:
            env: Environment to be solved.
                If this is given as a string starting with "gym::" (e.g.
                "gym::Humanoid-v4", etc.), then it is assumed that the target
                environment is a classical gym environment.
                If this is given as a string starting with "brax::" (e.g.
                "brax::humanoid", etc.), then it is assumed that the target
                environment is a brax environment.
                If this is given as a string which does not contain "::" at
                all (e.g. "Humanoid-v4", etc.), then it is assumed that the
                target environment is a classical gym environment. Therefore,
                "gym::Humanoid-v4" and "Humanoid-v4" are equivalent.
                If this argument is given as a Callable (maybe a function or a
                class), then, with the assumption that this Callable expects
                a keyword argument `num_envs: int`, this Callable is called
                and its result (expected as a `gym.vector.VectorEnv` instance)
                is used as the environment.
            network: A network structure string, or a Callable (which can be
                a class inheriting from `torch.nn.Module`, or a function
                which returns a `torch.nn.Module` instance), or an instance
                of `torch.nn.Module`.
                The object provided here determines the structure of the
                neural network whose parameters will be evolved.
                A network structure string is a string which can be processed
                by `evotorch.neuroevolution.net.str_to_net(...)`.
                Please see the documentation of the function
                `evotorch.neuroevolution.net.str_to_net(...)` to see how such
                a neural network structure string looks like.
                Note that this network can be a recurrent network.
                When the network's `forward(...)` method can optionally accept
                an additional positional argument for the hidden state of the
                network and returns an additional value for its next state,
                then the policy is treated as a recurrent one.
                When the network is given as a callable object (e.g.
                a subclass of `nn.Module` or a function) and this callable
                object is decorated via `evotorch.decorators.pass_info`,
                the following keyword arguments will be passed:
                (i) `obs_length` (the length of the observation vector),
                (ii) `act_length` (the length of the action vector),
                (iii) `obs_shape` (the shape tuple of the observation space),
                (iv) `act_shape` (the shape tuple of the action space),
                (v) `obs_space` (the Box object specifying the observation
                space, and
                (vi) `act_space` (the Box object specifying the action
                space). Note that `act_space` will always be given as a
                `gym.spaces.Box` instance, even when the actual gym
                environment has a discrete action space. This because
                `VecGymNE` always expects the neural network to return
                a tensor of floating-point numbers.
            env_config: Keyword arguments to pass to the environment while
                it is being created.
            max_num_envs: Maximum number of environments to be instantiated.
                By default, this is None, which means that the number of
                environments can go up to the population size (or up to the
                number of solutions that a remote actor receives, if the
                problem object is configured to have parallelization).
                For situations where the current reinforcement learning task
                requires large amount of resources (e.g. memory), allocating
                environments as much as the number of solutions might not
                be feasible. In such cases, one can set `max_num_envs` as an
                integer to bring an upper bound (in total, across all the
                remote actors, for when the problem is parallelized) to how
                many environments can be allocated.
            network_args: Any additional keyword argument to be used when
                instantiating the network can be specified via `network_args`
                as a dictionary. If there are no such additional keyword
                arguments, then `network_args` can be left as None.
                Note that the argument `network_args` is expected to be None
                when the network is specified as a `torch.nn.Module` instance.
            observation_normalization: Whether or not online normalization
                will be done on the encountered observations.
            decrease_rewards_by: If given as a float, each reward will be
                decreased by this amount. For example, if the environment's
                reward function has a constant "alive bonus" (i.e. a bonus
                that is constantly added onto the reward as long as the
                agent is alive), and if you wish to negate this bonus,
                you can set `decrease_rewards_by` to this bonus amount,
                and the bonus will be nullified.
                If you do not wish to affect the rewards in this manner,
                keep this as None.
            alive_bonus_schedule: Use this to add a customized amount of
                alive bonus.
                If left as None (which is the default), additional alive
                bonus will not be added.
                If given as a tuple `(t, b)`, an alive bonus `b` will be
                added onto all the rewards beyond the timestep `t`.
                If given as a tuple `(t0, t1, b)`, a partial (linearly
                increasing towards `b`) alive bonus will be added onto
                all the rewards between the timesteps `t0` and `t1`,
                and a full alive bonus (which equals to `b`) will be added
                onto all the rewards beyond the timestep `t1`.
            action_noise_stdev: If given as a real number `s`, then, for
                each generated action, Gaussian noise with standard
                deviation `s` will be sampled, and then this sampled noise
                will be added onto the action.
                If action noise is not desired, then this argument can be
                left as None.
                For sampling the noise, the global random number generator
                of PyTorch on the simulator's device will be used.
            num_episodes: Number of episodes over which each policy will
                be evaluated. The default is 1.
            device: The device in which the population will be kept.
                If you wish to do a single-GPU evolution, we recommend
                to set this as "cuda" (or "cuda:0", or "cuda:1", etc.),
                assuming that the simulator will also instantiate itself
                on that same device.
                Alternatively, if you wish to do a multi-GPU evolution,
                we recommend to leave this as None or set this as "cpu",
                so that the main population will be kept on the cpu
                and the remote actors will perform their evaluations on
                the GPUs that are assigned to them.
            num_actors: Number of actors to create for parallelized
                evaluation of the solutions.
                Certain string values are also accepted.
                When given as "max" or as "num_cpus", the number of actors
                will be equal to the number of all available CPUs in the ray
                cluster.
                When given as "num_gpus", the number of actors will be
                equal to the number of all available GPUs in the ray
                cluster, and each actor will be assigned a GPU.
                When given as "num_devices", the number of actors will be
                equal to the minimum among the number of CPUs and the number
                of GPUs available in the cluster (or will be equal to the
                number of CPUs if there is no GPU), and each actor will be
                assigned a GPU (if available).
                If `num_actors` is given as "num_gpus" or "num_devices",
                the argument `num_gpus_per_actor` must not be used,
                and the `actor_config` dictionary must not contain the
                key "num_gpus".
                If `num_actors` is given as something other than "num_gpus"
                or "num_devices", and if you wish to assign GPUs to each
                actor, then please see the argument `num_gpus_per_actor`.
            num_gpus_per_actor: Number of GPUs to be assigned to each
                actor. This can be an integer or a float (for when you
                wish to assign fractional amounts of GPUs to actors).
                When `num_actors` has the special value "num_devices",
                the argument `num_gpus_per_actor` is expected to be left as
                None.
            num_subbatches: For when there are multiple actors, you can
                set this to an integer n if you wish the population
                to be divided exactly into n sub-batches. The actors, as they
                finish their currently assigned sub-batch of solutions,
                will pick the next un-evaluated sub-batch.
                If you specify too large numbers for this argument, then
                each sub-batch will be smaller.
                When working with vectorized simulators on GPU, having too
                many and too small sub-batches can hurt the performance.
                This argument can be left as None, in which case, assuming
                that `subbatch_size` is also None, the population will be
                split to m sub-batches, m being the number of actors.
            subbatch_size: For when there are multiple actors, you can
                set this to an integer n if you wish the population to be
                divided into sub-batches in such a way that each sub-batch
                will consist of exactly n solutions. The actors, as they
                finish their currently assigned sub-batch of solutions,
                will pick the next un-evaluated sub-batch.
                If you specify too small numbers for this argument, then
                there will be many sub-batches, each sub-batch having a
                small number of solutions.
                When working with vectorized simulators on GPU, having too
                many and too small sub-batches can hurt the performance.
                This argument can be left as None, in which case, assuming
                that `num_subbatches` is also None, the population will be
                split to m sub-batches, m being the number of actors.
            actor_config: Additional configuration to be used when creating
                each actor with the help of `ray` library.
                Can be left as None if additional configuration is not needed.
        """

        # Store the string or the Callable that will be used to generate the reinforcement learning environment.
        self._env_maker = env

        # Declare the variable which will store the environment.
        self._env: Optional[TorchWrapper] = None

        # Declare the variable which will store the batch size of the vectorized environment.
        self._num_envs: Optional[int] = None

        # Store the upper bound (if any) regarding how many environments can exist at the same time.
        self._max_num_envs: Optional[int] = None if max_num_envs is None else int(max_num_envs)

        # Actor-specific upper bound regarding how many environments can exist at the same time.
        # This variable will be filled by the `_parallelize(...)` method.
        self._actor_max_num_envs: Optional[int] = None

        # Declare the variable which stores whether or not we properly initialized the `_actor_max_num_envs` variable.
        self._actor_max_num_envs_ready: bool = False

        # Store the additional configurations to be used as keyword arguments while instantiating the environment.
        self._env_config: dict = {} if env_config is None else dict(env_config)

        # Declare the variable that will store the device of the simulator.
        # This variable will be filled when the first observation is received from the environment.
        # The device of the observation array received from the environment will determine the value of this variable.
        self._simulator_device: Optional[torch.device] = None

        # Store the neural network architecture (that might be a string or an `nn.Module` instance).
        self._architecture = network

        if network_args is None:
            # If `network_args` is given as None, change it to an empty dictionary
            network_args = {}

        if isinstance(network, str):
            # If the network is given as a string, then we will need the values for the constants `obs_length`,
            # `act_length`, and `obs_space`. To obtain those values, we use our helper function
            # `_env_constants_for_str_net(...)` which temporarily instantiates the specified environment and returns
            # its needed constants.
            env_constants = _env_constants_for_str_net(self._env_maker, **(self._env_config))
        elif isinstance(network, nn.Module):
            # If the network is an already instantiated nn.Module, then we do not prepare any pre-defined constants.
            env_constants = {}
        else:
            # If the network is given as a Callable, then we will need the values for the constants `obs_length`,
            # `act_length`, and `obs_space`. To obtain those values, we use our helper function
            # `_env_constants_for_callable_net(...)` which temporarily instantiates the specified environment and
            # returns its needed constants.
            env_constants = _env_constants_for_callable_net(self._env_maker, **(self._env_config))

        # Build a `Policy` instance according to the given architecture, and store it.
        if isinstance(network, str):
            instantiated_net = str_to_net(network, **{**env_constants, **network_args})
        elif isinstance(network, nn.Module):
            instantiated_net = network
        else:
            instantiated_net = pass_info_if_needed(network, env_constants)(**network_args)
        self._policy = Policy(instantiated_net)

        # Store the boolean which indicates whether or not there will be observation normalization.
        self._observation_normalization = bool(observation_normalization)

        # Declare the variables that will store the observation-related stats if observation normalization is enabled.
        self._obs_stats: Optional[RunningNorm] = None
        self._collected_stats: Optional[RunningNorm] = None

        # Store the number of episodes configuration given by the user.
        self._num_episodes = int(num_episodes)

        # Store the `decrease_rewards_by` configuration given by the user.
        self._decrease_rewards_by = None if decrease_rewards_by is None else float(decrease_rewards_by)

        if alive_bonus_schedule is None:
            # If `alive_bonus_schedule` argument is None, then we store it as None as well.
            self._alive_bonus_schedule = None
        else:
            # This is the case where the user has specified an `alive_bonus_schedule`.
            alive_bonus_schedule = list(alive_bonus_schedule)
            alive_bonus_schedule_length = len(alive_bonus_schedule)
            if alive_bonus_schedule_length == 2:
                # If `alive_bonus_schedule` was given as a 2-element sequence (t, b), then store it as (t, t, b).
                # This means that the partial alive bonus time window starts and ends at t, therefore, there will
                # be no alive bonus until t, and beginning with t, there will be full alive bonus.
                self._alive_bonus_schedule = [
                    int(alive_bonus_schedule[0]),
                    int(alive_bonus_schedule[0]),
                    float(alive_bonus_schedule[1]),
                ]
            elif alive_bonus_schedule_length == 3:
                # If `alive_bonus_schedule` was given as a 3-element sequence (t0, t1, b), then store those 3
                # elements.
                self._alive_bonus_schedule = [
                    int(alive_bonus_schedule[0]),
                    int(alive_bonus_schedule[1]),
                    float(alive_bonus_schedule[2]),
                ]
            else:
                # `alive_bonus_schedule` sequences with unrecognized lengths trigger an error.
                raise ValueError(
                    f"Received invalid number elements as the alive bonus schedule."
                    f" Expected 2 or 3 items, but got these: {self._alive_bonus_schedule}"
                    f" (having a length of {len(self._alive_bonus_schedule)})."
                )

        # If `action_noise_stdev` is specified, store it.
        self._action_noise_stdev = None if action_noise_stdev is None else float(action_noise_stdev)

        # Initialize the counters for the number of simulator interactions and the number of episodes.
        self._interaction_count: int = 0
        self._episode_count: int = 0

        # Call the superclass
        super().__init__(
            objective_sense="max",
            initial_bounds=(-0.00001, 0.00001),
            solution_length=self._policy.parameter_length,
            device=device,
            dtype=torch.float32,
            num_actors=num_actors,
            num_gpus_per_actor=num_gpus_per_actor,
            actor_config=actor_config,
            num_subbatches=num_subbatches,
            subbatch_size=subbatch_size,
        )

    def _parallelize(self):
        super()._parallelize()
        if self.is_main:
            if not self._actor_max_num_envs_ready:
                if self._actors is None:
                    self._actor_max_num_envs = self._max_num_envs
                else:
                    if self._max_num_envs is not None:
                        max_num_envs_per_actor = split_workload(self._max_num_envs, len(self._actors))
                        for i_actor, actor in enumerate(self._actors):
                            actor.call.remote("_set_actor_max_num_envs", max_num_envs_per_actor[i_actor])
                self._actor_max_num_envs_ready = True

    def _set_actor_max_num_envs(self, n: int):
        self._actor_max_num_envs = n
        self._actor_max_num_envs_ready = True

    @property
    def observation_normalization(self) -> bool:
        return self._observation_normalization

    def set_episode_count(self, n: int):
        """
        Set the episode count manually.
        """
        self._episode_count = int(n)

    def set_interaction_count(self, n: int):
        """
        Set the interaction count manually.
        """
        self._interaction_count = int(n)

    @property
    def interaction_count(self) -> int:
        """
        Get the total number of simulator interactions made.
        """
        return self._interaction_count

    @property
    def episode_count(self) -> int:
        """
        Get the total number of episodes completed.
        """
        return self._episode_count

    def _get_local_episode_count(self) -> int:
        return self.episode_count

    def _get_local_interaction_count(self) -> int:
        return self.interaction_count

    def _get_env(self, num_policies: int) -> TorchWrapper:
        # Get the existing environment instance stored by this VecGymNE, after (re)building it if needed.

        if (self._env is None) or (num_policies > self._num_envs):
            # If this VecGymNE does not have its environment ready yet (i.e. the `_env` attribute is None)
            # or it the batch size of the previously instantiated environment is not enough to deal with
            # the number of policies (i.e. the `_num_envs` attribute is less than `num_policies`), then
            # we (re)build the environment.

            # Keyword arguments to pass to the TorchWrapper.
            torch_wrapper_cfg = dict(
                force_classic_api=True,
                discrete_to_continuous_act=True,
                clip_actions=True,
            )

            if isinstance(self._env_maker, str):
                # If the environment is specified via a string, then we use our `make_vector_env` function.
                self._env = make_vector_env(
                    self._env_maker, num_envs=num_policies, **torch_wrapper_cfg, **(self._env_config)
                )
            else:
                # If the environment is specified via a Callable, then we call it.
                # We expect this Callable to accept a keyword argument named `num_envs`, and additionally, we pass
                # the environment configuration dictionary as keyword arguments.
                self._env = self._env_maker(num_envs=num_policies, **(self._env_config))

                if not isinstance(self._env, gym.vector.VectorEnv):
                    # If what is returned by the Callable is not a vectorized environment, then we trigger an error.
                    raise TypeError("This is not a vectorized environment")

                # We wrap the returned vectorized environment with a TorchWrapper, so that the actions that we send
                # and the observations and rewards that we receive are PyTorch tensors.
                self._env = TorchWrapper(self._env, **torch_wrapper_cfg)

            if self._env.num_envs != num_policies:
                # If the finally obtained vectorized environment has a different number of batch size, then we trigger
                # an error.
                raise ValueError("Incompatible number of environments")

            # We update the batch size of the created environment.
            self._num_envs = num_policies

            if not isinstance(self._env.single_observation_space, Box):
                # If the observation space is not Box, then we trigger an error.
                raise TypeError(
                    f"Unsupported observation type: {self._env.single_observation_space}."
                    f" Only Box-typed observation spaces are supported."
                )

            try:
                # If possible, use the `seed(...)` method to explicitly randomize the environment.
                # Although the new gym API removed the seed method, some environments define their own `seed(...)`
                # method for randomization.
                new_seed = random.randint(0, (2**32) - 1)
                self._env.seed(new_seed)
            except Exception:
                # Our attempt at manually seeding the environment has failed.
                # This could be because the environment does not have a `seed(...)` method.
                # Nothing to do.
                pass

        return self._env

    @property
    def _nonserialized_attribs(self):
        # Call the `_nonserialized_attribs` property implementation of the superclass to receive the base list
        # of non-serialized attributes, then add "_env" to this base list, and then return the resulting list.
        return super()._nonserialized_attribs + ["_env"]

    @property
    def _grad_device(self) -> torch.device:
        # For distributed mode, this property determines the device in which the temporary populations will be made
        # for gradient computation.

        if self._simulator_device is None:
            # If the simulator device is not known yet, then we return the cpu device.
            return torch.device("cpu")
        else:
            # If the simulator device is known, then we return that device.
            return self._simulator_device

    def _make_running_norm(self, observation: torch.Tensor) -> RunningNorm:
        # Make a new RunningNorm instance according to the observation tensor.
        # The dtype and the device of the new RunningNorm is taken from the observation.
        # This new RunningNorm is empty (i.e. does not contain any stats yet).
        return RunningNorm(shape=observation.shape[1:], dtype=observation.dtype, device=observation.device)

    def _transfer_running_norm(self, rn: RunningNorm, observation: torch.Tensor) -> RunningNorm:
        # Transfer (if necessary) the RunningNorm to the device of the observation tensor.
        # The returned RunningNorm may be the RunningNorm itself (if the device did not change)
        # or a new copy (if the device did change).
        if torch.device(rn.device) != torch.device(observation.device):
            rn = rn.to(observation.device)
        return rn

    def _normalize_observation(
        self, observation: torch.Tensor, *, mask: Optional[torch.Tensor] = None, update_stats: bool = True
    ) -> torch.Tensor:
        # This function normalizes the received observation batch.
        # If a mask is given (as a tensor of booleans), only observations with corresponding mask value set as True
        # will be taken into consideration.
        # If `update_stats` is given as True and observation normalization is enabled, then we will update the
        # RunningNorm instances as well.

        if self._observation_normalization:
            # This is the case where observation normalization is enabled.
            if self._obs_stats is None:
                # If we do not have observation stats yet, we build a new one (according to the dtype and device
                # of the observation).
                self._obs_stats = self._make_running_norm(observation)
            else:
                # If we already have observation stats, we make sure that it is in the correct device.
                self._obs_stats = self._transfer_running_norm(self._obs_stats, observation)

            if update_stats:
                # This is the case where the `update_stats` argument was encountered as True.
                if self._collected_stats is None:
                    # If the RunningNorm responsible to collect new stats is not built yet, we build it here
                    # (according to the dtype and device of the observation).
                    self._collected_stats = self._make_running_norm(observation)
                else:
                    # If the RunningNorm responsible to collect new stats already exists, then we make sure
                    # that it is in the correct device.
                    self._collected_stats = self._transfer_running_norm(self._collected_stats, observation)

                # We first update the RunningNorm responsible for collecting the new stats.
                self._collected_stats.update(observation, mask)

                # We now update the RunningNorm which stores all the stats, and return the normalized observation.
                result = self._obs_stats.update_and_normalize(observation, mask)
            else:
                # This is the case where the `update_stats` argument was encountered as False.
                # Here we normalize the observation but do not update our existing RunningNorm instances.
                result = self._obs_stats.update(observation, mask)
            return result
        else:
            # This is the case where observation normalization is disabled.
            # In this case, we just return the observation as it is.
            return observation

    def _ensure_obsnorm(self):
        if not self.observation_normalization:
            raise ValueError("This feature can only be used when observation_normalization=True.")

    def get_observation_stats(self) -> RunningNorm:
        """Get the observation stats"""
        self._ensure_obsnorm()
        return self._obs_stats

    def _make_sync_data_for_actors(self) -> Any:
        if self.observation_normalization:
            obs_stats = self.get_observation_stats()
            if obs_stats is not None:
                obs_stats = obs_stats.to("cpu")
            return dict(obs_stats=obs_stats)
        else:
            return None

    def set_observation_stats(self, rn: RunningNorm):
        """Set the observation stats"""
        self._ensure_obsnorm()
        self._obs_stats = rn

    def _use_sync_data_from_main(self, received: dict):
        for k, v in received.items():
            if k == "obs_stats":
                self.set_observation_stats(v)

    def pop_observation_stats(self) -> RunningNorm:
        """Get and clear the collected observation stats"""
        self._ensure_obsnorm()
        result = self._collected_stats
        self._collected_stats = None
        return result

    def _make_sync_data_for_main(self) -> Any:
        result = dict(episode_count=self.episode_count, interaction_count=self.interaction_count)

        if self.observation_normalization:
            collected = self.pop_observation_stats()
            if collected is not None:
                collected = collected.to("cpu")
            result["obs_stats_delta"] = collected

        return result

    def update_observation_stats(self, rn: RunningNorm):
        """Update the observation stats via another RunningNorm instance"""
        self._ensure_obsnorm()
        if self._obs_stats is None:
            self._obs_stats = rn
        else:
            self._obs_stats.update(rn)

    def _use_sync_data_from_actors(self, received: list):
        total_episode_count = 0
        total_interaction_count = 0

        for data in received:
            data: dict
            total_episode_count += data["episode_count"]
            total_interaction_count += data["interaction_count"]
            if self.observation_normalization:
                self.update_observation_stats(data["obs_stats_delta"])

        self.set_episode_count(total_episode_count)
        self.set_interaction_count(total_interaction_count)

    def _make_pickle_data_for_main(self) -> dict:
        # For when the main Problem object (the non-remote one) gets pickled,
        # this function returns the counters of this remote Problem instance,
        # to be sent to the main one.
        return dict(interaction_count=self.interaction_count, episode_count=self.episode_count)

    def _use_pickle_data_from_main(self, state: dict):
        # For when a newly unpickled Problem object gets (re)parallelized,
        # this function restores the inner states specific to this remote
        # worker. In the case of GymNE, those inner states are episode
        # and interaction counters.
        for k, v in state.items():
            if k == "episode_count":
                self.set_episode_count(v)
            elif k == "interaction_count":
                self.set_interaction_count(v)
            else:
                raise ValueError(f"When restoring the inner state of a remote worker, unrecognized state key: {k}")

    def _evaluate_batch(self, batch: SolutionBatch):
        if self._actor_max_num_envs is None:
            self._evaluate_subbatch(batch)
        else:
            subbatches = batch.split(max_size=self._actor_max_num_envs)
            for subbatch in subbatches:
                self._evaluate_subbatch(subbatch)

    def _evaluate_subbatch(self, batch: SolutionBatch):
        # Get the number of solutions and the solution batch from the shape of the batch.
        num_solutions, solution_length = batch.values_shape

        # Get (possibly after (re)building) the environment object.
        env = self._get_env(num_solutions)

        # Reset the environment and receive the first observation batch.
        obs_per_env = env.reset()

        # Update the simulator device according to the device of the observation batch received.
        self._simulator_device = obs_per_env.device

        # Get the number of environments.
        num_envs = obs_per_env.shape[0]

        # Transfer (if necessary) the solutions (which are the network parameters) to the simulator device.
        batch_values = batch.values.to(self._simulator_device)

        if num_solutions == num_envs:
            # If the number of solutions is equal to the number of environments, then we declare all of the solutions
            # as the network parameters, and we declare all of these environments active.
            params_per_env = batch_values
            active_per_env = torch.ones(num_solutions, dtype=torch.bool, device=self._simulator_device)
        elif num_solutions < num_envs:
            # If the number of solutions is less than the number of environments, then we allocate a new empty
            # tensor to represent the network parameters.
            params_per_env = torch.empty((num_envs, solution_length), dtype=batch.dtype, device=self._simulator_device)

            # The first `num_solutions` rows of this new parameters tensor is filled with the values of the solutions.
            params_per_env[:num_solutions, :] = batch_values

            # The remaining parameters become the clones of the first solution.
            params_per_env[num_solutions:, :] = batch_values[0]

            # At first, all the environments are declared as inactive.
            active_per_env = torch.zeros(num_envs, dtype=torch.bool, device=self._simulator_device)

            # Now, the first `num_solutions` amount of environments is declared as active.
            # The remaining ones remain inactive.
            active_per_env[:num_solutions] = True
        else:
            assert False, "Received incompatible number of environments"

        # We get the policy and fill it with the parameters stored by the solutions.
        policy = self._policy
        policy.set_parameters(params_per_env)

        # Declare the counter which stores the total timesteps encountered during this evaluation.
        total_timesteps = 0

        # Declare the counters (one for each environment) storing the number of episodes completed.
        num_eps_per_env = torch.zeros(num_envs, dtype=torch.int64, device=self._simulator_device)

        # Declare the scores (one for each environment).
        score_per_env = torch.zeros(num_envs, dtype=torch.float32, device=self._simulator_device)

        if self._alive_bonus_schedule is not None:
            # If an alive_bonus_schedule was provided, then we extract the timesteps.
            # bonus_t0 is the timestep where the partial alive bonus will start.
            # bonus_t1 is the timestep where the full alive bonus will start.
            # alive_bonus is the amount that will be added to reward if the agent is alive.
            bonus_t0, bonus_t1, alive_bonus = self._alive_bonus_schedule

            if bonus_t1 > bonus_t0:
                # If bonus_t1 is bigger than bonus_t0, then we have a partial alive bonus time window.
                add_partial_alive_bonus = True

                # We compute and store the length of the time window.
                bonus_t_gap_as_float = float(bonus_t1 - bonus_t0)
            else:
                # If bonus_t1 is NOT bigger than bonus_t0, then we do NOT have a partial alive bonus time window.
                add_partial_alive_bonus = False

            # To properly give the alive bonus for each solution, we need to keep track of the timesteps for all
            # the running solutions. So, we declare the following variable.
            t_per_env = torch.zeros(num_envs, dtype=torch.int64, device=self._simulator_device)

        # We normalize the initial observation.
        obs_per_env = self._normalize_observation(obs_per_env, mask=active_per_env)

        while True:
            # Pass the observations through the policy and get the actions to perform.
            action_per_env = policy(torch.as_tensor(obs_per_env, dtype=params_per_env.dtype))

            if self._action_noise_stdev is not None:
                # If we are to apply action noise, we sample from a Gaussian distribution and add the noise onto
                # the actions.
                action_per_env = action_per_env + (torch.rand_like(action_per_env) * self._action_noise_stdev)

            # Apply the actions, get the observations, rewards, and the 'done' flags.
            obs_per_env, reward_per_env, done_per_env, _ = env.step(action_per_env)

            if self._decrease_rewards_by is not None:
                # We decrease the rewards, if we have the configuration to do so.
                reward_per_env = reward_per_env - self._decrease_rewards_by

            if self._alive_bonus_schedule is not None:
                # Here we handle the alive bonus schedule.

                # For each environment, increment the timestep.
                t_per_env[active_per_env] += 1

                # For those who are within the full alive bonus time region, increase the scores by the full amount.
                in_full_bonus_t_per_env = active_per_env & (t_per_env >= bonus_t1)
                score_per_env[in_full_bonus_t_per_env] += alive_bonus

                if add_partial_alive_bonus:
                    # Here we handle the partial alive bonus time window.
                    # We first determine which environments are in the partial alive bonus time window.
                    in_partial_bonus_t_per_env = active_per_env & (t_per_env >= bonus_t0) & (t_per_env < bonus_t1)

                    # Here we compute the partial alive bonuses and add those bonuses to the scores.
                    score_per_env[in_partial_bonus_t_per_env] += alive_bonus * (
                        torch.as_tensor(t_per_env[in_partial_bonus_t_per_env] - bonus_t0, dtype=torch.float32)
                        / bonus_t_gap_as_float
                    )

                # Determine which environments just finished their episodes.
                just_finished_per_env = active_per_env & done_per_env

                # Reset the timestep counters of the environments which are just finished.
                t_per_env[just_finished_per_env] = 0

            # For each active environment, increase the score by the reward received.
            score_per_env[active_per_env] += reward_per_env[active_per_env]

            # Update the total timesteps counter.
            total_timesteps += int(torch.sum(active_per_env))

            # Reset the policies whose episodes are done (so that their hidden states become 0).
            policy.reset(done_per_env)

            # Update the number of episodes counter for each environment.
            num_eps_per_env[done_per_env] += 1

            # Solutions with number of completed episodes larger than the number of allowed episodes become inactive.
            active_per_env[:num_solutions] = num_eps_per_env[:num_solutions] < self._num_episodes

            if not torch.any(active_per_env[:num_solutions]):
                # If there is not a single active solution left, then we exit this loop.
                break

            # For the next iteration of this loop, we normalize the observation.
            obs_per_env = self._normalize_observation(obs_per_env, mask=active_per_env)

        # Update the interaction count and the episode count stored by this VecGymNE instance.
        self._interaction_count += total_timesteps
        self._episode_count += num_solutions * self._num_episodes

        # Compute the fitnesses
        fitnesses = score_per_env[:num_solutions]
        if self._num_episodes > 1:
            fitnesses /= self._num_episodes

        # Assign the scores to the solutions as fitnesses.
        batch.set_evals(fitnesses)

    def get_env(self) -> Optional[gym.Env]:
        """
        Get the gym environment.

        Returns:
            The gym environment if it is built. If not built yet, None.
        """
        return self._env

    def to_policy(self, solution: Iterable, *, with_wrapper_modules: bool = True) -> nn.Module:
        """
        Convert the given solution to a policy.

        Args:
            solution: A solution which can be given as a `torch.Tensor`, as a
                `Solution`, or as any `Iterable`.
            with_wrapper_modules: Whether or not to wrap the policy module
                with helper modules so that observations are normalized
                and actions are clipped to be within the correct boundaries.
                The default and the recommended value is True.
        Returns:
            The policy, as a `torch.nn.Module` instance.
        """
        # Get the gym environment
        env = self._get_env(1)

        # Get the observation space, its lower and higher bounds.
        obs_space = env.single_action_space
        low = obs_space.low
        high = obs_space.high

        # If the lower and higher bounds are not -inf and +inf respectively, then this environment needs clipping.
        needs_clipping = _numpy_arrays_specify_bounds(low, high)

        # Convert the solution to a PyTorch tensor on cpu.
        if isinstance(solution, torch.Tensor):
            solution = solution.to("cpu")
        elif isinstance(solution, Solution):
            solution = solution.values.clone().to("cpu")
        else:
            solution = torch.as_tensor(solution, dtype=torch.float32, device="cpu")

        # Convert the internally stored policy to a PyTorch module.
        result = self._policy.to_torch_module(solution)

        if with_wrapper_modules:
            if self.observation_normalization and (self._obs_stats is not None):
                # If observation normalization is needed and there are collected observation stats, then we wrap the
                # policy with an ObsNormWrapperModule.
                result = ObsNormWrapperModule(result, self._obs_stats)

            if needs_clipping:
                # If clipping is needed, then we wrap the policy with an ActClipWrapperModule
                result = ActClipWrapperModule(result, obs_space)

        return result

    def save_solution(self, solution: Iterable, fname: Union[str, Path]):
        """
        Save the solution into a pickle file.
        Among the saved data within the pickle file are the solution
        (as a PyTorch tensor), the policy (as a `torch.nn.Module` instance),
        and observation stats (if any).

        Args:
            solution: The solution to be saved. This can be a PyTorch tensor,
                a `Solution` instance, or any `Iterable`.
            fname: The file name of the pickle file to be created.
        """

        # Convert the solution to a PyTorch tensor on the cpu.
        if isinstance(solution, torch.Tensor):
            solution = solution.to("cpu")
        elif isinstance(solution, Solution):
            solution = solution.values.clone().to("cpu")
        else:
            solution = torch.as_tensor(solution, dtype=torch.float32, device="cpu")

        if isinstance(solution, ReadOnlyTensor):
            solution = solution.as_subclass(torch.Tensor)

        # Store the solution and the policy.
        result = {
            "solution": solution,
            "policy": self.to_policy(solution),
        }

        # If available, store the observation stats.
        if self.observation_normalization and (self._obs_stats is not None):
            result["obs_mean"] = self._obs_stats.mean.to("cpu")
            result["obs_stdev"] = self._obs_stats.stdev.to("cpu")
            result["obs_sum"] = self._obs_stats.sum.to("cpu")
            result["obs_sum_of_squares"] = self._obs_stats.sum_of_squares.to("cpu")

        # Some additional data.
        result["interaction_count"] = self.interaction_count
        result["episode_count"] = self.episode_count
        result["time"] = datetime.now()

        if isinstance(self._env_maker, str):
            # If the environment was specified via a string, store the string.
            result["env"] = self._env_maker

        # Store the network architecture.
        result["architecture"] = self._architecture

        # Save the dictionary which stores the data.
        with open(fname, "wb") as f:
            pickle.dump(result, f)

    @property
    def max_num_envs(self) -> Optional[int]:
        """
        Maximum number of environments to be allocated.

        If a maximum number of environments is not set, then None is returned.
        If this problem instance is the main one, then the overall maximum
        number of environments is returned.
        If this problem instance is a remote one (i.e. is on a remote actor)
        then the maximum number of environments for that actor is returned.
        """
        if self.is_main:
            return self._max_num_envs
        else:
            return self._actor_max_num_envs

    def make_net(self, solution: Iterable) -> nn.Module:
        """
        Make a new policy network parameterized by the given solution.
        Note that this parameterized network assumes that the observation
        is already normalized, and it does not do action clipping to ensure
        that the generated actions are within valid bounds.

        To have a policy network which has its own observation normalization
        and action clipping layers, please see the method `to_policy(...)`.

        Args:
            solution: The solution which stores the parameters.
                This can be a Solution instance, or a 1-dimensional tensor,
                or any Iterable of real numbers.
        Returns:
            The policy network, as a PyTorch module.
        """
        return self.to_policy(solution, with_wrapper_modules=False)

    @property
    def network_device(self) -> Optional[Device]:
        """
        The device on which the policy networks will operate.

        Specific to VecGymNE, the network device is determined only
        after receiving the first observation from the reinforcement
        learning environment. Until then, this property has the value
        None.
        """
        return self._simulator_device

episode_count: int property readonly

Get the total number of episodes completed.

interaction_count: int property readonly

Get the total number of simulator interactions made.

max_num_envs: Optional[int] property readonly

Maximum number of environments to be allocated.

If a maximum number of environments is not set, then None is returned. If this problem instance is the main one, then the overall maximum number of environments is returned. If this problem instance is a remote one (i.e. is on a remote actor) then the maximum number of environments for that actor is returned.

network_device: Union[str, torch.device] property readonly

The device on which the policy networks will operate.

Specific to VecGymNE, the network device is determined only after receiving the first observation from the reinforcement learning environment. Until then, this property has the value None.

__init__(self, env, network, *, env_config=None, max_num_envs=None, network_args=None, observation_normalization=False, decrease_rewards_by=None, alive_bonus_schedule=None, action_noise_stdev=None, num_episodes=1, device=None, num_actors=None, num_gpus_per_actor=None, num_subbatches=None, subbatch_size=None, actor_config=None) special

Initialize the VecGymNE.

Parameters:

Name Type Description Default
env Union[str, Callable]

Environment to be solved. If this is given as a string starting with "gym::" (e.g. "gym::Humanoid-v4", etc.), then it is assumed that the target environment is a classical gym environment. If this is given as a string starting with "brax::" (e.g. "brax::humanoid", etc.), then it is assumed that the target environment is a brax environment. If this is given as a string which does not contain "::" at all (e.g. "Humanoid-v4", etc.), then it is assumed that the target environment is a classical gym environment. Therefore, "gym::Humanoid-v4" and "Humanoid-v4" are equivalent. If this argument is given as a Callable (maybe a function or a class), then, with the assumption that this Callable expects a keyword argument num_envs: int, this Callable is called and its result (expected as a gym.vector.VectorEnv instance) is used as the environment.

required
network Union[str, Callable, torch.nn.modules.module.Module]

A network structure string, or a Callable (which can be a class inheriting from torch.nn.Module, or a function which returns a torch.nn.Module instance), or an instance of torch.nn.Module. The object provided here determines the structure of the neural network whose parameters will be evolved. A network structure string is a string which can be processed by evotorch.neuroevolution.net.str_to_net(...). Please see the documentation of the function evotorch.neuroevolution.net.str_to_net(...) to see how such a neural network structure string looks like. Note that this network can be a recurrent network. When the network's forward(...) method can optionally accept an additional positional argument for the hidden state of the network and returns an additional value for its next state, then the policy is treated as a recurrent one. When the network is given as a callable object (e.g. a subclass of nn.Module or a function) and this callable object is decorated via evotorch.decorators.pass_info, the following keyword arguments will be passed: (i) obs_length (the length of the observation vector), (ii) act_length (the length of the action vector), (iii) obs_shape (the shape tuple of the observation space), (iv) act_shape (the shape tuple of the action space), (v) obs_space (the Box object specifying the observation space, and (vi) act_space (the Box object specifying the action space). Note that act_space will always be given as a gym.spaces.Box instance, even when the actual gym environment has a discrete action space. This because VecGymNE always expects the neural network to return a tensor of floating-point numbers.

required
env_config Optional[collections.abc.Mapping]

Keyword arguments to pass to the environment while it is being created.

None
max_num_envs Optional[int]

Maximum number of environments to be instantiated. By default, this is None, which means that the number of environments can go up to the population size (or up to the number of solutions that a remote actor receives, if the problem object is configured to have parallelization). For situations where the current reinforcement learning task requires large amount of resources (e.g. memory), allocating environments as much as the number of solutions might not be feasible. In such cases, one can set max_num_envs as an integer to bring an upper bound (in total, across all the remote actors, for when the problem is parallelized) to how many environments can be allocated.

None
network_args Optional[collections.abc.Mapping]

Any additional keyword argument to be used when instantiating the network can be specified via network_args as a dictionary. If there are no such additional keyword arguments, then network_args can be left as None. Note that the argument network_args is expected to be None when the network is specified as a torch.nn.Module instance.

None
observation_normalization bool

Whether or not online normalization will be done on the encountered observations.

False
decrease_rewards_by Optional[float]

If given as a float, each reward will be decreased by this amount. For example, if the environment's reward function has a constant "alive bonus" (i.e. a bonus that is constantly added onto the reward as long as the agent is alive), and if you wish to negate this bonus, you can set decrease_rewards_by to this bonus amount, and the bonus will be nullified. If you do not wish to affect the rewards in this manner, keep this as None.

None
alive_bonus_schedule Optional[tuple]

Use this to add a customized amount of alive bonus. If left as None (which is the default), additional alive bonus will not be added. If given as a tuple (t, b), an alive bonus b will be added onto all the rewards beyond the timestep t. If given as a tuple (t0, t1, b), a partial (linearly increasing towards b) alive bonus will be added onto all the rewards between the timesteps t0 and t1, and a full alive bonus (which equals to b) will be added onto all the rewards beyond the timestep t1.

None
action_noise_stdev Optional[float]

If given as a real number s, then, for each generated action, Gaussian noise with standard deviation s will be sampled, and then this sampled noise will be added onto the action. If action noise is not desired, then this argument can be left as None. For sampling the noise, the global random number generator of PyTorch on the simulator's device will be used.

None
num_episodes int

Number of episodes over which each policy will be evaluated. The default is 1.

1
device Union[str, torch.device]

The device in which the population will be kept. If you wish to do a single-GPU evolution, we recommend to set this as "cuda" (or "cuda:0", or "cuda:1", etc.), assuming that the simulator will also instantiate itself on that same device. Alternatively, if you wish to do a multi-GPU evolution, we recommend to leave this as None or set this as "cpu", so that the main population will be kept on the cpu and the remote actors will perform their evaluations on the GPUs that are assigned to them.

None
num_actors Union[int, str]

Number of actors to create for parallelized evaluation of the solutions. Certain string values are also accepted. When given as "max" or as "num_cpus", the number of actors will be equal to the number of all available CPUs in the ray cluster. When given as "num_gpus", the number of actors will be equal to the number of all available GPUs in the ray cluster, and each actor will be assigned a GPU. When given as "num_devices", the number of actors will be equal to the minimum among the number of CPUs and the number of GPUs available in the cluster (or will be equal to the number of CPUs if there is no GPU), and each actor will be assigned a GPU (if available). If num_actors is given as "num_gpus" or "num_devices", the argument num_gpus_per_actor must not be used, and the actor_config dictionary must not contain the key "num_gpus". If num_actors is given as something other than "num_gpus" or "num_devices", and if you wish to assign GPUs to each actor, then please see the argument num_gpus_per_actor.

None
num_gpus_per_actor Optional[int]

Number of GPUs to be assigned to each actor. This can be an integer or a float (for when you wish to assign fractional amounts of GPUs to actors). When num_actors has the special value "num_devices", the argument num_gpus_per_actor is expected to be left as None.

None
num_subbatches Optional[int]

For when there are multiple actors, you can set this to an integer n if you wish the population to be divided exactly into n sub-batches. The actors, as they finish their currently assigned sub-batch of solutions, will pick the next un-evaluated sub-batch. If you specify too large numbers for this argument, then each sub-batch will be smaller. When working with vectorized simulators on GPU, having too many and too small sub-batches can hurt the performance. This argument can be left as None, in which case, assuming that subbatch_size is also None, the population will be split to m sub-batches, m being the number of actors.

None
subbatch_size Optional[int]

For when there are multiple actors, you can set this to an integer n if you wish the population to be divided into sub-batches in such a way that each sub-batch will consist of exactly n solutions. The actors, as they finish their currently assigned sub-batch of solutions, will pick the next un-evaluated sub-batch. If you specify too small numbers for this argument, then there will be many sub-batches, each sub-batch having a small number of solutions. When working with vectorized simulators on GPU, having too many and too small sub-batches can hurt the performance. This argument can be left as None, in which case, assuming that num_subbatches is also None, the population will be split to m sub-batches, m being the number of actors.

None
actor_config Optional[collections.abc.Mapping]

Additional configuration to be used when creating each actor with the help of ray library. Can be left as None if additional configuration is not needed.

None
Source code in evotorch/neuroevolution/vecgymne.py
def __init__(
    self,
    env: Union[str, Callable],
    network: Union[str, Callable, nn.Module],
    *,
    env_config: Optional[Mapping] = None,
    max_num_envs: Optional[int] = None,
    network_args: Optional[Mapping] = None,
    observation_normalization: bool = False,
    decrease_rewards_by: Optional[float] = None,
    alive_bonus_schedule: Optional[tuple] = None,
    action_noise_stdev: Optional[float] = None,
    num_episodes: int = 1,
    device: Optional[Device] = None,
    num_actors: Optional[Union[int, str]] = None,
    num_gpus_per_actor: Optional[int] = None,
    num_subbatches: Optional[int] = None,
    subbatch_size: Optional[int] = None,
    actor_config: Optional[Mapping] = None,
):
    """
    Initialize the VecGymNE.

    Args:
        env: Environment to be solved.
            If this is given as a string starting with "gym::" (e.g.
            "gym::Humanoid-v4", etc.), then it is assumed that the target
            environment is a classical gym environment.
            If this is given as a string starting with "brax::" (e.g.
            "brax::humanoid", etc.), then it is assumed that the target
            environment is a brax environment.
            If this is given as a string which does not contain "::" at
            all (e.g. "Humanoid-v4", etc.), then it is assumed that the
            target environment is a classical gym environment. Therefore,
            "gym::Humanoid-v4" and "Humanoid-v4" are equivalent.
            If this argument is given as a Callable (maybe a function or a
            class), then, with the assumption that this Callable expects
            a keyword argument `num_envs: int`, this Callable is called
            and its result (expected as a `gym.vector.VectorEnv` instance)
            is used as the environment.
        network: A network structure string, or a Callable (which can be
            a class inheriting from `torch.nn.Module`, or a function
            which returns a `torch.nn.Module` instance), or an instance
            of `torch.nn.Module`.
            The object provided here determines the structure of the
            neural network whose parameters will be evolved.
            A network structure string is a string which can be processed
            by `evotorch.neuroevolution.net.str_to_net(...)`.
            Please see the documentation of the function
            `evotorch.neuroevolution.net.str_to_net(...)` to see how such
            a neural network structure string looks like.
            Note that this network can be a recurrent network.
            When the network's `forward(...)` method can optionally accept
            an additional positional argument for the hidden state of the
            network and returns an additional value for its next state,
            then the policy is treated as a recurrent one.
            When the network is given as a callable object (e.g.
            a subclass of `nn.Module` or a function) and this callable
            object is decorated via `evotorch.decorators.pass_info`,
            the following keyword arguments will be passed:
            (i) `obs_length` (the length of the observation vector),
            (ii) `act_length` (the length of the action vector),
            (iii) `obs_shape` (the shape tuple of the observation space),
            (iv) `act_shape` (the shape tuple of the action space),
            (v) `obs_space` (the Box object specifying the observation
            space, and
            (vi) `act_space` (the Box object specifying the action
            space). Note that `act_space` will always be given as a
            `gym.spaces.Box` instance, even when the actual gym
            environment has a discrete action space. This because
            `VecGymNE` always expects the neural network to return
            a tensor of floating-point numbers.
        env_config: Keyword arguments to pass to the environment while
            it is being created.
        max_num_envs: Maximum number of environments to be instantiated.
            By default, this is None, which means that the number of
            environments can go up to the population size (or up to the
            number of solutions that a remote actor receives, if the
            problem object is configured to have parallelization).
            For situations where the current reinforcement learning task
            requires large amount of resources (e.g. memory), allocating
            environments as much as the number of solutions might not
            be feasible. In such cases, one can set `max_num_envs` as an
            integer to bring an upper bound (in total, across all the
            remote actors, for when the problem is parallelized) to how
            many environments can be allocated.
        network_args: Any additional keyword argument to be used when
            instantiating the network can be specified via `network_args`
            as a dictionary. If there are no such additional keyword
            arguments, then `network_args` can be left as None.
            Note that the argument `network_args` is expected to be None
            when the network is specified as a `torch.nn.Module` instance.
        observation_normalization: Whether or not online normalization
            will be done on the encountered observations.
        decrease_rewards_by: If given as a float, each reward will be
            decreased by this amount. For example, if the environment's
            reward function has a constant "alive bonus" (i.e. a bonus
            that is constantly added onto the reward as long as the
            agent is alive), and if you wish to negate this bonus,
            you can set `decrease_rewards_by` to this bonus amount,
            and the bonus will be nullified.
            If you do not wish to affect the rewards in this manner,
            keep this as None.
        alive_bonus_schedule: Use this to add a customized amount of
            alive bonus.
            If left as None (which is the default), additional alive
            bonus will not be added.
            If given as a tuple `(t, b)`, an alive bonus `b` will be
            added onto all the rewards beyond the timestep `t`.
            If given as a tuple `(t0, t1, b)`, a partial (linearly
            increasing towards `b`) alive bonus will be added onto
            all the rewards between the timesteps `t0` and `t1`,
            and a full alive bonus (which equals to `b`) will be added
            onto all the rewards beyond the timestep `t1`.
        action_noise_stdev: If given as a real number `s`, then, for
            each generated action, Gaussian noise with standard
            deviation `s` will be sampled, and then this sampled noise
            will be added onto the action.
            If action noise is not desired, then this argument can be
            left as None.
            For sampling the noise, the global random number generator
            of PyTorch on the simulator's device will be used.
        num_episodes: Number of episodes over which each policy will
            be evaluated. The default is 1.
        device: The device in which the population will be kept.
            If you wish to do a single-GPU evolution, we recommend
            to set this as "cuda" (or "cuda:0", or "cuda:1", etc.),
            assuming that the simulator will also instantiate itself
            on that same device.
            Alternatively, if you wish to do a multi-GPU evolution,
            we recommend to leave this as None or set this as "cpu",
            so that the main population will be kept on the cpu
            and the remote actors will perform their evaluations on
            the GPUs that are assigned to them.
        num_actors: Number of actors to create for parallelized
            evaluation of the solutions.
            Certain string values are also accepted.
            When given as "max" or as "num_cpus", the number of actors
            will be equal to the number of all available CPUs in the ray
            cluster.
            When given as "num_gpus", the number of actors will be
            equal to the number of all available GPUs in the ray
            cluster, and each actor will be assigned a GPU.
            When given as "num_devices", the number of actors will be
            equal to the minimum among the number of CPUs and the number
            of GPUs available in the cluster (or will be equal to the
            number of CPUs if there is no GPU), and each actor will be
            assigned a GPU (if available).
            If `num_actors` is given as "num_gpus" or "num_devices",
            the argument `num_gpus_per_actor` must not be used,
            and the `actor_config` dictionary must not contain the
            key "num_gpus".
            If `num_actors` is given as something other than "num_gpus"
            or "num_devices", and if you wish to assign GPUs to each
            actor, then please see the argument `num_gpus_per_actor`.
        num_gpus_per_actor: Number of GPUs to be assigned to each
            actor. This can be an integer or a float (for when you
            wish to assign fractional amounts of GPUs to actors).
            When `num_actors` has the special value "num_devices",
            the argument `num_gpus_per_actor` is expected to be left as
            None.
        num_subbatches: For when there are multiple actors, you can
            set this to an integer n if you wish the population
            to be divided exactly into n sub-batches. The actors, as they
            finish their currently assigned sub-batch of solutions,
            will pick the next un-evaluated sub-batch.
            If you specify too large numbers for this argument, then
            each sub-batch will be smaller.
            When working with vectorized simulators on GPU, having too
            many and too small sub-batches can hurt the performance.
            This argument can be left as None, in which case, assuming
            that `subbatch_size` is also None, the population will be
            split to m sub-batches, m being the number of actors.
        subbatch_size: For when there are multiple actors, you can
            set this to an integer n if you wish the population to be
            divided into sub-batches in such a way that each sub-batch
            will consist of exactly n solutions. The actors, as they
            finish their currently assigned sub-batch of solutions,
            will pick the next un-evaluated sub-batch.
            If you specify too small numbers for this argument, then
            there will be many sub-batches, each sub-batch having a
            small number of solutions.
            When working with vectorized simulators on GPU, having too
            many and too small sub-batches can hurt the performance.
            This argument can be left as None, in which case, assuming
            that `num_subbatches` is also None, the population will be
            split to m sub-batches, m being the number of actors.
        actor_config: Additional configuration to be used when creating
            each actor with the help of `ray` library.
            Can be left as None if additional configuration is not needed.
    """

    # Store the string or the Callable that will be used to generate the reinforcement learning environment.
    self._env_maker = env

    # Declare the variable which will store the environment.
    self._env: Optional[TorchWrapper] = None

    # Declare the variable which will store the batch size of the vectorized environment.
    self._num_envs: Optional[int] = None

    # Store the upper bound (if any) regarding how many environments can exist at the same time.
    self._max_num_envs: Optional[int] = None if max_num_envs is None else int(max_num_envs)

    # Actor-specific upper bound regarding how many environments can exist at the same time.
    # This variable will be filled by the `_parallelize(...)` method.
    self._actor_max_num_envs: Optional[int] = None

    # Declare the variable which stores whether or not we properly initialized the `_actor_max_num_envs` variable.
    self._actor_max_num_envs_ready: bool = False

    # Store the additional configurations to be used as keyword arguments while instantiating the environment.
    self._env_config: dict = {} if env_config is None else dict(env_config)

    # Declare the variable that will store the device of the simulator.
    # This variable will be filled when the first observation is received from the environment.
    # The device of the observation array received from the environment will determine the value of this variable.
    self._simulator_device: Optional[torch.device] = None

    # Store the neural network architecture (that might be a string or an `nn.Module` instance).
    self._architecture = network

    if network_args is None:
        # If `network_args` is given as None, change it to an empty dictionary
        network_args = {}

    if isinstance(network, str):
        # If the network is given as a string, then we will need the values for the constants `obs_length`,
        # `act_length`, and `obs_space`. To obtain those values, we use our helper function
        # `_env_constants_for_str_net(...)` which temporarily instantiates the specified environment and returns
        # its needed constants.
        env_constants = _env_constants_for_str_net(self._env_maker, **(self._env_config))
    elif isinstance(network, nn.Module):
        # If the network is an already instantiated nn.Module, then we do not prepare any pre-defined constants.
        env_constants = {}
    else:
        # If the network is given as a Callable, then we will need the values for the constants `obs_length`,
        # `act_length`, and `obs_space`. To obtain those values, we use our helper function
        # `_env_constants_for_callable_net(...)` which temporarily instantiates the specified environment and
        # returns its needed constants.
        env_constants = _env_constants_for_callable_net(self._env_maker, **(self._env_config))

    # Build a `Policy` instance according to the given architecture, and store it.
    if isinstance(network, str):
        instantiated_net = str_to_net(network, **{**env_constants, **network_args})
    elif isinstance(network, nn.Module):
        instantiated_net = network
    else:
        instantiated_net = pass_info_if_needed(network, env_constants)(**network_args)
    self._policy = Policy(instantiated_net)

    # Store the boolean which indicates whether or not there will be observation normalization.
    self._observation_normalization = bool(observation_normalization)

    # Declare the variables that will store the observation-related stats if observation normalization is enabled.
    self._obs_stats: Optional[RunningNorm] = None
    self._collected_stats: Optional[RunningNorm] = None

    # Store the number of episodes configuration given by the user.
    self._num_episodes = int(num_episodes)

    # Store the `decrease_rewards_by` configuration given by the user.
    self._decrease_rewards_by = None if decrease_rewards_by is None else float(decrease_rewards_by)

    if alive_bonus_schedule is None:
        # If `alive_bonus_schedule` argument is None, then we store it as None as well.
        self._alive_bonus_schedule = None
    else:
        # This is the case where the user has specified an `alive_bonus_schedule`.
        alive_bonus_schedule = list(alive_bonus_schedule)
        alive_bonus_schedule_length = len(alive_bonus_schedule)
        if alive_bonus_schedule_length == 2:
            # If `alive_bonus_schedule` was given as a 2-element sequence (t, b), then store it as (t, t, b).
            # This means that the partial alive bonus time window starts and ends at t, therefore, there will
            # be no alive bonus until t, and beginning with t, there will be full alive bonus.
            self._alive_bonus_schedule = [
                int(alive_bonus_schedule[0]),
                int(alive_bonus_schedule[0]),
                float(alive_bonus_schedule[1]),
            ]
        elif alive_bonus_schedule_length == 3:
            # If `alive_bonus_schedule` was given as a 3-element sequence (t0, t1, b), then store those 3
            # elements.
            self._alive_bonus_schedule = [
                int(alive_bonus_schedule[0]),
                int(alive_bonus_schedule[1]),
                float(alive_bonus_schedule[2]),
            ]
        else:
            # `alive_bonus_schedule` sequences with unrecognized lengths trigger an error.
            raise ValueError(
                f"Received invalid number elements as the alive bonus schedule."
                f" Expected 2 or 3 items, but got these: {self._alive_bonus_schedule}"
                f" (having a length of {len(self._alive_bonus_schedule)})."
            )

    # If `action_noise_stdev` is specified, store it.
    self._action_noise_stdev = None if action_noise_stdev is None else float(action_noise_stdev)

    # Initialize the counters for the number of simulator interactions and the number of episodes.
    self._interaction_count: int = 0
    self._episode_count: int = 0

    # Call the superclass
    super().__init__(
        objective_sense="max",
        initial_bounds=(-0.00001, 0.00001),
        solution_length=self._policy.parameter_length,
        device=device,
        dtype=torch.float32,
        num_actors=num_actors,
        num_gpus_per_actor=num_gpus_per_actor,
        actor_config=actor_config,
        num_subbatches=num_subbatches,
        subbatch_size=subbatch_size,
    )

get_env(self)

Get the gym environment.

Returns:

Type Description
Optional[gymnasium.core.Env]

The gym environment if it is built. If not built yet, None.

Source code in evotorch/neuroevolution/vecgymne.py
def get_env(self) -> Optional[gym.Env]:
    """
    Get the gym environment.

    Returns:
        The gym environment if it is built. If not built yet, None.
    """
    return self._env

get_observation_stats(self)

Get the observation stats

Source code in evotorch/neuroevolution/vecgymne.py
def get_observation_stats(self) -> RunningNorm:
    """Get the observation stats"""
    self._ensure_obsnorm()
    return self._obs_stats

make_net(self, solution)

Make a new policy network parameterized by the given solution. Note that this parameterized network assumes that the observation is already normalized, and it does not do action clipping to ensure that the generated actions are within valid bounds.

To have a policy network which has its own observation normalization and action clipping layers, please see the method to_policy(...).

Parameters:

Name Type Description Default
solution Iterable

The solution which stores the parameters. This can be a Solution instance, or a 1-dimensional tensor, or any Iterable of real numbers.

required

Returns:

Type Description
Module

The policy network, as a PyTorch module.

Source code in evotorch/neuroevolution/vecgymne.py
def make_net(self, solution: Iterable) -> nn.Module:
    """
    Make a new policy network parameterized by the given solution.
    Note that this parameterized network assumes that the observation
    is already normalized, and it does not do action clipping to ensure
    that the generated actions are within valid bounds.

    To have a policy network which has its own observation normalization
    and action clipping layers, please see the method `to_policy(...)`.

    Args:
        solution: The solution which stores the parameters.
            This can be a Solution instance, or a 1-dimensional tensor,
            or any Iterable of real numbers.
    Returns:
        The policy network, as a PyTorch module.
    """
    return self.to_policy(solution, with_wrapper_modules=False)

pop_observation_stats(self)

Get and clear the collected observation stats

Source code in evotorch/neuroevolution/vecgymne.py
def pop_observation_stats(self) -> RunningNorm:
    """Get and clear the collected observation stats"""
    self._ensure_obsnorm()
    result = self._collected_stats
    self._collected_stats = None
    return result

save_solution(self, solution, fname)

Save the solution into a pickle file. Among the saved data within the pickle file are the solution (as a PyTorch tensor), the policy (as a torch.nn.Module instance), and observation stats (if any).

Parameters:

Name Type Description Default
solution Iterable

The solution to be saved. This can be a PyTorch tensor, a Solution instance, or any Iterable.

required
fname Union[str, pathlib.Path]

The file name of the pickle file to be created.

required
Source code in evotorch/neuroevolution/vecgymne.py
def save_solution(self, solution: Iterable, fname: Union[str, Path]):
    """
    Save the solution into a pickle file.
    Among the saved data within the pickle file are the solution
    (as a PyTorch tensor), the policy (as a `torch.nn.Module` instance),
    and observation stats (if any).

    Args:
        solution: The solution to be saved. This can be a PyTorch tensor,
            a `Solution` instance, or any `Iterable`.
        fname: The file name of the pickle file to be created.
    """

    # Convert the solution to a PyTorch tensor on the cpu.
    if isinstance(solution, torch.Tensor):
        solution = solution.to("cpu")
    elif isinstance(solution, Solution):
        solution = solution.values.clone().to("cpu")
    else:
        solution = torch.as_tensor(solution, dtype=torch.float32, device="cpu")

    if isinstance(solution, ReadOnlyTensor):
        solution = solution.as_subclass(torch.Tensor)

    # Store the solution and the policy.
    result = {
        "solution": solution,
        "policy": self.to_policy(solution),
    }

    # If available, store the observation stats.
    if self.observation_normalization and (self._obs_stats is not None):
        result["obs_mean"] = self._obs_stats.mean.to("cpu")
        result["obs_stdev"] = self._obs_stats.stdev.to("cpu")
        result["obs_sum"] = self._obs_stats.sum.to("cpu")
        result["obs_sum_of_squares"] = self._obs_stats.sum_of_squares.to("cpu")

    # Some additional data.
    result["interaction_count"] = self.interaction_count
    result["episode_count"] = self.episode_count
    result["time"] = datetime.now()

    if isinstance(self._env_maker, str):
        # If the environment was specified via a string, store the string.
        result["env"] = self._env_maker

    # Store the network architecture.
    result["architecture"] = self._architecture

    # Save the dictionary which stores the data.
    with open(fname, "wb") as f:
        pickle.dump(result, f)

set_episode_count(self, n)

Set the episode count manually.

Source code in evotorch/neuroevolution/vecgymne.py
def set_episode_count(self, n: int):
    """
    Set the episode count manually.
    """
    self._episode_count = int(n)

set_interaction_count(self, n)

Set the interaction count manually.

Source code in evotorch/neuroevolution/vecgymne.py
def set_interaction_count(self, n: int):
    """
    Set the interaction count manually.
    """
    self._interaction_count = int(n)

set_observation_stats(self, rn)

Set the observation stats

Source code in evotorch/neuroevolution/vecgymne.py
def set_observation_stats(self, rn: RunningNorm):
    """Set the observation stats"""
    self._ensure_obsnorm()
    self._obs_stats = rn

to_policy(self, solution, *, with_wrapper_modules=True)

Convert the given solution to a policy.

Parameters:

Name Type Description Default
solution Iterable

A solution which can be given as a torch.Tensor, as a Solution, or as any Iterable.

required
with_wrapper_modules bool

Whether or not to wrap the policy module with helper modules so that observations are normalized and actions are clipped to be within the correct boundaries. The default and the recommended value is True.

True

Returns:

Type Description
Module

The policy, as a torch.nn.Module instance.

Source code in evotorch/neuroevolution/vecgymne.py
def to_policy(self, solution: Iterable, *, with_wrapper_modules: bool = True) -> nn.Module:
    """
    Convert the given solution to a policy.

    Args:
        solution: A solution which can be given as a `torch.Tensor`, as a
            `Solution`, or as any `Iterable`.
        with_wrapper_modules: Whether or not to wrap the policy module
            with helper modules so that observations are normalized
            and actions are clipped to be within the correct boundaries.
            The default and the recommended value is True.
    Returns:
        The policy, as a `torch.nn.Module` instance.
    """
    # Get the gym environment
    env = self._get_env(1)

    # Get the observation space, its lower and higher bounds.
    obs_space = env.single_action_space
    low = obs_space.low
    high = obs_space.high

    # If the lower and higher bounds are not -inf and +inf respectively, then this environment needs clipping.
    needs_clipping = _numpy_arrays_specify_bounds(low, high)

    # Convert the solution to a PyTorch tensor on cpu.
    if isinstance(solution, torch.Tensor):
        solution = solution.to("cpu")
    elif isinstance(solution, Solution):
        solution = solution.values.clone().to("cpu")
    else:
        solution = torch.as_tensor(solution, dtype=torch.float32, device="cpu")

    # Convert the internally stored policy to a PyTorch module.
    result = self._policy.to_torch_module(solution)

    if with_wrapper_modules:
        if self.observation_normalization and (self._obs_stats is not None):
            # If observation normalization is needed and there are collected observation stats, then we wrap the
            # policy with an ObsNormWrapperModule.
            result = ObsNormWrapperModule(result, self._obs_stats)

        if needs_clipping:
            # If clipping is needed, then we wrap the policy with an ActClipWrapperModule
            result = ActClipWrapperModule(result, obs_space)

    return result

update_observation_stats(self, rn)

Update the observation stats via another RunningNorm instance

Source code in evotorch/neuroevolution/vecgymne.py
def update_observation_stats(self, rn: RunningNorm):
    """Update the observation stats via another RunningNorm instance"""
    self._ensure_obsnorm()
    if self._obs_stats is None:
        self._obs_stats = rn
    else:
        self._obs_stats.update(rn)