From 26fecf078656188f5703ae57ce03d5ee34f2d90d Mon Sep 17 00:00:00 2001 From: Filippo Airaldi Date: Fri, 15 Nov 2024 16:39:53 +0100 Subject: [PATCH] refactored set_time_varying_affine_dynamics in multiple shooting --- src/csnlp/wrappers/mpc/pwa_mpc.py | 101 +++++++++++++++++++----------- tests/test_examples.py | 63 +++++++++++++++++-- 2 files changed, 123 insertions(+), 41 deletions(-) diff --git a/src/csnlp/wrappers/mpc/pwa_mpc.py b/src/csnlp/wrappers/mpc/pwa_mpc.py index c57b9fe..b0280a9 100644 --- a/src/csnlp/wrappers/mpc/pwa_mpc.py +++ b/src/csnlp/wrappers/mpc/pwa_mpc.py @@ -15,9 +15,9 @@ SymType = TypeVar("SymType", cs.SX, cs.MX) -def _n(parname: str, index: int) -> str: - """Internal utility for the naming convention of the ``i``-th region parameters.""" - return f"{parname}__{index}" +def _n(parname: str, prefix: str) -> str: + """Internal utility for the naming convention of the TVA dynamics's parameters.""" + return f"{parname}__{prefix}" @dataclass @@ -102,6 +102,8 @@ class PwaMpc(Mpc[SymType]): Raises if the shooting method is invalid; or if any of the horizons are invalid; or if the number of scenarios is not a positive integer.""" + tva_dynamics_name = "tva_dyn" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._fixed_sequence_dynamics = False @@ -244,9 +246,21 @@ def set_pwa_dynamics( self._dynamics_already_set = True def set_time_varying_affine_dynamics(self, pwa_system: Sequence[PwaRegion]) -> None: - r"""Sets the time-varying affine dynamics of the system for the MPC controller. - The possible values taken by the affine dynamics are defined by the sequence of - :class:`PwaRegion`. + r"""Sets time-varying affine dynamics as the controller's prediction model and + creates the corresponding dynamics constraints. The dynamics in the affine + time-varying form are described as + + .. math:: x_{k+1} = A_k x_k + B_k u_k + c_k. + + where :math:`x_k` and :math:`x_{k+1}` are the current and next state, + :math:`u_k` is the control action, :math:`w_k` is the disturbance, and + :math:`A_k, B_k, c_k` are the constant matrices of the region to be visited by + the state trajectory at time step `k`. By setting the dynamics with this method, + the user can then specify the switching sequence of regions to be active at each + time step along the prediction horizon via :meth:`set_switching_sequence`, and + solve a much simpler optimization problem, as the sequence is fixed. Instead, if + :meth:`set_pwa_dynamics` is used, the sequence of regions is optimized over via + a (usually expensive) mixed-integer optimization problem. Parameters ---------- @@ -261,7 +275,6 @@ def set_time_varying_affine_dynamics(self, pwa_system: Sequence[PwaRegion]) -> N ValueError Raises if the dimensions of any matrix in any region do not match the expected shape. - """ if self._dynamics_already_set: raise RuntimeError("Dynamics were already set.") @@ -273,32 +286,21 @@ def set_time_varying_affine_dynamics(self, pwa_system: Sequence[PwaRegion]) -> N nsa = ns + na n_ineq = pwa_system[0].T.size - # parameters defining time-varying dynamics - A = [self.parameter(f"A[{k}]", (ns, ns)) for k in range(N)] - B = [self.parameter(f"B[{k}]", (ns, na)) for k in range(N)] - c = [self.parameter(f"c[{k}]", (ns, 1)) for k in range(N)] - S = [self.parameter(f"S[{k}]", (n_ineq, nsa)) for k in range(N)] - T = [self.parameter(f"T[{k}]", (n_ineq, 1)) for k in range(N)] - - X = cs.vcat(self._states.values()) - U = cs.vcat(self._actions_exp.values()) + prefix = self.tva_dynamics_name + As = cs.vertsplit_n(self.parameter(_n("A", prefix), (ns * N, ns)), N) + Bs = cs.vertsplit_n(self.parameter(_n("B", prefix), (ns * N, na)), N) + Cs = cs.vertsplit_n(self.parameter(_n("c", prefix), (ns * N, 1)), N) + Ss = cs.vertsplit_n(self.parameter(_n("S", prefix), (n_ineq * N, nsa)), N) + T = self.parameter(_n("T", prefix), (n_ineq * N, 1)) - # set dynamics constraints and region constraints - if not self._is_multishooting: + if self._is_multishooting: + X, U = self._set_multishooting_tva_dynamics(As, Bs, Cs) + else: raise NotImplementedError("Single shooting not implemented yet.") - xs_next = [] - for k in range(N): - x_next = A[k] @ X[:, k] + B[k] @ U[:, k] + c[k] - xs_next.append(x_next) - self.constraint( - "region", - cs.diagcat(*S) - @ cs.vertcat(*[cs.vertcat(X[:, k], U[:, k]) for k in range(N)]) - - cs.vertcat(*T), - "<=", - 0, - ) - self.constraint("dyn", cs.hcat(xs_next), "==", X[:, 1:]) + + S = cs.dcat(Ss) + XU = cs.vec(cs.vertcat(X[:, :-1], U)) # NOTE: different from vvcat! + self.constraint("region", S @ XU - T, "<=", 0) self._pwa_system = pwa_system self._dynamics_already_set = True @@ -369,12 +371,23 @@ def solve( if pars is None: pars = {} - for k, idx in enumerate(self._sequence): - pars[_n("A", k)] = regions[idx].A - pars[_n("B", k)] = regions[idx].B - pars[_n("c", k)] = regions[idx].c - pars[_n("S", k)] = regions[idx].S - pars[_n("T", k)] = regions[idx].T + As = [] + Bs = [] + Cs = [] + Ss = [] + Ts = [] + for idx in self._sequence: + As.append(regions[idx].A) + Bs.append(regions[idx].B) + Cs.append(regions[idx].c) + Ss.append(regions[idx].S) + Ts.append(regions[idx].T) + prefix = self.tva_dynamics_name + pars[_n("A", prefix)] = np.concatenate(As, 0) + pars[_n("B", prefix)] = np.concatenate(Bs, 0) + pars[_n("c", prefix)] = np.concatenate(Cs, 0) + pars[_n("S", prefix)] = np.concatenate(Ss, 0) + pars[_n("T", prefix)] = np.concatenate(Ts, 0) return self.nlp.solve(pars, vals0) @staticmethod @@ -491,3 +504,17 @@ def _set_pwa_dynamics( self.constraint("region", cs.vcat(region), "<=", 0) self.constraint("z_x_ub", cs.vcat(z_x_ub), "<=", 0) self.constraint("z_x_lb", cs.vcat(z_x_lb), ">=", 0) + + def _set_multishooting_tva_dynamics( + self, As: Iterable[SymType], Bs: Iterable[SymType], Cs: Iterable[SymType] + ) -> tuple[SymType, SymType]: + """Internal utility to create time-varying affine dynamics constraints in + mutple shooting.""" + X = cs.vcat(self._states.values()) + U = cs.vcat(self._actions_exp.values()) + X_next_ = [] + for k, (A, B, C) in enumerate(zip(As, Bs, Cs)): + X_next_.append(A @ X[:, k] + B @ U[:, k] + C) + X_next = cs.hcat(X_next_) + self.constraint("dynamics", X_next, "==", X[:, 1:]) + return X, U diff --git a/tests/test_examples.py b/tests/test_examples.py index 771a20f..8bf135c 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -368,11 +368,66 @@ def test__pwa_mpc(self, shooting: str): ), "delta": np.asarray([[1.0, 1.0], [0.0, 0.0]]), } - actual = { - "u": sol.vals["u"].full(), - "x": sol.value(x) if shooting == "single" else sol.value(x).full(), - "delta": sol.vals["delta"].full(), + actual = {"u": sol.vals["u"], "x": sol.value(x), "delta": sol.vals["delta"]} + for name, val in expected.items(): + np.testing.assert_allclose(actual[name], val, *tols, err_msg=name) + + @parameterized.expand([("multi",)]) + def test__pwa_mpc__with_sequence(self, shooting: str): + np_random = np.random.default_rng(42) + + tau, k1, k2, d, m = 0.5, 10, 1, 4, 10 + A1 = np.array([[1, tau], [-((tau * 2 * k1) / m), 1 - (tau * d) / m]]) + A2 = np.array([[1, tau], [-((tau * 2 * k2) / m), 1 - (tau * d) / m]]) + B1 = B2 = np.array([[0], [tau / m]]) + C1, C2 = np_random.normal(scale=0.01, size=(2, A1.shape[0])) + S1 = np.array([[1, 0, 0]]) + S2 = -S1 + T1, T2 = np_random.normal(scale=0.01, size=(2, S1.shape[0])) + x_bnd = (5, 5) + u_bnd = 20 + pwa_regions = ( + wrappers.PwaRegion(A1, B1, C1, S1, T1), + wrappers.PwaRegion(A2, B2, C2, S2, T2), + ) + D1 = np.array([[1, 0], [-1, 0], [0, 1], [0, -1]]) + E1 = np.array([x_bnd[0], x_bnd[0], x_bnd[1], x_bnd[1]]) + D2 = np.array([[1], [-1]]) + E2 = np.array([u_bnd, u_bnd]) + mpc = wrappers.PwaMpc( + nlp=Nlp[cs.SX](sym_type="SX"), prediction_horizon=2, shooting=shooting + ) + x, _ = mpc.state("x", 2) + u, _ = mpc.action("u") + mpc.set_time_varying_affine_dynamics(pwa_regions) + if shooting == "single": + x = mpc.states["x"] # previous `x` is None if in single shooting + mpc.constraint("state_constraints", D1 @ x - E1, "<=", 0) + mpc.constraint("input_constraints", D2 @ u - E2, "<=", 0) + mpc.minimize(cs.sumsqr(x) + cs.sumsqr(u)) + mpc.init_solver( + { + "print_time": False, + "print_iter": False, + "print_info": False, + "print_header": False, + }, + "qrqp", + ) + mpc.set_switching_sequence([0, 0]) + sol = mpc.solve(pars={"x_0": [-3, 0]}) + + tols = (1e-6, 1e-6) + expected = { + "u": np.asarray([[-3.751224743945753, -4.563681191310823]]), + "x": np.asarray( + [ + [-3.0, -2.9969528092116566, -1.5928861678523183], + [0.0, 2.80203890175142, 5.000000009994731], + ] + ), } + actual = {"u": sol.vals["u"], "x": sol.value(x)} for name, val in expected.items(): np.testing.assert_allclose(actual[name], val, *tols, err_msg=name)