# Model Predictive Control (MPC) with EvoTorch¶

In this example, we study Model Predictive Control (MPC) using the Cross Entropy Method (CEM) [1].

The aim of MPC is to obtain a model of a controllable system, and then to use optimisation to find the control actions which achieve some goal. The model can be a neural network trained via supervised learning, which is the case in our example. Therefore, with the supervised learning phase included, the general overview of the approach used here is:

**Step 1:** Generate and record random trajectories using the controllable system.

**Step 2:** Using PyTorch, learn a forward model of the system by using the recorded random trajectories as training data.

**Step 3:** Apply MPC on the problem. Here is a very high level definition of it:

**for**timestep \(t = t_1 ... t_T\)- By using CEM, find a vector \([action_1, ... action_A]\) which, according to the forward model, minimises some cost. The resulting best solution of CEM is the
*plan*. - Apply the first action of the
*plan*to the simulator.

- By using CEM, find a vector \([action_1, ... action_A]\) which, according to the forward model, minimises some cost. The resulting best solution of CEM is the

Although this notebook focuses on step 3 (i.e. the step which involves the application of MPC), we also provide, for completeness, a separate notebook which shows the steps 1 and 2.

In this example, we use an MPC agent to solve the MuJoCo task `Reacher-v4`

.
`Reacher-v4`

is a reinforcement learning environment in which the goal is to make a simulated robotic arm reach the goal point.

To explain the example, we first make the following definitions:

- \(s_t\): State of the robotic arm at time \(t\)
- \(a_t\): Action taken by the robotic arm at time \(t\)
- \(\pi\): A forward model (in the form of a neural network) which, given the input \((s_t, a_t)\), predicts the change in state \({s'}_t\), such that the predicted next state \(\tilde{s}_{t+1}\) can be computed as \(s_t + {s'}_t\) (training of \(\pi\) is shown here)

Specific to our `Reacher-v4`

task, let us also define the following:

- \(s_t^x\), \(s_t^y\): x and y coordinates of the point reached by the robotic arm, according to the state \(s_t\)
- \(g^x\), \(g^y\): x and y coordinates of the goal point

Our aim, therefore, is to make the robotic arm move in such a way that the *positional error* (i.e. the euclidean distance between \((s_t^x, s_t^y)\) and \((g^x, g^y)\)) is minimized. For this, we will choose our actions with the help of the predictions of the neural network \(\pi\).

## Requirements¶

Because this example focuses on the `Reacher-v4`

reinforcement learning environment, `gym`

with `mujoco`

support is required. One can install the `mujoco`

support for `gym`

via:

## Initial imports¶

We begin our code with the necessary imports.

```
import torch
from torch import nn
import numpy as np
import pickle
from evotorch import Problem, Solution, SolutionBatch
from evotorch.algorithms import CEM
from evotorch.logging import StdOutLogger
from typing import Iterable
import gym
```

## Loading the model¶

Below, we load the forward model \(\pi\) which is a neural network expressed as a PyTorch module. We also load the data required for normalizing the inputs and de-normalizing the outputs \(\pi\).

```
with open("reacher_model.pickle", "rb") as f:
loaded = pickle.load(f)
input_mean = torch.as_tensor(loaded["input_mean"], dtype=torch.float32)
input_stdev = torch.as_tensor(loaded["input_stdev"], dtype=torch.float32)
target_mean = torch.as_tensor(loaded["target_mean"], dtype=torch.float32)
target_stdev = torch.as_tensor(loaded["target_stdev"], dtype=torch.float32)
model = loaded["model"]
```

## Definitions¶

We begin our definitions with a helper function, \(\text{reacher_state}(\text{observation})\) which extracts the state (\(s_t\)) of the robotic arm from the observation vector returned by the environment.

```
def reacher_state(observation: Iterable) -> Iterable:
observation = np.asarray(observation, dtype="float32")
state = np.concatenate([observation[:4], observation[6:10]])
state[-2] += observation[4]
state[-1] += observation[5]
return state
```

We now define the function \(\text{predict_next_state}(s_t, a_t)\) which, given a state \(s_t\) and an action \(a_t\) (\(t\) being the current timestep), returns the predicted next state \(\tilde{s}_{t+1}\).

Within itself, this function uses the neural network \(\pi\) to make its predictions.

```
@torch.no_grad()
def predict_next_state(state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
action = torch.clamp(action, -1.0, 1.0)
state_and_action = (torch.hstack([state, action]) - input_mean) / input_stdev
return ((model(state_and_action) * target_stdev) + target_mean) + state
```

Let us now define a *plan* \(p_t\) as a series of actions planned for future timesteps, i.e.: \(p_t = (a_t, a_{t+1}, a_{t+2}, ..., a_{t+(H-1)})\) where \(H\) is the horizon, determining how far into the future we are planning.

With this, we define the function \(\text{predict_plan_outcome}(s_t, p_t)\) which receives the current state \(s_t\) and a plan \(p_t\) and returns a predicted future state \(\tilde{s}_{t+H}\), which represents the predicted outcome of following the plan. Within \(\text{predict_plan_outcome}(\cdot)\), the predictions are made with the help of \(\text{predict_next_state}(\cdot)\) which in turn uses the neural network \(\pi\).

An implementation detail to be noted here is that, \(\text{predict_plan_outcome}(\cdot)\) expects not a single plan, but a batch of plans, and uses PyTorch's vectorization capabilities to make predictions for all those plans in a performant manner.

```
@torch.no_grad()
def predict_plan_outcome(state: torch.Tensor, plan_batch: torch.Tensor) -> torch.Tensor:
assert state.ndim == 1
batch_size, plan_length = plan_batch.shape
state_batch = state * torch.ones(batch_size, len(state))
plan_batch = plan_batch.reshape(batch_size, -1, 2)
horizon = plan_batch.shape[1]
for t in range(horizon):
action_batch = plan_batch[:, t, :]
state_batch = predict_next_state(state_batch, action_batch)
return state_batch
```

So far, we have defined the tools necessary for making predictions for the given plans.
We also need to be able to *generate* plans.
We pose the generation of plans as an optimization problem, summarized as follows:

that is, given the current state \(s_t\), we are looking for a plan \(p_t\) whose predicted outcome yields a minimal amount of positional error.

We define the class \(\text{PlanningProblem}\) which represents the problem formulated above.

```
class PlanningProblem(Problem):
def __init__(
self,
observation: Iterable,
horizon: int = 4,
):
self.observation = np.asarray(observation, dtype="float32")
self.state = torch.as_tensor(reacher_state(self.observation), dtype=torch.float32)
self.target_xy = torch.tensor(
[
float(self.observation[4]),
float(self.observation[5]),
],
dtype=torch.float32,
)
super().__init__(
objective_sense="min",
initial_bounds=(-0.0001, 0.0001),
solution_length=(horizon * 2),
)
def _evaluate_batch(self, solutions: SolutionBatch):
final_states = predict_plan_outcome(self.state, solutions.values)
final_xys = final_states[:, -2:]
errors = torch.linalg.norm(final_xys - self.target_xy, dim=-1)
solutions.set_evals(errors)
```

The following is a convenience function which tackles the optimization problem defined above using the cross entropy method (CEM). The best solution produced by CEM becomes the adopted plan. Finally, the adopted plan's first action is returned (to be sent to the simulator).

```
def do_planning(observation: Iterable) -> Iterable:
problem = PlanningProblem(observation)
searcher = CEM(
problem,
stdev_init=0.5,
popsize=250, # population size
parenthood_ratio=0.5,
stdev_max_change=0.2,
)
searcher.run(20) # run for this many generations
return searcher.status["best"].values[:2].clone().numpy()
```

## Main MPC Loop¶

Using the tools defined above, we are now ready to implement the main parts of our MPC.
We begin by instantiating the RL environment for `Reacher-v4`

.

```
from packaging.version import Version
old_render_api = Version(gym.__version__) < Version("0.26")
if old_render_api:
# For gym versions older than 0.26, we do not have to specify additional
# keyword arguments for human-mode rendering.
env = gym.make("Reacher-v4")
else:
# For gym versions beginning with 0.26, we have to explicitly specify
# that the rendering mode is "human" if we wish to do the rendering on
# the screen.
env = gym.make("Reacher-v4", render_mode="human")
env
```

The following function defines the main loop of MPC for a single episode of the RL environment. For each timestep of the environment, a plan is made and the first action of the plan is applied on the environment.

```
def run_episode(visualize: bool = False):
reset_result = env.reset()
if isinstance(reset_result, tuple):
# If the result of the `reset()` method is a tuple, then we assume
# that it returned a tuple in the form (observation, info).
observation, _ = reset_result
else:
# If the result of the `reset()` method is not a tuple, then we
# assume that it returned the observation by itself.
observation = reset_result
if visualize:
env.render()
while True:
action = do_planning(observation)
action = np.clip(action, -1.0, 1.0)
step_result = env.step(action)
if len(step_result) == 5:
observation, reward, terminated, truncated, info = step_result
done = terminated | truncated
elif len(step_result) == 4:
observation, reward, done, info = step_result
else:
assert False, "Unexpected number of items returned by `.step(...)`"
if visualize:
env.render()
if done:
break
return np.linalg.norm([observation[-3], observation[-2]])
```

Run the MPC for the specified number of episodes.

```
NUM_EPISODES = 10
for _ in range(NUM_EPISODES):
print("distance to the goal:", run_episode(visualize=True))
```

#### References¶

[1] Rubinstein, Reuven. "The cross-entropy method for combinatorial and continuous optimization." Methodology and computing in applied probability 1.2 (1999): 127-190.