# Created by William Edwards (wre2@illinois.edu), 2021-01-25
# Standard library includes
import copy
from pdb import set_trace
# Internal library includes
from .utils.cs_utils import *
from .sysid.model import ModelFactory, Model
from .control.controller import Controller, ControllerFactory
from .costs.cost import Cost
from .costs.cost_factory import CostFactory
# External library includes
import numpy as np
import ConfigSpace as CS
import ConfigSpace.hyperparameters as CSH
import ConfigSpace.conditions as CSC
class Pipeline:
    """
    The Pipeline class represents a collection of MPC components, including
    the model, controller, and cost function. A Pipeline can provide
    the joint configuration space over its constituent components, and
    can instantiate an MPC given a configuration.
    """

    def __init__(self, system, *components):
        """
        Parameters
        ----------
        system : System
            Corresponding robot system
        components : List of models, controllers, costs and corresponding factories.
            The set of components which make up the pipeline: the model, the
            controller, and the cost. For each of these components, you can
            either pass the factory or the instantiated version. For example,
            you must pass either a Controller or a ControllerFactory, but not
            both. If the factory is passed, then its hyperparameters will
            become part of the joint configuration space. If the instantiated
            version is passed, the component will be treated as fixed in the
            pipeline. Arguments that are not an instance of any of the six
            recognized types are ignored.

        Raises
        ------
        ValueError
            If more than one model/model factory, controller/controller
            factory, or cost/cost factory is passed, or if any of the three
            component families is missing entirely.
        """
        self.system = system
        self.model = None
        self.model_factory = None
        self.controller = None
        self.controller_factory = None
        self.cost = None
        self.cost_factory = None

        for component in components:
            # The checks are deliberately independent (no elif): an object
            # that is an instance of several recognized types is tested
            # against each of them in turn.
            if isinstance(component, Model):
                self._register_component("model", component, False)
            if isinstance(component, ModelFactory):
                self._register_component("model", component, True)
            if isinstance(component, Controller):
                self._register_component("controller", component, False)
            if isinstance(component, ControllerFactory):
                self._register_component("controller", component, True)
            if isinstance(component, Cost):
                self._register_component("cost", component, False)
            if isinstance(component, CostFactory):
                self._register_component("cost", component, True)

        # Presence is tested with ``is None`` rather than truthiness so that
        # a component which happens to evaluate falsy still counts.
        if self.model is None and self.model_factory is None:
            raise ValueError("Pipeline must contain model or model factory")
        if self.controller is None and self.controller_factory is None:
            raise ValueError("Pipeline must contain controller or controller factory")
        if self.cost is None and self.cost_factory is None:
            raise ValueError("Pipeline must contain cost or cost factory")

    def _register_component(self, family, component, is_factory):
        """
        Store ``component`` in the slot for ``family``, rejecting duplicates.

        Parameters
        ----------
        family : str
            One of "model", "controller" or "cost"; names the instance
            attribute, and with the suffix "_factory" the factory attribute.
        component : object
            The instantiated component or factory to store.
        is_factory : bool
            True to store into the factory attribute, False to store into
            the instance attribute.

        Raises
        ------
        ValueError
            If either the instance or the factory slot of ``family`` is
            already occupied.
        """
        factory_attr = family + "_factory"
        already_set = (getattr(self, family) is not None
                       or getattr(self, factory_attr) is not None)
        if already_set:
            raise ValueError(
                "Pipeline cannot contain multiple {0}s or {0} factories."
                .format(family))
        setattr(self, factory_attr if is_factory else family, component)

    def get_configuration_space(self):
        """
        Return the pipeline configuration space.

        Only components supplied as factories contribute: each factory's
        configuration space is passed to ``add_configuration_space`` together
        with its label ("_model", "_ctrlr" or "_cost"). Fixed components
        contribute nothing.
        """
        cs = CS.ConfigurationSpace()
        labelled_factories = (
            ("_model", self.model_factory),
            ("_ctrlr", self.controller_factory),
            ("_cost", self.cost_factory),
        )
        for label, factory in labelled_factories:
            if factory is not None:
                add_configuration_space(cs, label,
                                        factory.get_configuration_space())
        return cs

    def _component_configuration(self, factory, cfg, label):
        """
        Build the configuration handed to ``factory`` from the joint ``cfg``.

        Starts from the default configuration of the factory's own space and
        passes it to ``set_subspace_configuration`` with ``label``.
        NOTE(review): that helper presumably copies the values stored under
        ``label`` in ``cfg`` into the sub-configuration in place -- confirm
        against utils.cs_utils.
        """
        sub_cs = factory.get_configuration_space()
        sub_cfg = sub_cs.get_default_configuration()
        set_subspace_configuration(cfg, label, sub_cfg)
        return sub_cfg

    def __call__(self, cfg, task, trajs, model=None):
        """
        Instantiate the MPC.

        Parameters
        ----------
        cfg : Configuration
            Configuration from the joint pipeline ConfigurationSpace
        task : Task
            Task which the MPC will solve. It is deep-copied; the passed
            object is not modified by this method.
        trajs : List of Trajectory
            System ID training set
        model : Model
            A pre-trained model can be passed here which overrides
            the configuration. Default is None.

        Returns
        -------
        controller : Controller
            The MPC controller
        task : Task
            The task with the instantiated cost
        model : Model
            The instantiated and trained model
        """
        # First instantiate and train the model, unless the caller supplied
        # one or the pipeline holds a fixed model.
        if model is None:
            if self.model is not None:
                model = self.model
            else:
                model_cfg = self._component_configuration(
                    self.model_factory, cfg, "_model")
                model = self.model_factory(model_cfg, trajs)

        # Then create the objective function
        if self.cost is not None:
            cost = self.cost
        else:
            cost_cfg = self._component_configuration(
                self.cost_factory, cfg, "_cost")
            cost = self.cost_factory(cost_cfg, task, trajs)

        # Attach the cost to a copy so the caller's task is left untouched.
        new_task = copy.deepcopy(task)
        new_task.set_cost(cost)

        # Then initialize the controller
        if self.controller is not None:
            controller = self.controller
        else:
            controller_cfg = self._component_configuration(
                self.controller_factory, cfg, "_ctrlr")
            controller = self.controller_factory(controller_cfg, new_task, model)

        return controller, new_task, model