Distributed Model Predictive Control (MPC)¶
In this page we show how to implement a distributed Model Predictive Control (MPC) algorithm for linear systems. We consider the classical algorithm proposed in the paper [MPCRH07] applied to single-integrator systems as in the numerical example of [MPCRH07]. The resulting algorithm is simulated by using the integration features of ChoiRbot and visualization with RVIZ.
Prerequisites¶
We assume a working installation of ChoiRbot is available (see the installation page) and that the desktop version of ROS 2 is installed (this example requires RVIZ). This example requires the DISROPT package to be installed.
The reader is assumed to be familiar with the basic concepts of ROS 2, Python and ChoiRbot (see the quick start page).
Maths of the problem¶
We consider \(N\) robots with linear dynamics that must be regulated to the origin while satisfying a given set of coupling constraints among their outputs.
System dynamics and communication graph¶
The considered algorithm models each robot as having discrete-time linear dynamics
where for all \(i \in \{1, \ldots, N\}\), \(x_i \in \mathbb{R}^{n_i}\) is the \(i\)-th system state, \(u_i \in \mathbb{R}^{m_i}\) is the system input, \(y_i \in \mathbb{R}^{p}\) is the system output and the matrices \(A_i, B_i, C_i, D_i\) have appropriate dimensions. Note that all the robots have the same output dimension \(p\).
In the considered distributed MPC scheme the robots are assumed to be ordered sequentially according to their labels \(i \in \{1, \ldots, N\}\) and that they communicate according to this structure.
Cost function and constraints¶
Each robot wants to minimize a quadratic stage cost of the type
for some positive definite matrices \(Q_i, R_i\). Note that this objective function encodes the regulation task. The robots are assumed to have individual constraints on the states and/or on the inputs that can be modeled as inequalities,
for some matrices \(E_i, G_i\) and some vectors \(f_i, h_i\). The coupling among the robot outputs \(y_i\) is realized with the inequality constraint
with \(b \in \mathbb{R}^p\).
Distributed control scheme¶
The conceptual idea of MPC is to solve an optimal control problem over a finite prediction horizon \(T \in \mathbb{N}\), and then to apply only the first piece of the computed input trajectory. Subsequently, a new optimization problem is formulated taking the realized system state as new initial condition. The distributed MPC algorithm considered here approximates the overall optimal control problem (involving all the robots) with a sequence of local sub-problems. Each robot \(i\) solves once per iteration a sub-problem, involving only its own control inputs as decision variables and fixing the information regarding the other robots to the estimates received by robot \(i-1\).
The optimal control problem solved by robot \(i\) at each time \(t\) is as follows
In the previous problem, the notation \(x_i(t+\tau|t)\) denotes the state trajectory at time \(t+\tau\) that has been predicted at time \(t\) (and similarly for \(u_i\) and \(y_i\)), while \(x_i(t)\) is the actual (measured) state. The symbols \(\bar{y}_j(t+\tau|t)\) denote the estimates of the output trajectories of the other robots.
The overall distributed MPC algorithm is as follows.
Set \(t = 0\) and find a set of output trajectories \(\bar{y}_i(\tau|0), i = 1, \ldots, N\) satisfying the coupling constraints for all \(\tau \in \{0, \ldots, T-1\}\).
For \(i = 1, \ldots, N\) let robot \(i\) do the following
receive the set of output trajectories
solve the local optimal control problem
update the set of output trajectories by replacing \(\bar{y}_i\) with the newly computed \(y_i\)
Set \(t \leftarrow t+1\) and go to step 2
It is important to note that, when going from step 3 to step 2, the robots must extend the output trajectories \(\bar{y}_i\) (i.e. they must determine \(\bar{y}_i(t+T)\)) while ensuring that the new trajectories satisfy the coupling constraints \(\sum_{i=1}^N y_i \le b\) at time \(t+T\). To implement this scheme, it is assumed that the robots already know how to perform such an operation.
Implementation in ChoiRbot¶
In order to implement the MPC example in ChoiRbot, we consider the following nodes for each robot:
a Team Guidance node implementing the computation/communication steps of the distributed MPC algorithm and computes \(u_i(t)\) with a certain frequency
an Integrator node that computes the evolution of the robot’s state according to the applied inputs with a higher frequency than the Team Guidance node
a visualization node that publishes the current position to the RVIZ topic
As required by the ChoiRbot paradigm, we finally write the launch file and the executable scripts for the three nodes.
We analyze each of these components separately in the following subsections.
Team Guidance (main class)¶
To implement the steps of the distributed MPC algorithm, we need to use the optimization
features, therefore the resulting class for the Team Guidance layer must extend the
class OptimizationGuidance
, which requires the method
_optimization_ended
(called at the end of an optimization) to be implemented.
For the new class, we will require
an initialization block that stores information on the control scenario (system matrices, cost function, constraints, prediction horizon, output trajectory continuation)
a method called with a user-defined frequency and asking the optimization thread to solve the optimal control problem
the method
_optimization_ended
(called upon optimization completion), which sends the trajectory to the next robot and shifts the prediction horizon
The implementation of these main components in the new class
MPCGuidance
is as follows:
import numpy as np
from choirbot.optimizer import MPCOptimizer
from choirbot.guidance import OptimizationGuidance
class MPCGuidance(OptimizationGuidance):
# initialization of the class
def __init__(self, sampling_period, pos_handler, pos_topic):
super().__init__(MPCOptimizer(), MPCOptimizationThread, pos_handler, pos_topic)
self.sampling_period = sampling_period
self.timer = self.create_timer(sampling_period, self.control)
self.ctrl_publisher = self.create_publisher(Vector3, 'velocity', 1)
self.system_matrices = None
self.traj_continuation = None
self.output_trajectories = {}
self.prediction_horizon = None
self.can_control = False
# initialization of the control scenario
def initialize(self, prediction_horizon, system_matrices, cost_matrices,
traj_continuation, coupling_constraints, local_constraints):
# initialize local variables
self.system_matrices = system_matrices
self.traj_continuation = traj_continuation
self.prediction_horizon = prediction_horizon
# initialize optimization scenario
self.optimizer.initialize_scenario(self.agent_id, prediction_horizon,
system_matrices, cost_matrices, coupling_constraints, local_constraints)
# mark class as ready
self.can_control = True
# main control loop (before optimal control problem)
def control(self):
# initialize output trajectory (only the first time)
if not self.output_trajectories:
self.initialize_output_trajectory()
# gather new trajectories at agent 0
self.collect_trajectories()
# receive trajectories from agent i-1
if self.agent_id != 0:
traj = self.communicator.neighbors_receive([self.agent_id-1])
self.output_trajectories = traj[self.agent_id-1]
# create and solve local optimal control problem
self.optimizer.create_opt_control_problem(self.current_pose.position[0], self.output_trajectories)
self.optimization_thread.optimize()
# main control loop (after optimal control problem)
def _optimization_ended(self):
# get resulting trajectories (from 0 to T-1)
state_traj, input_traj, output_traj = self.optimizer.get_result()
# update set of output trajectories
self.output_trajectories[self.agent_id] = output_traj
# send trajectories to agent i+1
if self.agent_id != self.n_agents-1:
self.communicator.neighbors_send(self.output_trajectories, [self.agent_id+1])
# apply control input and shift horizon
self.send_input(input_traj[:, 0])
self.shift_horizon(state_traj)
def shift_horizon(self, state_traj):
# ... extend local output trajectory by calling self.trajectory_continuation()
def collect_trajectories(self):
# ... collection of trajectories at agent 0
Although the final class has additional components and checks not reported here, the main flow is well represented by the previous code block. Let us highlight some details. Consider the beginning of the initialization block:
def __init__(self, sampling_period, pos_handler, pos_topic):
super().__init__(MPCOptimizer(), MPCOptimizationThread, pos_handler, pos_topic)
This instruction calls the method choirbot.guidance.OptimizationGuidance.__init__()
,
which requires an Optimizer
and an
OptimizationThread
(actually, only its type). The MPCOptimizer
class is
responsible for solving optimal control problems, while the
MPCOptimizationThread
class is responsible for
starting and stopping the optimization process (more details next).
The optimizer is firstly called in the initialize
method:
self.optimizer.initialize_scenario(self.agent_id, prediction_horizon,
system_matrices, cost_matrices, coupling_constraints, local_constraints)
Here, the optimizer is provided with the necessary information to
formulate optimal control problems. The thread is called in
the control
method:
self.optimizer.create_opt_control_problem(self.current_pose.position[0], self.output_trajectories)
self.optimization_thread.optimize()
Here, we provide the optimizer with the current robot state \(x(t)\)
and with the estimates of the output trajectories \(\bar{y}_j(t+\tau|t)\)
for \(j \ne i\) and \(\tau = 0, \ldots, T-1\). Then, we ask the
optimization thread to start the optimization process. When the optimization
has finished, the optimization thread triggers execution of the
_optimization_ended
callback, which finalizes the current control
iteration and prepares the robot for the next one (this mechanism is already
implemented by the classes
OptimizationThread
and OptimizationGuidance
).
Team Guidance (optimization classes)¶
Let us give more details on the optimization-related classes.
The class MPCOptimizationThread
is very
straightforward since it extends the abstract class
OptimizationThread
as follows:
from choirbot.guidance.optimization_thread import OptimizationThread
class MPCOptimizationThread(OptimizationThread):
def do_optimize(self):
self.optimizer.optimize()
Indeed, the class OptimizationThread
already provides all the features to start and stop the optimization,
and we are only required to specify that the body of the optimization
process is entirely delegated to MPCOptimizer
.
The MPCOptimizer
class provides the necessary
features to solve optimal control problems by using
DISROPT
under the hood. We only report its main structure:
from choirbot.optimizer import Optimizer
class MPCOptimizer(Optimizer):
def initialize_scenario(self, robot_id, prediction_horizon, system_matrices,
cost_matrices, coupling_constraints, local_constraints):
# initialize a parametric optimal control problem
# (parameters are initial conditions and output trajectories)
def create_opt_control_problem(self, initial_condition, output_trajectories):
# create actual optimal control problem
def optimize(self):
# solve optimal control problem and keep track of solution
def get_result(self):
# ... do calculations
return x_traj, u_traj, y_traj
Integration of dynamics¶
The Team Guidance layer will publish control inputs in the velocity
topic.
To address integration of the robot dynamics and publication of odometry
messages, we simply use the choirbot.integrator.SingleIntegrator
class, which subscribes to the velocity
topic, integrates the dynamics
with a user-defined frequency and publishes the updated position in the
odom
topic.
Visualization¶
For the visualization node we use the class choirbot.utils.Visualizer
.
The purpose of this node is very simple: to subscribe to the odom
topic
and to forward messages to the /visualization_marker
topic. RVIZ reads
data from this topic and draws circles (representing robots) at the
specified positions.
Launch file and executables¶
See the corresponding section in the formation control example.
Running the simulation¶
To run the simulation, we simply need to execute the launch file. First we source the workspace:
source install/setup.bash
Now we are ready to run the example:
ros2 launch choirbot_examples mpc.launch.py
An RVIZ window will open. After a few seconds, the circles (representing robots) begin to move toward the origin while keeping their inter-distance within the prescribed bounds:
References