Source code for autompc.control.lqr

# Created by William Edwards (wre2@illinois.edu)

from pdb import set_trace

import numpy as np
import numpy.linalg as la

from ConfigSpace import ConfigurationSpace
from ConfigSpace.hyperparameters import (UniformIntegerHyperparameter, 
        CategoricalHyperparameter)
import ConfigSpace.conditions as CSC

from .controller import Controller, ControllerFactory

def _dynamic_ricatti_equation(A, B, Q, R, N, Pk):
    return (A.T @ Pk @ A 
            - (A.T @ Pk @ B + N) 
              @ la.inv(R + B.T @ Pk @ B)
              @ (B.T @ Pk @ A + N.T)
            + Q)

def _inf_horz_dt_lqr(A, B, Q, R, N, threshold=1e-3):
    P1 = Q
    P2 = _dynamic_ricatti_equation(A, B, Q, R, N, P1)
    Pdiff = np.abs(P1 - P2)
    while Pdiff.max() > threshold:
        P1 = P2
        P2 = _dynamic_ricatti_equation(A, B, Q, R, N, P1)
        Pdiff = np.abs(P1 - P2)

    K = -la.inv(R + B.T @ P2 @ B) @ B.T @ P2 @ A

    return K

def _finite_horz_dt_lqr(A, B, Q, R, N, F, horizon):
    P1 = F
    P2 = _dynamic_ricatti_equation(A, B, Q, R, N, P1)
    for _ in range(horizon):
        P1 = P2
        P2 = _dynamic_ricatti_equation(A, B, Q, R, N, P1)
        Pdiff = np.abs(P1 - P2)

    K = -la.inv(R + B.T @ P2 @ B) @ B.T @ P2 @ A
    print("P2=")
    print(P2)

    return K

#class InfiniteHorizonLQR(Controller):
#    def __init__(self, system, model, Q, R):
#        if not model.is_linear:
#            raise ValueError("Linear model required.")
#        super().__init__(system, model)
#        A, B, state_func, cost_func = model.to_linear()
#        Qp, Rp = cost_func(Q, R)
#        N = np.zeros((A.shape[0], B.shape[1]))
#        self.K = _inf_horz_dt_lqr(A, B, Qp, Rp, N)
#        self.state_func = state_func
#        self.Qp, self.Rp = Qp, Rp
#
#    def run(self, traj, latent=None):
#        # Implement control logic here
#        u = self.K @ self.state_func(traj)
#
#        return u, None

# class LQRFactory(ControllerFactory):
#     """
#     Docs
# 
#     Hyperparameters:
#     
#     - *horizon_type* (Type: str, Choices: ["finite", "infinite"], Default: "finite"): Whether horizon is finite or infinite.
#     - *horizon* (Type: int, Low: 1, High: 1000, Default: 10): Length of control horizon. (Conditioned on horizon_type="finite").
#     """
#     def __init__(self, *args, **kwargs):
#         super().__init__(*args, **kwargs)
#         self.name  = "InfiniteHorizonLQR"
# 
#     def get_configuration_space(self):
#         cs = ConfigurationSpace()
#         horizon_type = CategoricalHyperparameter("horizon_type", choices=["finite", "infinite"], default="finite")
#         horizon = UniformIntegerHyperparameter(name="horizon_cond",
#                 lower=1, upper=1000, default_value=10)
#         horizon_cond = InCondition(child=horizon, parent=horizon_type, values=["finite"])
#         cs.add_hyperparameters([horizon, horizon_type])
#         cs.add_condition(horizon_cond)
#         return cs
# 
#     def __call__(self, cfg, task, model):
#         if cfg["horizon_type"] == "finite":
#             controller = FiniteHorizonLQR(self.system, task, model, horizon = cfg["horizon"])
#         else:
#             controller = InfiniteHorizonLQR(self.system, task, model)

class InfiniteHorizonLQR(Controller):
    def __init__(self, system, task, model):
        super().__init__(system, task, model)
        A, B = model.to_linear()
        state_dim = model.state_dim
        Q, R, F = task.get_cost().get_cost_matrices()
        Qp = np.zeros((state_dim, state_dim))
        Qp[:Q.shape[0], :Q.shape[1]] = Q
        X, L, K = dare(A, B, Qp, R)
        self.K = K
        self.Qp, self.Rp = Qp, R
        self.model = model

    @property
    def state_dim(self):
        return self.model.state_dim + self.system.ctrl_dim

    @staticmethod
    def is_compatible(system, task, model):
        return (model.is_linear 
                and task.is_cost_quad()
                and not task.are_obs_bounded()
                and not task.are_ctrl_bounded()
                and not task.eq_cons_present()
                and not task.ineq_cons_present())
 
    def traj_to_state(self, traj):
        return np.concatenate([self.model.traj_to_state(traj),
                traj[-1].ctrl])

    def run(self, state, new_obs):
        # Implement control logic here
        modelstate = self.model.update_state(state[:-self.system.ctrl_dim],
                state[-self.system.ctrl_dim:], new_obs)
        u = np.array(self.K @ modelstate).flatten()
        print("state={}".format(state))
        print("u={}".format(u))
        print("state_cost={}".format(modelstate.T @ self.Qp @ modelstate))
        statenew = np.concatenate([modelstate, u])

        return u, statenew

class FiniteHorizonLQR(Controller):
    def __init__(self, system, task, model, horizon):
        super().__init__(system, task, model)
        A, B = model.to_linear()
        N = np.zeros((A.shape[0], B.shape[1]))
        self.horizon = horizon
        state_dim = model.state_dim
        #Q, R, F = task.get_quad_cost()
        Q, R, F = task.get_cost().get_cost_matrices()
        Qp = np.zeros((state_dim, state_dim))
        Qp[:Q.shape[0], :Q.shape[1]] = Q
        Fp = np.zeros((state_dim, state_dim))
        Fp[:F.shape[0], :F.shape[1]] = F
        self.K = _finite_horz_dt_lqr(A, B, Qp, R, N, Fp, horizon)
        self.Qp, self.Rp = Qp, R
        self.model = model
        self.umin = task.get_ctrl_bounds()[:,0]
        self.umax = task.get_ctrl_bounds()[:,1]

    @property
    def state_dim(self):
        return self.model.state_dim + self.system.ctrl_dim

    @staticmethod
    def is_compatible(system, task, model):
        return (model.is_linear 
                and task.is_cost_quad()
                and not task.are_obs_bounded()
                #and not task.are_ctrl_bounded()
                and not task.eq_cons_present()
                and not task.ineq_cons_present())
 
    def traj_to_state(self, traj):
        return np.concatenate([self.model.traj_to_state(traj),
                traj[-1].ctrl])

    def run(self, state, new_obs):
        # Implement control logic here
        modelstate = self.model.update_state(state[:-self.system.ctrl_dim],
                state[-self.system.ctrl_dim:], new_obs)
        x0 = self.task.get_cost().get_goal()
        if x0.size < modelstate.size:
            state0 = np.zeros(modelstate.size)
            state0[:x0.size] = x0
        else:
            state0 = x0
        u = self.K @ (modelstate - state0)
        u = np.minimum(u, self.umax)
        u = np.maximum(u, self.umin)
        #print("state={}".format(state))
        #print("u={}".format(u))
        #print("state_cost={}".format(modelstate.T @ self.Qp @ modelstate))
        statenew = np.concatenate([modelstate, u])

        return u, statenew

[docs]class LQRFactory(ControllerFactory): """ Linear Quadratic Regulator (LQR) is some classical results from linear system theory and optimal control theory. It applies to linear system with quadratic cost function with respect to both state and control. It is proven that the optimal control policy is linear, i.e. :math:`u=-Kx` where :math:`x` is system state, :math:`K` is gain matrix, and :math:`u` is the control. The feedback :math:`K` is computed by solving Ricatti equations. For more details refer to these slides_ . .. _slides: https://katefvision.github.io/katefSlides/RECITATIONtrajectoryoptimization_katef.pdf Hyperparameters: - *finite_horizon* (Type: str, Choices: ["true", "false"], Default: "finite"): Whether horizon is finite or infinite. - *horizon* (Type: int, Low: 1, High: 1000, Default: 10): Length of control horizon. (Conditioned on finite_horizon="true"). """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.Controller = LQR self.name = "LQR" def get_configuration_space(self): cs = ConfigurationSpace() finite_horizon = CategoricalHyperparameter(name="finite_horizon", choices=["true", "false"], default_value="true") horizon = UniformIntegerHyperparameter(name="horizon", lower=1, upper=1000, default_value=10) use_horizon = CSC.InCondition(child=horizon, parent=finite_horizon, values=["true"]) cs.add_hyperparameters([horizon, finite_horizon]) cs.add_condition(use_horizon) return cs
class LQR(Controller): def __init__(self, system, task, model, finite_horizon, horizon=None): super().__init__(system, task, model) if not isinstance(finite_horizon, bool): finite_horizon = True if finite_horizon == "true" else False if finite_horizon: self._controller = FiniteHorizonLQR(system, task, model, horizon) else: self._controller = InfiniteHorizonLQR(system, task, model) @property def state_dim(self): return self._controller.state_dim @staticmethod def is_compatible(system, task, model): return (model.is_linear and task.is_cost_quad() and not task.are_obs_bounded() #and not task.are_ctrl_bounded() and not task.eq_cons_present() and not task.ineq_cons_present()) def traj_to_state(self, traj): return self._controller.traj_to_state(traj) def run(self, state, new_obs): return self._controller.run(state, new_obs)