Model Predictive Control (MPC) with EvoTorch¶
In this example, we study Model Predictive Control (MPC) using the Cross Entropy Method (CEM) [1].
The aim of MPC is to obtain a model of a controllable system, and then to use optimization to find the control actions which achieve some goal. The model can be a neural network trained via supervised learning, which is the case in our example. Therefore, with the supervised learning phase included, the general overview of the approach used here is:
Step 1: Generate and record random trajectories using the controllable system.
Step 2: Using PyTorch, learn a forward model of the system by using the recorded random trajectories as training data.
Step 3: Apply MPC on the problem. Here is a very high-level definition of it (a minimal code sketch of this loop is given right after the list):
- for each timestep \(t = t_1, ..., t_T\):
    - Using CEM, find an action vector \([action_1, ..., action_A]\) which, according to the forward model, minimizes some cost. The best solution found by CEM becomes the plan.
    - Apply the first action of the plan to the simulator.
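In code, the receding-horizon structure of step 3 looks roughly like the sketch below. This is only a minimal illustration: plan_with_cem is a do-nothing placeholder standing in for the CEM-based planner that this notebook implements later, and the simulator step is left as a comment.
import numpy as np

def plan_with_cem(state, horizon=4, action_length=2):
    # Placeholder: the real planner searches for a low-cost action sequence
    # using CEM and the learned forward model (see do_planning later in this notebook)
    return np.zeros((horizon, action_length))

state = np.zeros(8)  # made-up state, only to make the sketch runnable
for t in range(10):  # for timestep t = t_1 ... t_T
    plan = plan_with_cem(state)  # the best CEM solution is the plan
    first_action = plan[0]       # only the first action of the plan is used
    # ... apply first_action to the simulator and observe the new state ...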
Although this notebook focuses on step 3 (i.e. the application of MPC), we also provide, for completeness, a separate notebook which shows steps 1 and 2.
In this example, we use an MPC agent to solve the MuJoCo task Reacher-v4. Reacher-v4 is a reinforcement learning environment in which the goal is to make a simulated robotic arm reach a goal point.
To explain the example, we first make the following definitions:
- \(s_t\): State of the robotic arm at time \(t\)
- \(a_t\): Action taken by the robotic arm at time \(t\)
- \(\pi\): A forward model (in the form of a neural network) which, given the input \((s_t, a_t)\), predicts the change in state \({s'}_t\), such that the predicted next state \(\tilde{s}_{t+1}\) can be computed as \(s_t + {s'}_t\) (the training of \(\pi\) is shown in the separate notebook mentioned above)
Specific to our Reacher-v4 task, let us also define the following:
- \(s_t^x\), \(s_t^y\): x and y coordinates of the point reached by the robotic arm, according to the state \(s_t\)
- \(g^x\), \(g^y\): x and y coordinates of the goal point
Our aim, therefore, is to make the robotic arm move in such a way that the positional error (i.e. the Euclidean distance between \((s_t^x, s_t^y)\) and \((g^x, g^y)\)) is minimized. For this, we will choose our actions with the help of the predictions of the neural network \(\pi\).
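As a concrete illustration of the cost we will be minimizing, the positional error for a single state can be computed like this (the coordinate values below are made up):
import torch

reached_xy = torch.tensor([0.10, -0.05])  # (s_t^x, s_t^y), made-up values
goal_xy = torch.tensor([0.12, 0.08])      # (g^x, g^y), made-up values
positional_error = torch.linalg.norm(reached_xy - goal_xy)  # Euclidean distance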
Requirements¶
Because this example focuses on the Reacher-v4 reinforcement learning environment, gym with mujoco support is required. One can install the mujoco support for gym via:
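For recent versions of gym, this typically amounts to the following command (the exact command may differ depending on your gym version):
pip install 'gym[mujoco]'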
Initial imports¶
We begin our code with the necessary imports.
import torch
from torch import nn
import numpy as np
import pickle
from evotorch import Problem, Solution, SolutionBatch
from evotorch.algorithms import CEM
from evotorch.logging import StdOutLogger
from typing import Iterable
import gym
Loading the model¶
Below, we load the forward model \(\pi\), which is a neural network expressed as a PyTorch module. We also load the data required for normalizing the inputs and de-normalizing the outputs of \(\pi\).
with open("reacher_model.pickle", "rb") as f:
loaded = pickle.load(f)
input_mean = torch.as_tensor(loaded["input_mean"], dtype=torch.float32)
input_stdev = torch.as_tensor(loaded["input_stdev"], dtype=torch.float32)
target_mean = torch.as_tensor(loaded["target_mean"], dtype=torch.float32)
target_stdev = torch.as_tensor(loaded["target_stdev"], dtype=torch.float32)
model = loaded["model"]
Definitions¶
We begin our definitions with a helper function, \(\text{reacher_state}(\text{observation})\), which extracts the state (\(s_t\)) of the robotic arm from the observation vector returned by the environment.
def reacher_state(observation: Iterable) -> Iterable:
    observation = np.asarray(observation, dtype="float32")
    # Keep the joint angle features and observation[6:10] (the angular velocities
    # and the fingertip position relative to the target), dropping the target
    # coordinates stored at observation[4:6]
    state = np.concatenate([observation[:4], observation[6:10]])
    # Turn the relative fingertip coordinates into absolute x and y coordinates
    # by adding back the target's coordinates
    state[-2] += observation[4]
    state[-1] += observation[5]
    return state
We now define the function \(\text{predict_next_state}(s_t, a_t)\) which, given a state \(s_t\) and an action \(a_t\) (\(t\) being the current timestep), returns the predicted next state \(\tilde{s}_{t+1}\).
Within itself, this function uses the neural network \(\pi\) to make its predictions.
@torch.no_grad()
def predict_next_state(state: torch.Tensor, action: torch.Tensor) -> torch.Tensor:
    # Clip the action to the valid action range
    action = torch.clamp(action, -1.0, 1.0)
    # Normalize the concatenated (state, action) input of the forward model
    state_and_action = (torch.hstack([state, action]) - input_mean) / input_stdev
    # De-normalize the predicted state change and add it to the current state
    return ((model(state_and_action) * target_stdev) + target_mean) + state
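As a quick sanity check (assuming the pickle from the earlier cell has been loaded), a single prediction can be made from made-up state and action values; the state is 8-dimensional, as produced by reacher_state(...):
example_state = torch.zeros(8)              # made-up 8-dimensional state
example_action = torch.tensor([0.1, -0.2])  # made-up 2-dimensional action
predicted_next = predict_next_state(example_state, example_action)
print(predicted_next.shape)                 # torch.Size([8])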
Let us now define a plan \(p_t\) as a series of actions planned for future timesteps, i.e.: \(p_t = (a_t, a_{t+1}, a_{t+2}, ..., a_{t+(H-1)})\) where \(H\) is the horizon, determining how far into the future we are planning.
With this, we define the function \(\text{predict_plan_outcome}(s_t, p_t)\) which receives the current state \(s_t\) and a plan \(p_t\) and returns a predicted future state \(\tilde{s}_{t+H}\), which represents the predicted outcome of following the plan. Within \(\text{predict_plan_outcome}(\cdot)\), the predictions are made with the help of \(\text{predict_next_state}(\cdot)\) which in turn uses the neural network \(\pi\).
An implementation detail to be noted here is that \(\text{predict_plan_outcome}(\cdot)\) expects not a single plan, but a batch of plans, and uses PyTorch's vectorization capabilities to make predictions for all those plans in a performant manner.
@torch.no_grad()
def predict_plan_outcome(state: torch.Tensor, plan_batch: torch.Tensor) -> torch.Tensor:
    assert state.ndim == 1
    batch_size, plan_length = plan_batch.shape
    # Repeat the current state so that there is one copy of it per plan in the batch
    state_batch = state * torch.ones(batch_size, len(state))
    # Each plan is a flat vector of actions; reshape it to (batch_size, horizon, 2)
    plan_batch = plan_batch.reshape(batch_size, -1, 2)
    horizon = plan_batch.shape[1]
    # Roll the forward model over the horizon, one (batched) action per timestep
    for t in range(horizon):
        action_batch = plan_batch[:, t, :]
        state_batch = predict_next_state(state_batch, action_batch)
    return state_batch
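Continuing the same sanity check, a whole batch of (made-up) plans can be evaluated at once. With a horizon of 4 and 2-dimensional actions, each plan is a flat vector of 8 numbers:
current_state = torch.zeros(8)           # made-up state, for illustration only
random_plans = torch.randn(16, 8) * 0.1  # 16 random plans, each containing 4 actions of length 2
predicted_outcomes = predict_plan_outcome(current_state, random_plans)
print(predicted_outcomes.shape)          # torch.Size([16, 8])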
So far, we have defined the tools necessary for making predictions for the given plans. We also need to be able to generate plans. We pose the generation of plans as an optimization problem, summarized as follows:
\[
p_t = \mathop{\arg\min}_{p} \; \lVert (\tilde{s}_{t+H}^x, \tilde{s}_{t+H}^y) - (g^x, g^y) \rVert,
\quad \text{where } \tilde{s}_{t+H} \text{ is the outcome of following } p \text{ from } s_t \text{, as predicted by } \pi
\]
that is, given the current state \(s_t\), we are looking for a plan \(p_t\) whose predicted outcome yields a minimal amount of positional error.
We define the class \(\text{PlanningProblem}\) which represents the problem formulated above.
class PlanningProblem(Problem):
    def __init__(
        self,
        observation: Iterable,
        horizon: int = 4,
    ):
        self.observation = np.asarray(observation, dtype="float32")
        # The current state of the arm, extracted from the observation
        self.state = torch.as_tensor(reacher_state(self.observation), dtype=torch.float32)
        # The coordinates of the goal point, extracted from the observation
        self.target_xy = torch.tensor(
            [
                float(self.observation[4]),
                float(self.observation[5]),
            ],
            dtype=torch.float32,
        )
        super().__init__(
            objective_sense="min",  # we wish to minimize the positional error
            initial_bounds=(-0.0001, 0.0001),
            # Each solution is a plan: `horizon` actions, each of length 2
            solution_length=(horizon * 2),
        )

    def _evaluate_batch(self, solutions: SolutionBatch):
        # Predict the final states reached by all the candidate plans at once
        final_states = predict_plan_outcome(self.state, solutions.values)
        # The last two elements of a state are the x and y coordinates reached by the arm
        final_xys = final_states[:, -2:]
        errors = torch.linalg.norm(final_xys - self.target_xy, dim=-1)
        solutions.set_evals(errors)
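To see how this problem class behaves on its own (the convenience function defined next wraps exactly this usage), we can instantiate it from a made-up, all-zeros observation of length 11, which is the observation length of Reacher-v4:
dummy_observation = np.zeros(11, dtype="float32")  # made-up observation, for illustration only
problem = PlanningProblem(dummy_observation)
print(problem.solution_length)  # horizon * 2, i.e. 8 with the default horizon of 4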
The following is a convenience function which tackles the optimization problem defined above using the cross entropy method (CEM). The best solution produced by CEM becomes the adopted plan. Finally, the adopted plan's first action is returned (to be sent to the simulator).
def do_planning(observation: Iterable) -> Iterable:
    problem = PlanningProblem(observation)
    searcher = CEM(
        problem,
        stdev_init=0.5,        # initial standard deviation of the search distribution
        popsize=250,           # population size
        parenthood_ratio=0.5,  # the ratio of the population chosen as parents
        stdev_max_change=0.2,  # maximum allowed change for the standard deviation
    )
    searcher.run(20)  # run for this many generations
    # The best solution is the adopted plan; return only its first action (2 values)
    return searcher.status["best"].values[:2].clone().numpy()
Main MPC Loop¶
Using the tools defined above, we are now ready to implement the main parts of our MPC.
We begin by instantiating the RL environment for Reacher-v4.
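A minimal way of doing this, assuming a gym version compatible with the reset and step signatures used below, is:
env = gym.make("Reacher-v4")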
The following function defines the main loop of MPC for a single episode of the RL environment. For each timestep of the environment, a plan is made and the first action of the plan is applied on the environment.
def run_episode(visualize: bool = False):
    observation = env.reset(return_info=False)
    if visualize:
        env.render()
    while True:
        # Plan from the current observation, then apply only the first planned action
        action = do_planning(observation)
        action = np.clip(action, -1.0, 1.0)
        observation, reward, done, info = env.step(action)
        if visualize:
            env.render()
        if done:
            break
    # The last three observation elements hold the fingertip-to-goal vector;
    # report the final distance on the x-y plane
    return np.linalg.norm([observation[-3], observation[-2]])
Run the MPC for the specified number of episodes.
NUM_EPISODES = 10

for _ in range(NUM_EPISODES):
    print("distance to the goal:", run_episode(visualize=True))
References¶
[1] Rubinstein, Reuven. "The cross-entropy method for combinatorial and continuous optimization." Methodology and Computing in Applied Probability 1.2 (1999): 127-190.