Source code for pyinduct.simulation

"""
This module consist of three parts.

    Simulation:
        Simulation infrastructure with helpers and data structures for preprocessing of the given equations
        and functions for postprocessing of simulation data.

    `Control`_:
        All classes and functions related to the creation of controllers as well as the implementation
        for simulation purposes.

    `Observer`_:
        Some objects for observer implementation which are mostly a combination from the objects for
        simulation and control tasks.
"""

import warnings
from abc import ABCMeta, abstractmethod
from itertools import chain

import numpy as np
from scipy.integrate import ode
from scipy.interpolate import interp1d
from scipy.linalg import block_diag
import collections

from pyinduct.core import domain_intersection, TransformationInfo, get_weight_transformation

from .core import (Function, integrate_function, calculate_scalar_product_matrix,
                   project_on_base, dot_product_l2)
from .placeholder import Scalars, TestFunction, Input, FieldVariable, EquationTerm, get_common_target
from .registry import get_base
from .visualization import EvalData

"""
Simulation section
"""


[docs]class Domain(object): """ Helper class that manages ranges for data evaluation, containing parameters. Args: bounds (tuple): Interval bounds. num (int): Number of points in interval. step (numbers.Number): Distance between points (if homogeneous). points (array_like): Points themselves. Note: If num and step are given, num will take precedence. """ def __init__(self, bounds=None, num=None, step=None, points=None): if points is not None: # points are given, easy one self._values = np.atleast_1d(points) self._limits = (points.min(), points.max()) self._num = points.size # TODO check for evenly spaced entries # for now just use provided information self._step = step elif bounds and num: self._limits = bounds self._num = num self._values, self._step = np.linspace(bounds[0], bounds[1], num, retstep=True) if step is not None and not np.isclose(self._step, step): raise ValueError("could not satisfy both redundant requirements for num and step!") elif bounds and step: self._limits = bounds # calculate number of needed points but save correct step size self._num = int((bounds[1] - bounds[0]) / step + 1.5) self._values, self._step = np.linspace(bounds[0], bounds[1], self._num, retstep=True) if np.abs(step - self._step) > 1e-1: warnings.warn("desired step-size {} doesn't fit to given interval," " changing to {}".format(step, self._step)) else: raise ValueError("not enough arguments provided!") def __len__(self): return len(self._values) def __getitem__(self, item): return self._values[item] @property def step(self): return self._step @property def bounds(self): return self._limits
[docs]class SimulationInput(object, metaclass=ABCMeta): """ Base class for all objects that want to act as an input for the time-step simulation. The calculated values for each time-step are stored in internal memory and can be accessed by :py:func:`get_results` (after the simulation is finished). """ def __init__(self, name=""): self._time_storage = [] self._value_storage = {} self.name = name def __call__(self, **kwargs): """ handle that is used by the simulator to retrieve input. """ out = self._calc_output(**kwargs) self._time_storage.append(kwargs["time"]) for key, value in out.items(): entries = self._value_storage.get(key, []) entries.append(value) self._value_storage[key] = entries return out["output"] @abstractmethod def _calc_output(self, **kwargs): """ Handle that has to be implemented for output calculation. Keyword Args: time: The current simulation time. weights: The current weight vector. weight_lbl: The label of the weights used. Returns: dict: Dictionary with mandatory key ``output``. """ return dict(output=0)
[docs] def get_results(self, time_steps, result_key="output", interpolation="nearest", as_eval_data=False): """ Return results from internal storage for given time steps. Raises: Error: If calling this method before a simulation was running. Args: time_steps: Time points where values are demanded. result_key: Type of values to be returned. interpolation: Interpolation method to use if demanded time-steps are not covered by the storage, see :func:`scipy.interpolate.interp1d` for all possibilities. as_eval_data (bool): Return results as :py:class:`pyinduct.visualization.EvalData` object for straightforward display. Return: Corresponding function values to the given time steps. """ func = interp1d(np.array(self._time_storage), np.array(self._value_storage[result_key]), kind=interpolation, assume_sorted=True, axis=0) values = func(time_steps) if as_eval_data: return EvalData([time_steps], values, name=".".join([self.name, result_key])) return values
class EmptyInput(SimulationInput): def __init__(self, dim): SimulationInput.__init__(self) self.dim = dim def _calc_output(self, **kwargs): return dict(output=np.zeros((len(np.atleast_1d(kwargs['time'])), self.dim)))
[docs]class SimulationInputSum(SimulationInput): """ Helper that represents a signal mixer. """ def __init__(self, inputs): SimulationInput.__init__(self) self.inputs = inputs def _calc_output(self, **kwargs): outs = np.array([handle(**kwargs) for handle in self.inputs]) return dict(output=np.sum(outs, axis=0))
[docs]class SimulationInputVector(SimulationInput): """ Class that represent the input vector :math:`\\boldsymbol{u}\\in\\mathbb R^n` from a state space system (:py:class:`StateSpace`) like .. math:: \\boldsymbol{\\dot{x}}(t) &= \\boldsymbol{A}\\boldsymbol{x}(t) + \\boldsymbol{B}\\boldsymbol{u}(t) \\\\ \\boldsymbol{y}(t) &= \\boldsymbol{C}\\boldsymbol{x}(t) + \\boldsymbol{D}\\boldsymbol{u}(t). Args: input_vector (list): List which holds (in sum) :math:`n` :py:class:`SimulationInput` and/or :py:class:`ObserverError` instances. """ def __init__(self, input_vector): SimulationInput.__init__(self) if not all([isinstance(input, SimulationInput) and not isinstance(input, SimulationInputVector) for input in input_vector]): raise TypeError("A SimulationInputVector can only hold SimulationInputs's and can not nest.") self.input_vector = input_vector self.len = len(input_vector) self.indices = np.arange(self.len) self._sys_inputs = collections.OrderedDict() self._obs_errors = collections.OrderedDict() for index in self.indices: if isinstance(input_vector[index], ObserverError): self._obs_errors[index] = input_vector[index] else: self._sys_inputs[index] = input_vector[index] def _calc_output(self, **kwargs): output = list() if "obs_weights" in kwargs.keys(): for obs_err in self._obs_errors.values(): output.append(obs_err(**kwargs)) else: for sys_input in self._sys_inputs.values(): output.append(sys_input(**kwargs)) return dict(output=np.matrix(np.vstack(output)))
[docs]class WeakFormulation(object): """ This class represents the weak formulation of a spatial problem. It can be initialized with several terms (see children of :py:class:`pyinduct.placeholder.EquationTerm`). The equation is interpreted as .. math:: term_0 + term_1 + ... + term_N = 0. Args: terms (list): List of object(s) of type EquationTerm. dynamic_weights (str): Only for multi-pde systems. Weights (weight label) which occur temporal derived. It is only one kind of weight labels (one pde) allowed in the weak formulation if :code:`dynamic_weights` will not provided. """ def __init__(self, terms, dynamic_weights=None, name=None): if isinstance(terms, EquationTerm): terms = [terms] if not isinstance(terms, list): raise TypeError("only (list of) {0} allowed".format(EquationTerm)) for term in terms: if not isinstance(term, EquationTerm): raise TypeError("Only EquationTerm(s) are accepted.") self.terms = terms self.dynamic_weights = dynamic_weights self.name = name
[docs]class StateSpace(object): """ Standard state space implementation for a dynamic system with .. math:: \\boldsymbol{\\dot{x}}(t) &= \\boldsymbol{A}\\boldsymbol{x}(t) + \\boldsymbol{B}u(t) \\\\ \\boldsymbol{y}(t) &= \\boldsymbol{C}\\boldsymbol{x}(t) + \\boldsymbol{D}u(t). The corresponding infinite dimensional system has been approximated by a base given by weight_label. Args: weight_label: Label that has been used for approximation. a_matrices: :math:`\\boldsymbol{A_p}, \\dotsc, \\boldsymbol{A_0},` b_matrices: :math:`\\boldsymbol{B_q}, \\dotsc, \\boldsymbol{B_0},` input_handle: :math:`u(t)` f_vector: c_matrix: :math:`\\boldsymbol{C}` d_matrix: :math:`\\boldsymbol{D}` family_tree (collections.OrderedDict or NoneType): Ordered dictionary which hold informations about nesting of the original weights. Is None if the state space derived from only one weak formulation. """ def __init__(self, weight_label, a_matrices, b_matrices, input_handle=None, f_vector=None, c_matrix=None, d_matrix=None, family_tree=None): self.weight_lbl = weight_label self._weight_num = None self.f = f_vector self.C = c_matrix self.D = d_matrix # mandatory if isinstance(a_matrices, np.ndarray): self.A = {1: a_matrices} else: self.A = a_matrices # optional # TODO change order: 1 to order that is guaranteed to be in. if isinstance(b_matrices, np.ndarray): self.B = {1: np.atleast_2d(b_matrices)} else: self.B = b_matrices if self.B is None: self.B = {1: np.zeros((self.A[1].shape[0], 1))} if self.f is None: self.f = np.zeros((self.A[1].shape[0],)) if self.C is None: self.C = np.zeros((1, self.A[1].shape[1])) if self.D is None: self.D = np.zeros((1, np.atleast_2d(self.B[1]).T.shape[1])) if input_handle is None: self.input = EmptyInput(self.B[1].shape[1]) else: self.input = input_handle if isinstance(self.input, SimulationInputVector): pass # TODO: replace this check # if not all([bi.shape[1] == self._input_function.num for bi in self.B.values()]): # raise ValueError("Input vector has more elements than (at least) one of the B matrices has rows.") elif isinstance(self.input, SimulationInput): pass # TODO: uncomment this check when StateSpace input derivative transformation is ready to go # if not all([1 in np.atleast_2d(bi).shape for bi in self.B.values()]): # raise ValueError("All B matrices must be column vectors.") elif not callable(self.input): raise TypeError("Input must be callable!") self.family_tree = family_tree def rhs_hint(self, _t, _q, _u, ss): q_t = ss.f for p, a_mat in ss.A.items(): # np.add(q_t, np.dot(a_mat, np.power(_q, p))) q_t = q_t + np.dot(a_mat, np.power(_q, p)) for p, b_mat in ss.B.items(): q_t = q_t + np.dot(b_mat, np.power(_u, p)).flatten() return np.asarray(q_t)
# TODO update signature
[docs]def simulate_systems(weak_forms, initial_states, time_interval, time_step, spatial_interval, spatial_step): """ Convenience wrapper for simulate system, see :py:func:`simulate_system` for parameters. Args: weak_forms (:py:class:`WeakFormulation`): initial_states: time_interval: time_step: spatial_interval: spatial_step: """ return [simulate_system(sys, initial_states, time_interval, time_step, spatial_interval, spatial_step) for sys in weak_forms]
[docs]def simulate_system(weak_form, initial_states, temporal_domain, spatial_domain, settings=None, der_orders=(0, 0)): """ Convenience wrapper that encapsulates the whole simulation process. Args: weak_form (:py:class:`WeakFormulation`): Weak formulation of the system to simulate. initial_states (callable or numpy.ndarray): Array of core.Functions for :math:`x(t=0, z), \\dot{x}(t=0, z), \\dotsc, x^{(n)}(t=0, z)`. temporal_domain (:py:class:`Domain`): Domain object holding information for time evaluation. spatial_domain (:py:class:`Domain`): Domain object holding information for spatial evaluation. der_orders (tuple): Tuple of derivative orders (time, spat) that shall be evaluated additionally. settings: Integrator settings, see :py:func:`simulate_state_space`. Return: list: List of :py:class:`pyinduct.visualization.EvalData` objects,holding the results for the FieldVariable and asked derivatives. """ print(("simulating system: {0}".format(weak_form.name))) if not isinstance(weak_form, WeakFormulation): raise TypeError("only WeakFormulation accepted.") initial_states = np.atleast_1d(initial_states) if not isinstance(initial_states[0], Function): raise TypeError("only core.Function accepted as initial state") if not isinstance(temporal_domain, Domain) or not isinstance(spatial_domain, Domain): raise TypeError("domains must be given as Domain object") # parse input and create state space system print(">>> parsing formulation") canonical_form = parse_weak_formulation(weak_form) print(">>> creating state space system") state_space_form = canonical_form.convert_to_state_space() # calculate initial state print(">>> deriving initial conditions") q0 = np.array([project_on_base(initial_state, get_base( canonical_form.weights, 0)) for initial_state in initial_states]).flatten() # simulate print(">>> performing time step integration") sim_domain, q = simulate_state_space(state_space_form, q0, temporal_domain, settings=settings) # evaluate print(">>> performing postprocessing") temporal_order = min(initial_states.size - 1, der_orders[0]) data = process_sim_data(canonical_form.weights, q, sim_domain, spatial_domain, temporal_order, der_orders[1], name=canonical_form.name) print("finished simulation.") return data
[docs]def process_sim_data(weight_lbl, q, temp_domain, spat_domain, temp_order, spat_order, name=""): """ Create handles and evaluate at given points. Args: weight_lbl (str): Label of Basis for reconstruction. temp_order: Order or temporal derivatives to evaluate additionally. spat_order: Order or spatial derivatives to evaluate additionally. q: weights spat_domain (:py:class:`Domain`): Domain object providing values for spatial evaluation. temp_domain (:py:class:`Domain`): Timesteps on which rows of q are given. name (str): Name of the WeakForm, used to generate the dataset. """ data = [] # temporal ini_funcs = get_base(weight_lbl, 0) for der_idx in range(temp_order + 1): name = "{0}{1}".format(name, "_" + "".join(["d" for x in range(der_idx)] + ["t"]) if der_idx > 0 else "") data.append(evaluate_approximation(weight_lbl, q[:, der_idx * ini_funcs.size:(der_idx + 1) * ini_funcs.size], temp_domain, spat_domain, name=name)) # spatial (0th derivative is skipped since this is already handled above) for der_idx in range(1, spat_order + 1): name = "{0}{1}".format(name, "_" + "".join(["d" for x in range(der_idx)] + ["z"]) if der_idx > 0 else "") data.append( evaluate_approximation(weight_lbl, q[:, :ini_funcs.size], temp_domain, spat_domain, der_idx, name=name)) return data
[docs]class CanonicalForm(object): """ The canonical form of an ordinary differential equation system of order n. """ def __init__(self, name=None): self.name = name self._matrices = {} self._max_order = dict(E=None, G=None) self._max_exponent = dict(E=None, G=None) self._weights = None self._input_function = None self._inverse_en_hash = None self._en_hash = None @staticmethod def _build_name(term): return "_" + term[0] + str(term[1]) @property def input_function(self): return self._input_function @input_function.setter def input_function(self, func): if not (func is None or isinstance(func, (SimulationInput))): raise TypeError("Input function must be a instance from SimulationInput.") if self._input_function is None: self._input_function = func if self._input_function != func: raise ValueError("Already defined input is overridden!") @property def weights(self): return self._weights @weights.setter def weights(self, weight_lbl): if not isinstance(weight_lbl, str) and not weight_lbl is None: raise TypeError("Only string allowed as weight label!") if self._weights is None: self._weights = weight_lbl if weight_lbl is not None: self._len_weights = len(get_base(self._weights, 0)) if self._weights != weight_lbl: raise ValueError("Already defined target weights are overridden!") @property def inverse_e_n(self): if self._max_order["E"] is None: warnings.warn("There is no E matrix in this canonical form.") return None else: en = self._matrices["E"][self._max_order["E"]][self._max_exponent["E"]] if self._en_hash is None or not np.allclose(en, self._en_hash): self._en_hash = en if en.shape[0] != en.shape[1]: raise warnings.warn("CanonicalForm holds rectangle matrix. Request for inverse unintended?") self._inverse_en_hash = en.I return self._inverse_en_hash else: return self._inverse_en_hash
[docs] def add_to(self, term, value, column=None): """ Adds the value :py:obj:`value` to term :py:obj:`term`. :py:obj:`term` is a dict that describes which coefficient matrix of the canonical form the value shall be added to. Args: term (dict): Targeted term in the canonical form h. It has to contain: - name: Type of the coefficient matrix: 'E', 'f', or 'G'. - order: Temporal derivative order of the assigned weights. - exponent: Exponent of the assigned weights. value (:py:obj:`numpy.ndarray`): Value to add. It is converted in numpy.matrix if it is not already a numpy.matrix instance. column (int): Add the value only to one column of term (useful if only one dimension of term is known). """ if not isinstance(value, np.ndarray): raise TypeError("Argument val must be numpy.ndarray.") elif not isinstance(value, np.matrix): value = np.matrix(value) if column and not isinstance(column, int): raise TypeError("Argument column (index) must be int.") # get entry if term["name"] == "f": if "order" in term or "exponent" in term: warnings.warn("Values to the keys order and exponent are ignored for f_vector!") f_vector = self._matrices.get("f", np.zeros_like(value)) self._matrices["f"] = value + f_vector return type_group = self._matrices.get(term["name"], {}) derivative_group = type_group.get(term["order"], {}) target_matrix = derivative_group.get(term["exponent"], np.zeros_like(value)) if target_matrix.shape != value.shape and column is None: raise ValueError("{0}{1}{2} was already initialized with dimensions {3} but value to add has " "dimension {4}".format(term["name"], term["order"], term["exponent"], target_matrix.shape, value.shape)) if column is not None: # check whether the dimensions fit or if the matrix has to be extended if column >= target_matrix.shape[1]: new_target_matrix = np.zeros((target_matrix.shape[0], column + 1)) new_target_matrix[:target_matrix.shape[0], :target_matrix.shape[1]] = target_matrix target_matrix = new_target_matrix target_matrix[:, column:column + 1] += value else: target_matrix += value # store changes derivative_group[term["exponent"]] = target_matrix type_group[term["order"]] = derivative_group self._matrices[term["name"]] = type_group # store greatest temporal derivative orders and exponents for "E" and "G" matrices if term["name"] in ("E", "G"): if self._max_order[term["name"]] is None or term["order"] > self._max_order[term["name"]]: self._max_order[term["name"]] = term["order"] if self._max_exponent[term["name"]] is None or term["exponent"] > self._max_exponent[term["name"]]: self._max_exponent[term["name"]] = term["exponent"]
[docs] def get_terms(self): """ Return all coefficient matrices of the canonical formulation. Return: Cascade of dictionaries: Structure: Type > Order > Exponent. """ return self._matrices
[docs] def convert_to_state_space(self): """ Convert the canonical ode system of order n a into an ode system of order 1. This will only work if the highest derivative of the given FieldVariable can be isolated! Return: :py:class:`StateSpace` object: """ if "f" in self._matrices: # TODO add functionality to StateSpace and allow f raise NotImplementedError # system matrices A_* # check whether the system can be formulated in an explicit form max_order = self._max_order["E"] if len(self._matrices["E"][max_order]) > 1: # more than one power of the highest derivative -> implicit formulation raise NotImplementedError pb = next(iter(self._matrices["E"][max_order])) if pb != 1: # TODO raise the resulting last blocks to 1/pb raise NotImplementedError e_n_pb_inv = self.inverse_e_n dim_x = e_n_pb_inv.shape[0] # length of the weight vector dim_xb = max_order * dim_x # dimension of the new system # get highest power # max_power = max(list(chain.from_iterable([list(mat) for mat in self._matrices["E"].values()]))) powers = set(chain.from_iterable([list(mat) for mat in self._matrices["E"].values()])) # system matrices A_* a_matrices = {} # for p in range(max_power, 0, -1): for p in powers: a_mat = np.zeros((dim_xb, dim_xb)) # add integrator chain a_mat[:-dim_x:, dim_x:] = block_diag(*[np.eye(dim_x) for a in range(max_order - 1)]) # add "block-line" with feedback entries a_mat[-dim_x:, :] = -self._build_feedback("E", p, e_n_pb_inv) a_matrices.update({p: a_mat}) # input matrices B_* if "G" in self._matrices: max_input_order = max(iter(self._matrices["G"])) # max_input_power = max(list(chain.from_iterable([list(mat) for mat in self._matrices["G"].values()]))) input_powers = set(chain.from_iterable([list(mat) for mat in self._matrices["G"].values()])) dim_u = next(iter(self._matrices["G"][max_input_order].values())).shape[1] dim_ub = (max_input_order + 1) * dim_u # dimension of the new systems input b_matrices = {} for q in input_powers: b_mat = np.zeros((dim_xb, dim_ub)) # overwrite the last "block-line" in the matrices with input entries b_mat[-dim_x:, :] = -self._build_feedback("G", q, e_n_pb_inv) b_matrices.update({q: b_mat}) else: b_matrices = None # the f vector f_mat = np.zeros((dim_xb,)) if "f" in self._matrices: f_mat[-dim_x:] = self._matrices["f"] ss = StateSpace(self.weights, a_matrices, b_matrices, input_handle=self.input_function) return ss
def _build_feedback(self, entry, power, product_mat): max_order = max(sorted(self._matrices[entry])) entry_shape = next(iter(self._matrices[entry][max_order].values())).shape if entry == "G": # include highest order for system input max_order += 1 blocks = (np.dot(product_mat, self._matrices[entry].get(order, {}).get(power, np.zeros(entry_shape))) for order in range(max_order)) return np.hstack(blocks)
[docs]class CanonicalForms(object): """ Wrapper that holds several entities of canonical forms for different sets of weights. """ def __init__(self, dynamic_weight_label): self.dynamic_form = CanonicalForm() self.dynamic_form.weights = dynamic_weight_label self.static_forms = dict() self._len_weights = None @property def len_weights(self): return self._len_weights @len_weights.setter def len_weights(self, length): if self._len_weights is None: self._len_weights = length else: if length != self._len_weights: raise ValueError("It is not allowed to change the length of the weights vector.")
[docs] def add_to(self, weight_label, term, val, column=None): """ Add val to the canonical form for weight_label, see :py:func:`CanonicalForm.add_to` for further information. Args: weight_label (str): Basis to add onto. term: Coefficient to add onto, see :py:func:`CanonicalForm.add_to`. val: Values to add. """ try: self.len_weights = val.shape[0] except ValueError: raise ValueError("All matrices in the static forms must have the same row width as the matrices in " "the dynamic form. Also all matrices in the dynamic form must have the same row " "width. Maybe you projected different terms of one pde with different test functions. " "Check your weak formulation!") if weight_label == self.dynamic_form.weights or self.dynamic_form.weights is None or weight_label is None: self.dynamic_form.add_to(term, val, column=column) else: if weight_label not in list(self.static_forms.keys()): self.static_forms[weight_label] = CanonicalForm(weight_label) elif not isinstance(weight_label, str): raise TypeError("Argument weight_label must provided as string.") self.static_forms[weight_label].add_to(term, val, column=column)
[docs] def get_dynamic_terms(self): """ Return: dict: Terms of the dynamic :py:class:`CanonicalForm`. """ return self.dynamic_form.get_terms()
[docs] def get_static_terms(self): """ Return: dict: Dictionary of terms for each static :py:class:`CanonicalForm`. """ return {label: val.get_terms() for label, val in self.static_forms.items()}
[docs]def convert_cfs_to_state_space(list_of_cfs): """ Create :py:class:`StateSpace` from list :code:`list_of_cfs` with elements from type :py:class:`CanonicalForms`. In the common case the :math:`N` list elements are derived from :math:`N` :py:class:`WeakFormulation`s which represent :math:`N` coupled pde's with boundary conditions. Args: list_of_cfs (list): List with elements from type :py:class:`CanonicalForms`. Returns: :py:class:`StateSpace`: State space approximation for the time dynamic of (basically) coupled pde's. """ value_error_string = "Problem formulation meets not the specification. \n\n" odict_info = collections.OrderedDict() # for label, order in [(cf.dynamic_form.weights, cf.dynamic_form._max_order["E"]) for cf in list_of_cfs]: for cfs in list_of_cfs: label = cfs.dynamic_form.weights order = cfs.dynamic_form._max_order["E"] if order is None or order <= 0: raise TypeError(value_error_string + "The dynamic_form of an CanonicalForms object must hold " "temporal derived weights.") if label in odict_info.keys(): raise ValueError("There are at least two CanonicalForms objects with the same dynamic weight label.") odict_info[label] = dict() odict_info[label]["max_order"] = order odict_info[label]["weights_length"] = cfs.len_weights odict_info[label]["state_space"] = cfs.dynamic_form.convert_to_state_space() odict_info[label]["stat_weights"] = set(cfs.static_forms.keys()) odict_info[label]["cfs"] = cfs slice_begin = 0 if len(odict_info) == 1 else list(odict_info.values())[-2]["slice_indices"][1] odict_info[label]["slice_indices"] = (slice_begin, slice_begin + cfs.len_weights) input_function_set = set( [cfs.dynamic_form.input_function for cfs in list_of_cfs if not cfs.dynamic_form.input_function is None] ) if len(input_function_set) > 1: raise ValueError("All given CanonicalForms.dynamic_form's must hold the same input function (or None).") elif len(input_function_set) == 1: input_function = input_function_set.pop() else: input_function = None for cfs in list_of_cfs: cfs_to_check = [cfs.dynamic_form] + list(cfs.static_forms.values()) list_of_powers = np.array([list(cf._max_exponent.values()) for cf in cfs_to_check]).flatten().astype(float) if any([power > 1 for power in list_of_powers]): raise NotImplementedError("Exponents greater 1 not implemented yet.") if not all([cf.input_function == None for cf in cfs.static_forms.values()]): raise ValueError("Input functions in static forms not allowed.") if any(["f" in cf._matrices.keys() for cf in cfs_to_check]): raise NotImplementedError("No matrix \"f\" allowed (for now).") if not cfs.dynamic_form._max_order["G"] is None and cfs.dynamic_form._max_order["G"] > 1: raise NotImplementedError("For now, only order 1 for input matrix \"G\" supported.") # check for valid problem formulation for dyn_label in odict_info.keys(): for cfs in list_of_cfs: if dyn_label in cfs.static_forms.keys(): if cfs.static_forms[dyn_label]._max_order["E"] >= odict_info[dyn_label]["max_order"]: raise ValueError(value_error_string + "For a specific weight_label, the temporal order of the static_form" "can not be greater or equal as that from the corresponding dynamic form.") list_of_labels = list(odict_info.keys()) exponent = 1 # hard coded for now for a_lbl in list_of_labels: row_dict = dict() a_block = odict_info[a_lbl]["state_space"].A[exponent] row_dict[list_of_labels.index(a_lbl)] = a_block list_without_a_lbl = list(odict_info.keys()) list_without_a_lbl.remove(a_lbl) for h_lbl in list_without_a_lbl: if h_lbl in odict_info[a_lbl]["stat_weights"]: for ord in range(odict_info[h_lbl]["max_order"]): if ord in odict_info[a_lbl]["cfs"].static_forms[h_lbl]._matrices["E"].keys(): h_fraction = - odict_info[a_lbl]["cfs"].dynamic_form.inverse_e_n * \ odict_info[a_lbl]["cfs"].static_forms[h_lbl]._matrices["E"][ord][exponent] else: h_fraction = np.matrix(np.zeros((odict_info[a_lbl]["weights_length"], odict_info[h_lbl]["weights_length"]))) if ord > 0: h_block = np.hstack((h_block, h_fraction)) else: h_block = h_fraction if odict_info[a_lbl]["max_order"] > 1: h_block = np.vstack( (np.zeros((odict_info[a_lbl]["weights_length"] * (odict_info[a_lbl]["max_order"] - 1), odict_info[h_lbl]["weights_length"] * odict_info[h_lbl]["max_order"])), h_block)) else: h_block = np.vstack((np.zeros((odict_info[a_lbl]["weights_length"] * odict_info[a_lbl]["max_order"], odict_info[h_lbl]["weights_length"] * odict_info[h_lbl]["max_order"])))) row_dict[list_of_labels.index(h_lbl)] = h_block row_odict = collections.OrderedDict(sorted(row_dict.items(), key=lambda t: t[0])) row = np.hstack(tuple(row_odict.values())) if list_of_labels[0] == a_lbl: a_matrix = row else: a_matrix = np.vstack((a_matrix, row)) a_matrix = np.matrix(a_matrix) dim_x = np.sum([value["weights_length"] * value["max_order"] for value in odict_info.values()]) if not a_matrix.shape == (dim_x, dim_x): raise ValueError("Check algorithm.") if isinstance(input_function, SimulationInputVector): dim_u = input_function.len elif isinstance(input_function, SimulationInput): dim_u = 1 else: dim_u = None b_matrix = None if not dim_u is None: for b_lbl in list_of_labels: b_fraction = odict_info[b_lbl]["state_space"].B[exponent] if odict_info[b_lbl]["max_order"] == 1: b_block = b_fraction else: b_block = np.vstack( (np.matrix(np.zeros((odict_info[b_lbl]["weights_length"] * (odict_info[b_lbl]["max_order"] - 1)))), b_fraction)) if list_of_labels[0] == b_lbl: b_matrix = b_block else: b_matrix = np.vstack((b_matrix, b_block)) if not b_matrix.shape == (dim_x, dim_u): raise ValueError("Check algorithm.") b_matrix = np.matrix(b_matrix) new_weight_lbl = str() for lbl in list_of_labels: new_weight_lbl += lbl return StateSpace(new_weight_lbl, a_matrix, b_matrix, input_handle=input_function, family_tree=odict_info)
[docs]def parse_weak_formulation(weak_form): """ Creates an ode system for the weights x_i based on the weak formulation. Args: weak_form (:py:class:`WeakFormulation`): Weak formulation of the pde. Return: :py:class:`CanonicalForm` or :py:class:`CanonicalForms`: n'th-order ode system. If the desired canonical form should be from type :py:class:`CanonicalForms` (as a part of multi-pde system) then the label of the dynamic weights must be defined in :code:`weak_form.dynamic_weights` otherwise you get a :py:class:`CanonicalForm` object. """ if not isinstance(weak_form, WeakFormulation): raise TypeError("Only able to parse WeakFormulation.") cfs = CanonicalForms(weak_form.dynamic_weights) # handle each term for term in weak_form.terms: # extract Placeholders placeholders = dict(scalars=term.arg.get_arg_by_class(Scalars), functions=term.arg.get_arg_by_class(TestFunction), field_variables=term.arg.get_arg_by_class(FieldVariable), inputs=term.arg.get_arg_by_class(Input)) # field variable terms, sort into E_np, E_n-1p, ..., E_0p if placeholders["field_variables"]: if len(placeholders["field_variables"]) != 1: raise NotImplementedError field_var = placeholders["field_variables"][0] temp_order = field_var.order[0] exponent = field_var.data["exponent"] init_funcs = get_base(field_var.data["func_lbl"], field_var.order[1]) shape_funcs = np.array([func.raise_to(exponent) for func in init_funcs]) # for now we use .startswith and .endswith, while the function label # will manipulated from placeholder.Product._simplify_product # if not field_var.data["func_lbl"].startswith(field_var.data["weight_lbl"]): # if not field_var.data["func_lbl"].endswith(field_var.data["weight_lbl"]): if not True: raise ValueError("In the simulation infrastructure of pyinduct field variables with weight labels" "which differing from function labels not considered. Use this feature only for" "controller approximation.") if placeholders["inputs"]: # TODO think about this case, is it relevant? raise NotImplementedError # is the integrand a product? if placeholders["functions"]: if len(placeholders["functions"]) != 1: raise NotImplementedError func = placeholders["functions"][0] test_funcs = get_base(func.data["func_lbl"], func.order[1]) result = calculate_scalar_product_matrix(dot_product_l2, test_funcs, shape_funcs) else: # extract constant term and compute integral a = Scalars(np.atleast_2d([integrate_function(func, func.nonzero)[0] for func in shape_funcs])) if placeholders["scalars"]: b = placeholders["scalars"][0] else: b = Scalars(np.ones_like(a.data.T)) result = _compute_product_of_scalars([a, b]) if cfs.dynamic_form.weights is None: cfs.dynamic_form.weights = field_var.data["weight_lbl"] cfs.add_to(field_var.data["weight_lbl"], dict(name="E", order=temp_order, exponent=exponent), result * term.scale) continue # TestFunction or pre evaluated terms, those can end up in E, f or G if placeholders["functions"]: if not 1 <= len(placeholders["functions"]) <= 2: raise NotImplementedError func = placeholders["functions"][0] test_funcs = get_base(func.data["func_lbl"], func.order[1]) if len(placeholders["functions"]) == 2: # TODO this computation is nonsense. Result must be a vector containing int(tf1*tf2) raise NotImplementedError func2 = placeholders["functions"][1] test_funcs2 = get_base(func2.data["func_lbl"], func2.order[2]) result = calculate_scalar_product_matrix(dot_product_l2, test_funcs, test_funcs2) cfs.add_to(weak_form.dynamic_weights, ("f", 0), result * term.scale) continue if placeholders["scalars"]: a = placeholders["scalars"][0] b = Scalars(np.vstack([integrate_function(func, func.nonzero)[0] for func in test_funcs])) result = _compute_product_of_scalars([a, b]) weight_lbl, target = get_common_target(placeholders["scalars"]) if cfs.dynamic_form.weights is None: cfs.dynamic_form.weights = weight_lbl cfs.add_to(weight_lbl, target, result * term.scale) continue if placeholders["inputs"]: if len(placeholders["inputs"]) != 1: raise NotImplementedError input_var = placeholders["inputs"][0] input_func = input_var.data["input"] input_index = input_var.data["index"] input_exp = input_var.data["exponent"] input_order = input_var.order[0] result = np.array([[integrate_function(func, func.nonzero)[0]] for func in test_funcs]) cfs.add_to(weak_form.dynamic_weights, dict(name="G", order=input_order, exponent=input_exp), result * term.scale, column=input_index) cfs.dynamic_form.input_function = input_func continue # pure scalar terms, sort into corresponding matrices if placeholders["scalars"]: result = _compute_product_of_scalars(placeholders["scalars"]) weight_lbl, target = get_common_target(placeholders["scalars"]) if placeholders["inputs"]: input_var = placeholders["inputs"][0] input_func = input_var.data["input"] input_index = input_var.data["index"] input_exp = input_var.data["exponent"] # here we would need to provide derivative handles in the callable input_order = input_var.order[0] if input_order > 0: raise NotImplementedError # this would mean that the input term should appear in a matrix like E1 or E2 if target["name"] == "E": raise NotImplementedError cfs.add_to(weak_form.dynamic_weights, dict(name="G", order=input_order, exponent=input_exp), result * term.scale, column=input_index) cfs.dynamic_form.input_function = input_func continue if cfs.dynamic_form.weights is None: cfs.dynamic_form.weights = weight_lbl cfs.add_to(weight_lbl, target, result * term.scale) continue if weak_form.dynamic_weights is None: return cfs.dynamic_form else: return cfs
def _compute_product_of_scalars(scalars): if len(scalars) > 2: raise NotImplementedError if len(scalars) == 1: res = scalars[0].data elif scalars[0].data.shape == scalars[1].data.shape: res = np.prod(np.array([scalars[0].data, scalars[1].data]), axis=0) # guarantee dyadic product no matter in which order args are passed elif scalars[0].data.shape[0] == scalars[1].data.shape[1] == 1: res = np.dot(scalars[1].data, scalars[0].data) elif scalars[0].data.shape[1] == scalars[1].data.shape[0] == 1: res = np.dot(scalars[0].data, scalars[1].data) else: raise NotImplementedError return res
[docs]def simulate_state_space(sys_ss, sys_init_state, temp_domain, obs_ss=None, obs_init_state=None, settings=None): """ Wrapper to simulate a system given in state space form: .. math:: \\dot{q} = A_pq^p + A_{p-1}q^{p-1} + \\dotsb + A_0q + Bu. Args: sys_ss (:py:class:`StateSpace`): State space formulation of the system. sys_init_state (numpy.array): Initial state vector of the system. temp_domain (:py:class:`Domain`): Temporal domain object. obs_ss (:py:class:`Observer`): State space formulation of the observer. obs_init_state (numpy.array): Initial state vector of the observer. settings (dict): Parameters to pass to the :func:`set_integrator` method of the :class:`scipy.ode` class, with the integrator name included under the key :obj:`name`. Return: tuple: Time :py:class:`Domain` object and weights matrix/matrices. """ if not isinstance(sys_ss, StateSpace) or isinstance(sys_ss, Observer): raise TypeError input_handle = sys_ss.input if not isinstance(input_handle, SimulationInput): raise TypeError("Only pyinduct.simulation.SimulationInput supported.") if isinstance(input_handle, SimulationInputVector): if any([isinstance(input, ObserverError) for input in input_handle.input_vector]): raise TypeError t = [temp_domain[0]] q = [np.hstack((sys_init_state.flatten(), np.empty(0) if obs_init_state is None else obs_init_state.flatten()))] split_index = list(sys_ss.A.values())[0].shape[0] # TODO export cython code? def _rhs(_t, _q, sys_ss, obs_ss, split_index): u = sys_ss.input(time=_t, weights=_q[:split_index], weight_lbl=sys_ss.weight_lbl, fam_tree=sys_ss.family_tree) sys_q_t = sys_ss.rhs_hint(_t, _q[:split_index], u, sys_ss) if obs_ss is None: obs_q_t = np.empty(0) else: obs_q_t = obs_ss.rhs_hint(_t, _q[:split_index], u, sys_ss, _q[split_index:], obs_ss) return np.hstack((sys_q_t.flatten(), obs_q_t.flatten())) r = ode(_rhs) # TODO check for complex-valued matrices and use 'zvode' if settings: r.set_integrator(settings.pop("name"), **settings) else: # use some sane defaults r.set_integrator( "vode", max_step=temp_domain.step, method="adams", nsteps=1e3 ) r.set_f_params(sys_ss, obs_ss, split_index) r.set_initial_value(q[0], t[0]) for t_step in temp_domain[1:]: qn = r.integrate(t_step) if not r.successful(): warnings.warn("*** Error: Simulation aborted at t={} ***".format(r.t)) break t.append(r.t) q.append(qn) # create results q = np.array(q) def split_weight_matrix(q, family_tree): # indices to cut the weights matrix in slices (corresponding tor the weights in sys_ss.family_tree) if family_tree is None: slice_indices = [(0, q.shape[1])] else: slice_indices = [si["slice_indices"] for si in family_tree.values()] return [q[:, a:b] for a, b in slice_indices] sys_q = q[:, :split_index] sys_weights_matrices = split_weight_matrix(sys_q, sys_ss.family_tree) sim_domain = Domain(points=np.array(t), step=temp_domain.step) if obs_ss is None: return [sim_domain] + sys_weights_matrices else: obs_q = q[:, split_index:] obs_weights_matrices = split_weight_matrix(obs_q, obs_ss.family_tree) return [sim_domain] + sys_weights_matrices + obs_weights_matrices
[docs]def evaluate_approximation(base_label, weights, temp_domain, spat_domain, spat_order=0, name=""): """ Evaluate an approximation given by weights and functions at the points given in spatial and temporal steps. Args: weights: 2d np.ndarray where axis 1 is the weight index and axis 0 the temporal index. base_label (str): Functions to use for back-projection. temp_domain (:py:class:`Domain`): For steps to evaluate at. spat_domain (:py:class:`Domain`): For points to evaluate at (or in). spat_order: Spatial derivative order to use. name: Name to use. Return: :py:class:`pyinduct.visualization.EvalData` """ funcs = get_base(base_label, spat_order) if weights.shape[1] != funcs.shape[0]: raise ValueError("weights (len={0}) have to fit provided functions (len={1})!".format(weights.shape[1], funcs.size)) # evaluate shape functions at given points shape_vals = np.array([func.evaluation_hint(spat_domain) for func in funcs]) def eval_spatially(weight_vector): return np.real_if_close(np.dot(weight_vector, shape_vals), 1000) data = np.apply_along_axis(eval_spatially, 1, weights) return EvalData([temp_domain, spat_domain], data, name=name)
""" Control section """
[docs]class FeedbackLaw(object): """ This class represents the approximated formulation of a control law or observer error. It can be initialized with several terms (see children of :py:class:`pyinduct.placeholder.EquationTerm`). The equation is interpreted as .. math:: term_0 + term_1 + ... + term_N = u where :math:`u` is the control output. Args: terms (list): List with object(s) of type :py:class:`pyinduct.placeholder.EquationTerm`. """ def __init__(self, terms, name=""): if isinstance(terms, EquationTerm): terms = [terms] if not isinstance(terms, list): raise TypeError("only (list of) {0} allowed".format(EquationTerm)) for term in terms: if not isinstance(term, EquationTerm): raise TypeError("Only EquationTerm(s) are accepted.") self.terms = terms self.name = name
[docs]class Feedback(SimulationInput): """ Wrapper class for all state feedbacks that have to interact with the simulation environment. Args: feedback_law (:py:class:`FeedbackLaw`): Function handle that calculates the state feedback if provided with correct weights. """ def __init__(self, feedback_law): SimulationInput.__init__(self, name=feedback_law.name) c_forms = approximate_feedback_law(feedback_law) self._evaluator = LawEvaluator(c_forms, self._value_storage) def _calc_output(self, **kwargs): """ Calculates the feedback based on the current_weights. Keyword Args: weights: Current weights of the simulations system approximation. weights_lbl (str): Corresponding label of :code:`weights`. Return: dict: Feedback under the key :code:`"output"`. """ return self._evaluator(kwargs["weights"], kwargs["weight_lbl"], kwargs["fam_tree"])
[docs]class LawEvaluator(object): """ Object that evaluates the feedback law approximation given by a :py:class:`pyinduct.simulation.CanonicalForms` object. Args: cfs (:py:class:`pyinduct.simulation.FeedbackCanonicalForms`): evaluation handle """ def __init__(self, cfs, storage=None): self._cfs = cfs self._transformations = {} self._eval_vectors = {} self._storage = storage @staticmethod def _build_eval_vector(terms): """ Build a set of vectors that will compute the output by multiplication with the corresponding power of the weight vector. Args: terms (dict): coefficient vectors Return: dict: evaluation vector """ orders = set(terms["E"].keys()) powers = set(chain.from_iterable([list(mat) for mat in terms["E"].values()])) dim = next(iter(terms["E"][max(orders)].values())).shape vectors = dict() for power in powers: vector = np.hstack([terms["E"].get(order, {}).get(1, np.zeros(dim)) for order in range(max(orders) + 1)]) vectors.update({power: vector}) return vectors def __call__(self, weights, weight_label, family_tree=None): """ Evaluation function for approximated feedback law. Args: weights (numpy.ndarray): 1d ndarray of approximation weights. weight_label (string): Label of functions the weights correspond to. Return: dict: control output :math:`u` """ res = {} output = 0 + 0j # add dynamic part for lbl, law in self._cfs.get_dynamic_terms().items(): dst_weights = [0] if "E" in law is not None: # build eval vector if lbl not in self._eval_vectors.keys(): self._eval_vectors[lbl] = self._build_eval_vector(law) if family_tree is None: # collect information info = TransformationInfo() info.src_lbl = weight_label info.dst_lbl = lbl info.src_base = get_base(weight_label, 0) info.dst_base = get_base(lbl, 0) info.src_order = int(weights.size / info.src_base.size) - 1 info.dst_order = int(next(iter(self._eval_vectors[lbl].values())).size / info.dst_base.size) - 1 # look up transformation if info not in self._transformations.keys(): # fetch handle handle = get_weight_transformation(info) self._transformations[info] = handle # transform weights dst_weights = self._transformations[info](weights) # TODO: Support transformation hints for nested weights, too! elif isinstance(family_tree, collections.OrderedDict): if not lbl in family_tree: raise ValueError("Transformation hints for nested weights not supported yet.") s_start, s_stop = family_tree[lbl]["slice_indices"] dst_weights = weights[s_start: s_stop] else: raise NotImplementedError # evaluate vectors = self._eval_vectors[lbl] for p, vec in vectors.items(): output = output + np.dot(vec, np.power(dst_weights, p)) res[lbl] = dst_weights # add constant term static_terms = self._cfs.get_static_terms() if "f" in static_terms: output = output + static_terms["f"] # TODO: replace with the one from utils if abs(np.imag(output)) > np.finfo(np.complex128).eps * 100: print("Warning: Imaginary part of output is nonzero! out = {0}".format(output)) out = np.real_if_close(output, tol=10000000) if np.imag(out) != 0: raise ValueError("calculated complex control output u={0}," " check for errors in feedback law!".format(out)) res["output"] = out return res
[docs]class FeedbackCanonicalForms(object): """ Wrapper that holds several entities of canonical forms for different sets of weights. """ def __init__(self, name): self.name = name self._dynamic_forms = {} self._static_form = CanonicalForm(self.name + "static")
[docs] def add_to(self, weight_label, term, val): """ Add val to the canonical form for weight_label, see :py:func:`CanonicalForm.add_to` for further information. Args: weight_label (str): Basis to add onto. term: Coefficient to add onto, see :py:func:`CanonicalForm.add_to`. val: Values to add. """ if term["name"] in "fG": # hold f and g vector separately self._static_form.add_to(term, val) return if weight_label not in list(self._dynamic_forms.keys()): self._dynamic_forms[weight_label] = CanonicalForm("_".join([self.name + weight_label])) self._dynamic_forms[weight_label].add_to(term, val)
[docs] def get_static_terms(self): """ Return: Terms that do not depend on a certain weight set. """ return self._static_form.get_terms()
[docs] def get_dynamic_terms(self): """ Return: dict: Dictionary of terms for each weight set. """ return {label: val.get_terms() for label, val in self._dynamic_forms.items()}
[docs]def approximate_feedback_law(feedback_law): """ Function that approximates the feedback law, given by a list of sum terms that equal u. The result is a function handle that contains pre-evaluated terms and only needs the current weights (and their respective label) to be applied. Args: feedback_law (:py:class:`FeedbackLaw`): Function handle that calculates the feedback law output if provided with correct weights. Return: :py:class:`pyinduct.simulation.FeedbackCanonicalForms`: evaluation handle """ print("approximating feedback law {}".format(feedback_law.name)) if not isinstance(feedback_law, FeedbackLaw): raise TypeError("only input of Type FeedbackLaw allowed!") return _parse_feedback_law(feedback_law)
[docs]def _parse_feedback_law(law): """ Parses the given feedback law by approximating given terms. Args: law (list): List of :py:class:`pyinduct.placeholders.EquationTerm`'s Return: :py:class:`pyinduct.simulation.FeedbackCanonicalForms`: evaluation handle """ # check terms for term in law.terms: if not isinstance(term, EquationTerm): raise TypeError("only EquationTerm(s) accepted.") cfs = FeedbackCanonicalForms(law.name) for term in law.terms: placeholders = dict([ ("field_variables", term.arg.get_arg_by_class(FieldVariable)), ("scalars", term.arg.get_arg_by_class(Scalars)), ]) if placeholders["field_variables"]: field_var = placeholders["field_variables"][0] temp_order = field_var.order[0] func_lbl = field_var.data["func_lbl"] weight_lbl = field_var.data["weight_lbl"] init_funcs = get_base(func_lbl, field_var.order[1]) factors = np.atleast_2d([integrate_function(func, domain_intersection(term.limits, func.nonzero))[0] for func in init_funcs]) if placeholders["scalars"]: scales = placeholders["scalars"][0] res = np.prod(np.array([factors, scales]), axis=0) else: res = factors # HACK! hardcoded exponent cfs.add_to(weight_lbl, dict(name="E", order=temp_order, exponent=1), res * term.scale) elif placeholders["scalars"]: # TODO make sure that all have the same target form! scalars = placeholders["scalars"] if len(scalars) > 1: # TODO if one of 'em is just a scalar and no array an error occurs res = np.prod(np.array([scalars[0].data, scalars[1].data]), axis=0) else: res = scalars[0].data cfs.add_to(scalars[0].target_weight_label, get_common_target(scalars)[1], res * term.scale) else: raise NotImplementedError return cfs
""" Observer section """
[docs]class Observer(StateSpace): """ Standard observer class which correspond structurally to the standard state space implementation :py:class:`StateSpace` (from which it is derived). .. math:: \\dot{\\boldsymbol{x}}(t) &= \\boldsymbol{A}\\boldsymbol{x}(t) + \\boldsymbol{B}u(t) + \\boldsymbol{L} \\tilde{\\boldsymbol{y}} \\\\ \\boldsymbol{y}(t) &= \\boldsymbol{C}\\boldsymbol{x}(t) + \\boldsymbol{D}u(t) Where :math:`\\tilde{\\boldsymbol{y}}` is the observer error. The corresponding infinite dimensional observer has been approximated by a base given by weight_label. Args: weight_label: Label that has been used for approximation. a_matrices: :math:`\\boldsymbol{A_p}, \\dotsc, \\boldsymbol{A_0},` b_matrices: :math:`\\boldsymbol{B_q}, \\dotsc, \\boldsymbol{B_0},` input_handle: :math:`u(t)` f_vector: c_matrix: :math:`\\boldsymbol{C}` d_matrix: :math:`\\boldsymbol{D}` family_tree: l_matrices: """ def __init__(self, weight_label, a_matrices, b_matrices, input_handle=None, f_vector=None, c_matrix=None, d_matrix=None, family_tree=None, l_matrices=None): StateSpace.__init__(self, weight_label, a_matrices, b_matrices, input_handle, f_vector, c_matrix, d_matrix, family_tree) if isinstance(l_matrices, np.ndarray): self.L = {1: l_matrices} else: self.L = l_matrices if self.L is None: self.L = {1: np.zeros((self.A[1].shape[0], 1))} def rhs_hint(self, _t, _sys_q, _u, sys_ss, _obs_q, obs_ss): q_t = StateSpace.rhs_hint(self, _t, _obs_q, _u, obs_ss) obs_error = obs_ss.input(time=_t, sys_weights=_sys_q, sys_weights_lbl=sys_ss.weight_lbl, sys_fam_tree=sys_ss.family_tree, obs_weights=_obs_q, obs_weights_lbl=obs_ss.weight_lbl, obs_fam_tree=obs_ss.family_tree) for p, l_mat in obs_ss.L.items(): q_t = q_t + np.asarray(np.dot(l_mat, np.power(obs_error, p))).flatten() return np.asarray(q_t)
[docs]class ObserverError(SimulationInput): """ Wrapper class for all observer errors that have to interact with the simulation environment. The terms which have to approximated on the basis of the system weights have to provided through the argument :code:`sys_part` and the terms which have to approximated on the basis of the observer weights have to provided through the argument :code:`obs_part`. The observer error is provided as sum of the :py:class:`FeedbackLaw`'s :code:`sys_part` and :code:`obs_part`. Args: sys_part (:py:class:`FeedbackLaw`): Hold the terms which approximated from system weights. obs_part (:py:class:`FeedbackLaw`): Hold the terms which approximated from observer weights. Keyword Args: weighted_initial_error (numbers.Number): Time :math:`t^*` (default: None). If the kwarg is given the observer error will be weighted :math:`\\tilde{y}_{\\varphi}(t) = \\varphi(t)\\tilde{y}(t)` with the function .. math:: :nowrap: \\begin{align*} \\varphi(t) = \\left\\{\\begin{array}{lll} 0 & \\forall & t < 0 \\\\ (1-\\cos\\pi\\frac{t}{t^*}) / 2 & \\forall & t \\in [0, 1] \\\\ 1 & \\forall & t > 1. \\end{array}\\right. \\end{align*} """ def __init__(self, sys_part, obs_part, weighted_initial_error=None): SimulationInput.__init__(self, name="observer error: " + sys_part.name + " + " + obs_part.name) sys_c_forms = approximate_feedback_law(sys_part) self._sys_evaluator = LawEvaluator(sys_c_forms) obs_c_forms = approximate_feedback_law(obs_part) self._obs_evaluator = LawEvaluator(obs_c_forms) self._weighted_initial_error = weighted_initial_error def _calc_output(self, **kwargs): """ Calculates the observer error based on the system and the observer weights. Keyword Args: sys_weights: Current weights of the simulations system approximation. sys_weights_lbl (str): Corresponding label of :code:`sys_weights`. obs_weights: Current weights of the observer system approximation. obs_weights_lbl (str): Corresponding label of :code:`obs_weights`. Return: dict: Feedback under the key :code:`"output"`. """ sp = self._sys_evaluator(kwargs["sys_weights"], kwargs["sys_weights_lbl"], kwargs["sys_fam_tree"])["output"] op = self._obs_evaluator(kwargs["obs_weights"], kwargs["obs_weights_lbl"], kwargs["obs_fam_tree"])["output"] if self._weighted_initial_error is None or np.isclose(self._weighted_initial_error, 0): return dict(output=sp + op) else: t = np.clip(kwargs["time"], 0, self._weighted_initial_error) phi = (1 - np.cos(t / self._weighted_initial_error * np.pi)) * .5 return dict(output=phi * (sp + op))
[docs]def build_observer_from_state_space(state_space): """ Return a :py:class:`Observer` object based on the given :py:class:`StateSpace` object. The method return :code:`None` if state_space.input is not a instance of :py:class:`ObserverError` or if self._input_function is a instance of :py:class:`SimulationInputSum` which not hold any :py:class:`ObserverError` instance. Args: state_space (:py:class:`StateSpace`): State space to be convert. Returns: :py:class:`pyinduct.simulation.Observer` or None: See docstring. """ input_func = state_space.input if not isinstance(input_func, SimulationInput): raise TypeError("The input function of the given state space must be a instance from SimulationInput.") if not isinstance(input_func, ObserverError): if isinstance(input_func, SimulationInputVector): if not any([isinstance(i, ObserverError) for i in input_func.input_vector]): return None else: raise NotImplementedError else: return Observer(state_space.weight_lbl, state_space.A, None, state_space.input, state_space.f, state_space.C, state_space.D, state_space.family_tree, l_matrices=state_space.B) obs_b_matrices = dict() obs_l_matrices = dict() for exp, mat in state_space.B.items(): obs_b_matrices[exp] = np.hstack( tuple([state_space.B[exp][:, index] for index in state_space.input._sys_inputs.keys()]) ) obs_l_matrices[exp] = np.hstack( tuple([state_space.B[exp][:, index] for index in state_space.input._obs_errors.keys()]) ) return Observer(state_space.weight_lbl, state_space.A, obs_b_matrices, state_space.input, state_space.f, state_space.C, state_space.D, state_space.family_tree, obs_l_matrices)