# Source code for autompc.tuning.pipeline_tuner


# Standard library includes
from collections import namedtuple
import pickle

# Internal project includes
from .. import zeros
from ..utils import simulate
from ..evaluation import HoldoutModelEvaluator
from .model_tuner import ModelTuner
from ..sysid import MLPFactory, SINDyFactory, ApproximateGPModelFactory, ARXFactory, KoopmanFactory

# External library includes
import numpy as np
from smac.scenario.scenario import Scenario
from smac.facade.smac_hpo_facade import SMAC4HPO


# Immutable record of a pipeline tuning run. The field order below is part
# of the tuple's positional interface, so it must not be rearranged.
PipelineTuneResult = namedtuple(
    "PipelineTuneResult",
    (
        "inc_cfg",
        "cfgs",
        "inc_cfgs",
        "costs",
        "inc_costs",
        "truedyn_costs",
        "inc_truedyn_costs",
        "surr_trajs",
        "truedyn_trajs",
        "surr_tune_result",
    ),
)
"""
PipelineTuneResult contains information about a tuning process.

.. py:attribute:: inc_cfg
    
    The final tuned configuration.

.. py:attribute:: cfgs

    List of configurations. The configuration evaluated at each
    tuning iteration.

.. py:attribute:: inc_cfgs

    The incumbent (best found so far) configuration at each
    tuning iteration.

.. py:attribute:: costs

    The cost observed at each tuning iteration.

.. py:attribute:: inc_costs

    The incumbent score at each tuning iteration.

.. py:attribute:: truedyn_costs

    The true dynamics cost observed at each tuning iteration.
    Empty list if the true dynamics were not provided.

.. py:attribute:: inc_truedyn_costs

    The incumbent true dynamics cost at each tuning iteration.
    Empty list if the true dynamics were not provided.

.. py:attribute:: surr_trajs

    The trajectory simulated at each tuning iteration with respect to the
    surrogate model.

.. py:attribute:: truedyn_trajs

    The trajectory simulated at each tuning iteration with respect to the
    true dynamics. Empty list if the true dynamics were not provided.

.. py:attribute:: surr_tune_result

    The ModelTuneResult from tuning the surrogate model, for modes "autotune"
    and "autoselect".  None for other tuning modes.

"""

# Model factory classes offered to the model tuner, in this order, when the
# surrogate mode is "autoselect"; each is instantiated with the pipeline's
# system before being registered.
autoselect_factories = [
    MLPFactory,
    SINDyFactory,
    ApproximateGPModelFactory,
    ARXFactory,
    KoopmanFactory,
]

class PipelineTuner:
    """
    This class tunes SysID+MPC pipelines.
    """

    def __init__(self, surrogate_mode="defaultcfg", surrogate_factory=None,
            surrogate_split=None, surrogate_cfg=None, surrogate_evaluator=None,
            surrogate_tune_holdout=0.25, surrogate_tune_metric="rmse"):
        """
        Parameters
        ----------
        surrogate_mode : string
            Mode for selecting surrogate model. One of the following:
            "defaultcfg" - use the surrogate factories default configuration,
            "fixedcfg" - use the surrogate configuration passed by surrogate_cfg,
            "autotune" - automatically tune hyperparameters of surrogate,
            "autoselect" - automatically tune and select surrogate model,
            "pretrain" - Use an already trained surrogate which is passed when
            the tuning process is run.

        surrogate_factory : ModelFactory
            Factory for creating surrogate model. Required for all modes
            except for autoselect.

        surrogate_split : float
            Proportion of data to use for surrogate training. Required for
            all modes except "pretrain"

        surrogate_cfg : Configuration
            Surrogate model config, required for "fixedcfg" mode

        surrogate_evaluator : ModelEvaluator
            Evaluator to use for surrogate tuning, used for "autoselect"
            and "autotune" modes. If not passed, will use HoldoutEvaluator
            with default arguments.

        surrogate_tune_holdout : float
            Proportion of data to hold out for surrogate tuning. Used for
            "autotune" and "autoselect" modes.

        surrogate_tune_metric : string
            Model metric to use for surrogate tuning. Used for "autotune"
            and "autoselect" modes. See documentation of ModelEvaluator
            for more details.
        """
        self.surrogate_mode = surrogate_mode
        self.surrogate_factory = surrogate_factory
        self.surrogate_split = surrogate_split
        self.surrogate_cfg = surrogate_cfg
        self.surrogate_evaluator = surrogate_evaluator
        self.surrogate_tune_holdout = surrogate_tune_holdout
        self.surrogate_tune_metric = surrogate_tune_metric

    def _make_surrogate_evaluator(self, pipeline, trajs, rng):
        """Return the user-supplied surrogate evaluator, or build a holdout one."""
        if self.surrogate_evaluator is not None:
            return self.surrogate_evaluator
        return HoldoutModelEvaluator(
            holdout_prop=self.surrogate_tune_holdout,
            metric=self.surrogate_tune_metric,
            system=pipeline.system,
            trajs=trajs,
            rng=rng)

    @staticmethod
    def _simulate_task(controller, task, **sim_kwargs):
        """Simulate the controller on the task, bounding steps if the task does."""
        if task.has_num_steps():
            sim_kwargs["max_steps"] = task.get_num_steps()
        return simulate(controller, task.get_init_obs(), task.term_cond,
                **sim_kwargs)

    @staticmethod
    def _rebuild_traj(system, obs, ctrls):
        """Rebuild a trajectory from the nested lists stored in the run history."""
        traj = zeros(system, len(obs))
        traj.obs[:] = obs
        traj.ctrls[:] = ctrls
        return traj

    def _get_surrogate(self, pipeline, trajs, rng, surrogate_tune_iters):
        """
        Build the surrogate model according to ``self.surrogate_mode``.

        Parameters
        ----------
        pipeline : Pipeline
            Pipeline being tuned; only its ``system`` attribute is used here.

        trajs : List of Trajectory
            Trajectories used to train (and, when tuning, evaluate) the
            surrogate.

        rng : numpy.random.Generator
            RNG passed to the evaluator and the model tuner.

        surrogate_tune_iters : int
            Number of model tuning iterations for "autotune" and
            "autoselect" modes.

        Returns
        -------
        surrogate : Model
            The surrogate model.

        surrogate_tune_result : ModelTuneResult
            Result of surrogate tuning, or None for "defaultcfg" and
            "fixedcfg" modes.

        Raises
        ------
        ValueError
            If the mode cannot be used to construct a surrogate (including
            "pretrain"), or if a required surrogate factory is missing.
        """
        mode = self.surrogate_mode
        if mode not in ("defaultcfg", "fixedcfg", "autotune", "autoselect"):
            # Previously this fell through to the return statement with
            # ``surrogate`` unbound and failed with an UnboundLocalError.
            raise ValueError(
                "Cannot construct a surrogate for surrogate_mode={!r}".format(mode))
        if mode != "autoselect" and self.surrogate_factory is None:
            raise ValueError(
                "surrogate_factory is required for surrogate_mode={!r}".format(mode))

        if mode == "defaultcfg":
            surrogate_cs = self.surrogate_factory.get_configuration_space()
            surrogate_cfg = surrogate_cs.get_default_configuration()
            return self.surrogate_factory(surrogate_cfg, trajs), None
        if mode == "fixedcfg":
            return self.surrogate_factory(self.surrogate_cfg, trajs), None

        # "autotune" and "autoselect" differ only in which factories are
        # offered to the model tuner.
        evaluator = self._make_surrogate_evaluator(pipeline, trajs, rng)
        model_tuner = ModelTuner(pipeline.system, evaluator)
        if mode == "autotune":
            model_tuner.add_model_factory(self.surrogate_factory)
        else:
            for factory in autoselect_factories:
                model_tuner.add_model_factory(factory(pipeline.system))
        surrogate, surrogate_tune_result = model_tuner.run(rng,
                n_iters=surrogate_tune_iters)
        return surrogate, surrogate_tune_result

    def run(self, pipeline, task, trajs, n_iters, rng, surrogate=None,
            truedyn=None, surrogate_tune_iters=100, special_debug=False):
        """
        Run tuning.

        Parameters
        ----------
        pipeline : Pipeline
            Pipeline to tune.

        task : Task
            Task specification to tune for

        trajs : List of Trajectory
            Trajectory training set.

        n_iters : int
            Number of tuning iterations

        rng : numpy.random.Generator
            RNG to use for tuning.

        surrogate : Model
            Surrogate model to use for tuning. Used for "pretrain" mode.

        truedyn : obs, ctrl -> obs
            True dynamics function. If passed, the true dynamics cost
            will be evaluated for each iteration in addition to the surrogate
            cost. However, this information will not be used for tuning.

        surrogate_tune_iters : int
            Number of iterations to use for surrogate tuning. Used for
            "autotune" and "autoselect" modes. Default is 100

        special_debug : bool
            If True, pickle the surrogate and every evaluated controller to
            hard-coded paths. The target directories must already exist.

        Returns
        -------
        controller : Controller
            Final tuned controller.

        tune_result : PipelineTuneResult
            Additional tuning information.

        Raises
        ------
        ValueError
            If no surrogate is passed and the tuner is in "pretrain" mode or
            has no ``surrogate_split``.
        """
        # Run surrogate training
        if surrogate is None:
            if self.surrogate_mode == "pretrain":
                raise ValueError(
                    'A trained surrogate must be passed for "pretrain" mode')
            if self.surrogate_split is None:
                raise ValueError(
                    "surrogate_split is required when no surrogate is passed")
            surr_size = int(self.surrogate_split * len(trajs))
            # Shuffle a shallow copy so the caller's list keeps its order.
            shuffled_trajs = trajs[:]
            rng.shuffle(shuffled_trajs)
            surr_train_trajs = shuffled_trajs[:surr_size]
            sysid_trajs = shuffled_trajs[surr_size:]
            # Either split may be empty; skip the diagnostic rather than
            # failing with an IndexError.
            if len(surr_train_trajs) > 0:
                print("Surr Traj Last: ", surr_train_trajs[-1][-1].obs)
            if len(sysid_trajs) > 0:
                print("Sysid Traj Last: ", sysid_trajs[-1][-1].obs)
            surrogate, surr_tune_result = self._get_surrogate(pipeline,
                    surr_train_trajs, rng, surrogate_tune_iters)
        else:
            sysid_trajs = trajs
            surr_tune_result = None

        if special_debug:
            with open("out/2021-09-26/surrogate.pkl", "wb") as f:
                pickle.dump(surrogate, f)

        # One-element list so the closure below can mutate the counter.
        eval_idx = [0]

        def eval_cfg(cfg):
            info = dict()
            controller, _, model = pipeline(cfg, task, sysid_trajs)
            if special_debug:
                with open("../../out/2021-05-17/con_{}.pkl".format(eval_idx[0]), "wb") as f:
                    pickle.dump(controller, f)
            eval_idx[0] += 1
            # Fetch the task cost before the try block so the true-dynamics
            # evaluation uses it even when the surrogate rollout fails.
            task_cost = task.get_cost()
            print("Simulating Surrogate Trajectory: ")
            try:
                controller.reset()
                surr_traj = self._simulate_task(controller, task,
                        sim_model=surrogate)
                surr_cost = task_cost(surr_traj)
                print("Surrogate Cost: ", surr_cost)
                print("Surrogate Final State: ", surr_traj[-1].obs)
                info["surr_cost"] = surr_cost
                info["surr_traj"] = (surr_traj.obs.tolist(), surr_traj.ctrls.tolist())
            except np.linalg.LinAlgError:
                # A numerically failed rollout is scored as infinitely bad.
                surr_cost = np.inf
                info["surr_cost"] = surr_cost
                info["surr_traj"] = None

            if truedyn is not None:
                print("Simulating True Dynamics Trajectory")
                # Rebuild the controller around the already-trained model.
                controller, _, _ = pipeline(cfg, task, sysid_trajs, model=model)
                controller.reset()
                truedyn_traj = self._simulate_task(controller, task,
                        dynamics=truedyn)
                truedyn_cost = task_cost(truedyn_traj)
                print("True Dynamics Cost: ", truedyn_cost)
                print("True Dynamics Final State: ", truedyn_traj[-1].obs)
                info["truedyn_cost"] = truedyn_cost
                info["truedyn_traj"] = (truedyn_traj.obs.tolist(),
                        truedyn_traj.ctrls.tolist())

            return surr_cost, info

        smac_rng = np.random.RandomState(seed=rng.integers(1 << 31))
        scenario = Scenario({"run_obj" : "quality",
                             "runcount-limit" : n_iters,
                             "cs" : pipeline.get_configuration_space(),
                             "deterministic" : "true",
                             "limit_resources" : False
                             })

        smac = SMAC4HPO(scenario=scenario, rng=smac_rng,
                tae_runner=eval_cfg)

        inc_cfg = smac.optimize()

        cfgs, inc_cfgs, costs, inc_costs = [], [], [], []
        truedyn_costs, inc_truedyn_costs = [], []
        surr_sim_trajs, truedyn_trajs = [], []
        inc_cost = float("inf")
        # Defined up front: if the first runs all cost inf, no incumbent is
        # recorded yet and this would otherwise be unbound below.
        inc_truedyn_cost = None

        for key, val in smac.runhistory.data.items():
            cfg = smac.runhistory.ids_config[key.config_id]
            run_info = val.additional_info
            has_truedyn = "truedyn_cost" in run_info
            if val.cost < inc_cost:
                inc_cost = val.cost
                if has_truedyn:
                    inc_truedyn_cost = run_info["truedyn_cost"]
                inc_cfg = cfg
            inc_costs.append(inc_cost)
            inc_cfgs.append(inc_cfg)
            cfgs.append(cfg)
            costs.append(val.cost)

            if run_info["surr_traj"] is not None:
                surr_obs, surr_ctrls = run_info["surr_traj"]
                surr_sim_trajs.append(
                        self._rebuild_traj(pipeline.system, surr_obs, surr_ctrls))
            else:
                surr_sim_trajs.append(None)

            if has_truedyn:
                inc_truedyn_costs.append(inc_truedyn_cost)
                truedyn_costs.append(run_info["truedyn_cost"])
                truedyn_obs, truedyn_ctrls = run_info["truedyn_traj"]
                truedyn_trajs.append(
                        self._rebuild_traj(pipeline.system, truedyn_obs,
                            truedyn_ctrls))

        tune_result = PipelineTuneResult(inc_cfg = inc_cfg,
                cfgs = cfgs,
                inc_cfgs = inc_cfgs,
                costs = costs,
                inc_costs = inc_costs,
                truedyn_costs = truedyn_costs,
                inc_truedyn_costs = inc_truedyn_costs,
                surr_trajs = surr_sim_trajs,
                truedyn_trajs = truedyn_trajs,
                surr_tune_result = surr_tune_result)

        # Generate final model and controller
        controller, _, _ = pipeline(inc_cfg, task, sysid_trajs)

        return controller, tune_result