Skip to content

Model Predictive Control (MPC) with EvoTorch

In this example, we study Model Predictive Control (MPC) using the Cross Entropy Method (CEM) [1].

The aim of MPC is to obtain a model of a controllable system, and then to use optimisation to find the control actions which achieve some goal. The model can be a neural network trained via supervised learning, which is the case in our example. Therefore, with the supervised learning phase included, the general overview of the approach used here is:

Step 1: Generate and record random trajectories using the controllable system.

Step 2: Using PyTorch, learn a forward model of the system by using the recorded random trajectories as training data.

Step 3: Apply MPC on the problem. Here is a very high level definition of it:

  • for timestep \(t = t_1 ... t_T\)
    • By using CEM, find a vector \([action_1, ... action_A]\) which, according to the forward model, minimises some cost. The resulting best solution of CEM is the plan.
    • Apply the first action of the plan to the simulator.

Although this notebook focuses on step 3 (i.e. the step which involves the application of MPC), we also provide, for completeness, a separate notebook which shows the steps 1 and 2.

In this example, we use an MPC agent to solve the MuJoCo task Reacher-v4. Reacher-v4 is a reinforcement learning environment in which the goal is to make a simulated robotic arm reach the goal point.

To explain the example, we first make the following definitions:

  • \(s_t\): State of the robotic arm at time \(t\)
  • \(a_t\): Action taken by the robotic arm at time \(t\)
  • \(\pi\): A forward model (in the form of a neural network) which, given the input \((s_t, a_t)\), predicts the change in state \({s'}_t\), such that the predicted next state \(\tilde{s}_{t+1}\) can be computed as \(s_t + {s'}_t\) (training of \(\pi\) is shown here)

Specific to our Reacher-v4 task, let us also define the following:

  • \(s_t^x\), \(s_t^y\): x and y coordinates of the point reached by the robotic arm, according to the state \(s_t\)
  • \(g^x\), \(g^y\): x and y coordinates of the goal point

Our aim, therefore, is to make the robotic arm move in such a way that the positional error (i.e. the euclidean distance between \((s_t^x, s_t^y)\) and \((g^x, g^y)\)) is minimized. For this, we will choose our actions with the help of the predictions of the neural network \(\pi\).

Requirements

Because this example focuses on the Reacher-v4 reinforcement learning environment, gym with mujoco support is required. One can install the mujoco support for gym via:

pip install 'gym[mujoco]'

Initial imports

We begin our code with the necessary imports.

import torch
from torch import nn

import numpy as np

import pickle

from evotorch import Problem, Solution, SolutionBatch
from evotorch.algorithms import CEM
from evotorch.logging import StdOutLogger

from typing import Iterable

import gym

Loading the model

Below, we load the forward model \(\pi\) which is a neural network expressed as a PyTorch module. We also load the data required for normalizing the inputs and de-normalizing the outputs \(\pi\).

with open("reacher_model.pickle", "rb") as f:
    loaded = pickle.load(f)

input_mean = torch.as_tensor(loaded["input_mean"], dtype=torch.float32)
input_stdev = torch.as_tensor(loaded["input_stdev"], dtype=torch.float32)

target_mean = torch.as_tensor(loaded["target_mean"], dtype=torch.float32)
target_stdev = torch.as_tensor(loaded["target_stdev"], dtype=torch.float32)

model = loaded["model"]

Definitions

We begin our definitions with a helper function, \(\text{reacher_state}(\text{observation})\) which extracts the state (\(s_t\)) of the robotic arm from the observation vector returned by the environment.

def reacher_state(observation: Iterable) -> Iterable:
    observation = np.asarray(observation, dtype="float32")
    state = np.concatenate([observation[:4], observation[6:10]])
    state[-2] += observation[4]
    state[-1] += observation[5]
    return state

We now define the function \(\text{predict_next_state}(s_t, a_t)\) which, given a state \(s_t\) and an action \(a_t\) (\(t\) being the current timestep), returns the predicted next state \(\tilde{s}_{t+1}\).

Within itself, this function uses the neural network \(\pi\) to make its predictions.

@torch.no_grad()
def predict_next_state(state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
    action = torch.clamp(action, -1.0, 1.0)
    state_and_action = (torch.hstack([state, action]) - input_mean) / input_stdev
    return ((model(state_and_action) * target_stdev) + target_mean) + state

Let us now define a plan \(p_t\) as a series of actions planned for future timesteps, i.e.: \(p_t = (a_t, a_{t+1}, a_{t+2}, ..., a_{t+(H-1)})\) where \(H\) is the horizon, determining how far into the future we are planning.

With this, we define the function \(\text{predict_plan_outcome}(s_t, p_t)\) which receives the current state \(s_t\) and a plan \(p_t\) and returns a predicted future state \(\tilde{s}_{t+H}\), which represents the predicted outcome of following the plan. Within \(\text{predict_plan_outcome}(\cdot)\), the predictions are made with the help of \(\text{predict_next_state}(\cdot)\) which in turn uses the neural network \(\pi\).

An implementation detail to be noted here is that, \(\text{predict_plan_outcome}(\cdot)\) expects not a single plan, but a batch of plans, and uses PyTorch's vectorization capabilities to make predictions for all those plans in a performant manner.

@torch.no_grad()
def predict_plan_outcome(state: torch.Tensor, plan_batch: torch.Tensor) -> torch.Tensor:
    assert state.ndim == 1
    batch_size, plan_length = plan_batch.shape
    state_batch = state * torch.ones(batch_size, len(state))
    plan_batch = plan_batch.reshape(batch_size, -1, 2)
    horizon = plan_batch.shape[1]

    for t in range(horizon):
        action_batch = plan_batch[:, t, :]
        state_batch = predict_next_state(state_batch, action_batch)

    return state_batch

So far, we have defined the tools necessary for making predictions for the given plans. We also need to be able to generate plans. We pose the generation of plans as an optimization problem, summarized as follows:

\[ \begin{array}{c c l} p_t = & \text{arg min} & ||(\tilde{s}_{t+H}^x,\tilde{s}_{t+H}^y)-(g^x, g^y)|| \\ & \text{subject to} & \tilde{s}_{t+H} = \text{predict_plan_outcome}(s_t, p_t) \end{array} \]

that is, given the current state \(s_t\), we are looking for a plan \(p_t\) whose predicted outcome yields a minimal amount of positional error.

We define the class \(\text{PlanningProblem}\) which represents the problem formulated above.

class PlanningProblem(Problem):
    def __init__(
        self,
        observation: Iterable,
        horizon: int = 4,
    ):
        self.observation = np.asarray(observation, dtype="float32")
        self.state = torch.as_tensor(reacher_state(self.observation), dtype=torch.float32)
        self.target_xy = torch.tensor(
            [
                float(self.observation[4]),
                float(self.observation[5]),
            ],
            dtype=torch.float32,
        )

        super().__init__(
            objective_sense="min",
            initial_bounds=(-0.0001, 0.0001),
            solution_length=(horizon * 2),
        )

    def _evaluate_batch(self, solutions: SolutionBatch):
        final_states = predict_plan_outcome(self.state, solutions.values)
        final_xys = final_states[:, -2:]
        errors = torch.linalg.norm(final_xys - self.target_xy, dim=-1)
        solutions.set_evals(errors)

The following is a convenience function which tackles the optimization problem defined above using the cross entropy method (CEM). The best solution produced by CEM becomes the adopted plan. Finally, the adopted plan's first action is returned (to be sent to the simulator).

def do_planning(observation: Iterable) -> Iterable:
    problem = PlanningProblem(observation)
    searcher = CEM(
        problem,
        stdev_init=0.5,
        popsize=250,  # population size
        parenthood_ratio=0.5,
        stdev_max_change=0.2,
    )
    searcher.run(20)  # run for this many generations
    return searcher.status["best"].values[:2].clone().numpy()

Main MPC Loop

Using the tools defined above, we are now ready to implement the main parts of our MPC. We begin by instantiating the RL environment for Reacher-v4.

from packaging.version import Version
old_render_api = Version(gym.__version__) < Version("0.26")

if old_render_api:
    # For gym versions older than 0.26, we do not have to specify additional
    # keyword arguments for human-mode rendering.
    env = gym.make("Reacher-v4")
else:
    # For gym versions beginning with 0.26, we have to explicitly specify
    # that the rendering mode is "human" if we wish to do the rendering on
    # the screen.
    env = gym.make("Reacher-v4", render_mode="human")

env

The following function defines the main loop of MPC for a single episode of the RL environment. For each timestep of the environment, a plan is made and the first action of the plan is applied on the environment.

def run_episode(visualize: bool = False):
    reset_result = env.reset()
    if isinstance(reset_result, tuple):
        # If the result of the `reset()` method is a tuple, then we assume
        # that it returned a tuple in the form (observation, info).
        observation, _ = reset_result
    else:
        # If the result of the `reset()` method is not a tuple, then we
        # assume that it returned the observation by itself.
        observation = reset_result

    if visualize:
        env.render()

    while True:
        action = do_planning(observation)
        action = np.clip(action, -1.0, 1.0)

        step_result = env.step(action)
        if len(step_result) == 5:
            observation, reward, terminated, truncated, info = step_result
            done = terminated | truncated
        elif len(step_result) == 4:
            observation, reward, done, info = step_result
        else:
            assert False, "Unexpected number of items returned by `.step(...)`"

        if visualize:
            env.render()

        if done:
            break

    return np.linalg.norm([observation[-3], observation[-2]])

Run the MPC for the specified number of episodes.

NUM_EPISODES = 10

for _ in range(NUM_EPISODES):
    print("distance to the goal:", run_episode(visualize=True))

References

[1] Rubinstein, Reuven. "The cross-entropy method for combinatorial and continuous optimization." Methodology and computing in applied probability 1.2 (1999): 127-190.


See this notebook on GitHub