Skip to content

Commit

Permalink
refactored set_time_varying_affine_dynamics in multiple shooting
Browse files Browse the repository at this point in the history
  • Loading branch information
FilippoAiraldi committed Nov 15, 2024
1 parent fbd6f23 commit 26fecf0
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 41 deletions.
101 changes: 64 additions & 37 deletions src/csnlp/wrappers/mpc/pwa_mpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
SymType = TypeVar("SymType", cs.SX, cs.MX)


def _n(parname: str, index: int) -> str:
"""Internal utility for the naming convention of the ``i``-th region parameters."""
return f"{parname}__{index}"
def _n(parname: str, prefix: str) -> str:
"""Internal utility for the naming convention of the TVA dynamics's parameters."""
return f"{parname}__{prefix}"


@dataclass
Expand Down Expand Up @@ -102,6 +102,8 @@ class PwaMpc(Mpc[SymType]):
Raises if the shooting method is invalid; or if any of the horizons are invalid;
or if the number of scenarios is not a positive integer."""

tva_dynamics_name = "tva_dyn"

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._fixed_sequence_dynamics = False
Expand Down Expand Up @@ -244,9 +246,21 @@ def set_pwa_dynamics(
self._dynamics_already_set = True

def set_time_varying_affine_dynamics(self, pwa_system: Sequence[PwaRegion]) -> None:
r"""Sets the time-varying affine dynamics of the system for the MPC controller.
The possible values taken by the affine dynamics are defined by the sequence of
:class:`PwaRegion`.
r"""Sets time-varying affine dynamics as the controller's prediction model and
creates the corresponding dynamics constraints. The dynamics in the affine
time-varying form are described as
.. math:: x_{k+1} = A_k x_k + B_k u_k + c_k.
where :math:`x_k` and :math:`x_{k+1}` are the current and next state,
:math:`u_k` is the control action, :math:`w_k` is the disturbance, and
:math:`A_k, B_k, c_k` are the constant matrices of the region to be visited by
the state trajectory at time step `k`. By setting the dynamics with this method,
the user can then specify the switching sequence of regions to be active at each
time step along the prediction horizon via :meth:`set_switching_sequence`, and
solve a much simpler optimization problem, as the sequence is fixed. Instead, if
:meth:`set_pwa_dynamics` is used, the sequence of regions is optimized over via
a (usually expensive) mixed-integer optimization problem.
Parameters
----------
Expand All @@ -261,7 +275,6 @@ def set_time_varying_affine_dynamics(self, pwa_system: Sequence[PwaRegion]) -> N
ValueError
Raises if the dimensions of any matrix in any region do not match the
expected shape.
"""
if self._dynamics_already_set:
raise RuntimeError("Dynamics were already set.")
Expand All @@ -273,32 +286,21 @@ def set_time_varying_affine_dynamics(self, pwa_system: Sequence[PwaRegion]) -> N
nsa = ns + na
n_ineq = pwa_system[0].T.size

# parameters defining time-varying dynamics
A = [self.parameter(f"A[{k}]", (ns, ns)) for k in range(N)]
B = [self.parameter(f"B[{k}]", (ns, na)) for k in range(N)]
c = [self.parameter(f"c[{k}]", (ns, 1)) for k in range(N)]
S = [self.parameter(f"S[{k}]", (n_ineq, nsa)) for k in range(N)]
T = [self.parameter(f"T[{k}]", (n_ineq, 1)) for k in range(N)]

X = cs.vcat(self._states.values())
U = cs.vcat(self._actions_exp.values())
prefix = self.tva_dynamics_name
As = cs.vertsplit_n(self.parameter(_n("A", prefix), (ns * N, ns)), N)
Bs = cs.vertsplit_n(self.parameter(_n("B", prefix), (ns * N, na)), N)
Cs = cs.vertsplit_n(self.parameter(_n("c", prefix), (ns * N, 1)), N)
Ss = cs.vertsplit_n(self.parameter(_n("S", prefix), (n_ineq * N, nsa)), N)
T = self.parameter(_n("T", prefix), (n_ineq * N, 1))

# set dynamics constraints and region constraints
if not self._is_multishooting:
if self._is_multishooting:
X, U = self._set_multishooting_tva_dynamics(As, Bs, Cs)
else:
raise NotImplementedError("Single shooting not implemented yet.")
xs_next = []
for k in range(N):
x_next = A[k] @ X[:, k] + B[k] @ U[:, k] + c[k]
xs_next.append(x_next)
self.constraint(
"region",
cs.diagcat(*S)
@ cs.vertcat(*[cs.vertcat(X[:, k], U[:, k]) for k in range(N)])
- cs.vertcat(*T),
"<=",
0,
)
self.constraint("dyn", cs.hcat(xs_next), "==", X[:, 1:])

S = cs.dcat(Ss)
XU = cs.vec(cs.vertcat(X[:, :-1], U)) # NOTE: different from vvcat!
self.constraint("region", S @ XU - T, "<=", 0)

self._pwa_system = pwa_system
self._dynamics_already_set = True
Expand Down Expand Up @@ -369,12 +371,23 @@ def solve(

if pars is None:
pars = {}
for k, idx in enumerate(self._sequence):
pars[_n("A", k)] = regions[idx].A
pars[_n("B", k)] = regions[idx].B
pars[_n("c", k)] = regions[idx].c
pars[_n("S", k)] = regions[idx].S
pars[_n("T", k)] = regions[idx].T
As = []
Bs = []
Cs = []
Ss = []
Ts = []
for idx in self._sequence:
As.append(regions[idx].A)
Bs.append(regions[idx].B)
Cs.append(regions[idx].c)
Ss.append(regions[idx].S)
Ts.append(regions[idx].T)
prefix = self.tva_dynamics_name
pars[_n("A", prefix)] = np.concatenate(As, 0)
pars[_n("B", prefix)] = np.concatenate(Bs, 0)
pars[_n("c", prefix)] = np.concatenate(Cs, 0)
pars[_n("S", prefix)] = np.concatenate(Ss, 0)
pars[_n("T", prefix)] = np.concatenate(Ts, 0)
return self.nlp.solve(pars, vals0)

@staticmethod
Expand Down Expand Up @@ -491,3 +504,17 @@ def _set_pwa_dynamics(
self.constraint("region", cs.vcat(region), "<=", 0)
self.constraint("z_x_ub", cs.vcat(z_x_ub), "<=", 0)
self.constraint("z_x_lb", cs.vcat(z_x_lb), ">=", 0)

def _set_multishooting_tva_dynamics(
self, As: Iterable[SymType], Bs: Iterable[SymType], Cs: Iterable[SymType]
) -> tuple[SymType, SymType]:
"""Internal utility to create time-varying affine dynamics constraints in
mutple shooting."""
X = cs.vcat(self._states.values())
U = cs.vcat(self._actions_exp.values())
X_next_ = []
for k, (A, B, C) in enumerate(zip(As, Bs, Cs)):
X_next_.append(A @ X[:, k] + B @ U[:, k] + C)
X_next = cs.hcat(X_next_)
self.constraint("dynamics", X_next, "==", X[:, 1:])
return X, U
63 changes: 59 additions & 4 deletions tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,66 @@ def test__pwa_mpc(self, shooting: str):
),
"delta": np.asarray([[1.0, 1.0], [0.0, 0.0]]),
}
actual = {
"u": sol.vals["u"].full(),
"x": sol.value(x) if shooting == "single" else sol.value(x).full(),
"delta": sol.vals["delta"].full(),
actual = {"u": sol.vals["u"], "x": sol.value(x), "delta": sol.vals["delta"]}
for name, val in expected.items():
np.testing.assert_allclose(actual[name], val, *tols, err_msg=name)

@parameterized.expand([("multi",)])
def test__pwa_mpc__with_sequence(self, shooting: str):
np_random = np.random.default_rng(42)

tau, k1, k2, d, m = 0.5, 10, 1, 4, 10
A1 = np.array([[1, tau], [-((tau * 2 * k1) / m), 1 - (tau * d) / m]])
A2 = np.array([[1, tau], [-((tau * 2 * k2) / m), 1 - (tau * d) / m]])
B1 = B2 = np.array([[0], [tau / m]])
C1, C2 = np_random.normal(scale=0.01, size=(2, A1.shape[0]))
S1 = np.array([[1, 0, 0]])
S2 = -S1
T1, T2 = np_random.normal(scale=0.01, size=(2, S1.shape[0]))
x_bnd = (5, 5)
u_bnd = 20
pwa_regions = (
wrappers.PwaRegion(A1, B1, C1, S1, T1),
wrappers.PwaRegion(A2, B2, C2, S2, T2),
)
D1 = np.array([[1, 0], [-1, 0], [0, 1], [0, -1]])
E1 = np.array([x_bnd[0], x_bnd[0], x_bnd[1], x_bnd[1]])
D2 = np.array([[1], [-1]])
E2 = np.array([u_bnd, u_bnd])
mpc = wrappers.PwaMpc(
nlp=Nlp[cs.SX](sym_type="SX"), prediction_horizon=2, shooting=shooting
)
x, _ = mpc.state("x", 2)
u, _ = mpc.action("u")
mpc.set_time_varying_affine_dynamics(pwa_regions)
if shooting == "single":
x = mpc.states["x"] # previous `x` is None if in single shooting
mpc.constraint("state_constraints", D1 @ x - E1, "<=", 0)
mpc.constraint("input_constraints", D2 @ u - E2, "<=", 0)
mpc.minimize(cs.sumsqr(x) + cs.sumsqr(u))
mpc.init_solver(
{
"print_time": False,
"print_iter": False,
"print_info": False,
"print_header": False,
},
"qrqp",
)
mpc.set_switching_sequence([0, 0])
sol = mpc.solve(pars={"x_0": [-3, 0]})

tols = (1e-6, 1e-6)
expected = {
"u": np.asarray([[-3.751224743945753, -4.563681191310823]]),
"x": np.asarray(
[
[-3.0, -2.9969528092116566, -1.5928861678523183],
[0.0, 2.80203890175142, 5.000000009994731],
]
),
}
actual = {"u": sol.vals["u"], "x": sol.value(x)}
for name, val in expected.items():
np.testing.assert_allclose(actual[name], val, *tols, err_msg=name)

Expand Down

0 comments on commit 26fecf0

Please sign in to comment.