"""
In the module :py:mod:`pyinduct.trajectory` are some trajectory generators defined.
Besides you can find here a trivial (constant) input signal generator as
well as input signal generator for equilibrium to equilibrium transitions for
hyperbolic and parabolic systems.
"""
import sympy as sp
import warnings
from .simulation import SimulationInput
from numbers import Number
from . import eigenfunctions as ef
import scipy.misc as sm
import scipy.signal as sig
import numpy as np
import pyqtgraph as pg
# TODO move this to a more feasible location
sigma_tanh = 1.1
K_tanh = 2.
[docs]class ConstantTrajectory(SimulationInput):
"""
Trivial trajectory generator for a constant value as simulation input signal.
Args:
const (numbers.Number): Desired constant value of the output.
"""
def __init__(self, const=0):
SimulationInput.__init__(self)
self._const = const
def _calc_output(self, **kwargs):
if isinstance(kwargs["time"], (list, np.ndarray)):
return dict(output=np.ones(len(np.atleast_1d(kwargs["time"]))) * self._const)
elif isinstance(kwargs["time"], Number):
return dict(output=self._const)
else:
raise NotImplementedError
[docs]class SmoothTransition:
"""
A smooth transition between two given steady-states *states* on an *interval*
using either:
polynomial method
trigonometric method
To create smooth transitions.
Args:
states (tuple): States at beginning and end of interval.
interval (tuple): Time interval.
method (str): Method to use (``poly`` or ``tanh``).
differential_order (int): Grade of differential flatness :math:`\\gamma`.
"""
def __init__(self, states, interval, method, differential_order=0):
self.yd = states
self.t0 = interval[0]
self.t1 = interval[1]
self.dt = interval[1] - interval[0]
# setup symbolic expressions
if method == "tanh":
tau, sigma = sp.symbols('tau, sigma')
# use a gevrey-order of alpha = 1 + 1/sigma
sigma = 1.1
phi = .5 * (1 + sp.tanh(2 * (2 * tau - 1) / ((4 * tau * (1 - tau)) ** sigma)))
elif method == "poly":
gamma = differential_order # + 1 # TODO check this against notes
tau, k = sp.symbols('tau, k')
alpha = sp.factorial(2 * gamma + 1)
f = sp.binomial(gamma, k) * (-1) ** k * tau ** (gamma + k + 1) / (gamma + k + 1)
phi = alpha / sp.factorial(gamma) ** 2 * sp.summation(f, (k, 0, gamma))
else:
raise NotImplementedError("method {} not implemented!".format(method))
# differentiate phi(tau), index in list corresponds to order
dphi_sym = [phi] # init with phi(tau)
for order in range(differential_order):
dphi_sym.append(dphi_sym[-1].diff(tau))
# lambdify
self.dphi_num = []
for der in dphi_sym:
self.dphi_num.append(sp.lambdify(tau, der, 'numpy'))
def __call__(self, *args, **kwargs):
return self._desired_values(args[0])
def _desired_values(self, t):
"""
Calculates the desired trajectory and its derivatives for time-step *t*.
Args:
t (array_like): Time-step for which trajectory and derivatives are needed.
Return:
numpy.ndarray: :math:`\\boldsymbol{y}_d = \\left(y_d, \\dot{y}_d, \\ddot{y}_d, \\dotsc, \\y_d^{(\\gamma)}\\right)`
"""
y = np.zeros((len(self.dphi_num)))
if t <= self.t0:
y[0] = self.yd[0]
elif t >= self.t1:
y[0] = self.yd[1]
else:
for order, dphi in enumerate(self.dphi_num):
if order == 0:
ya = self.yd[0]
else:
ya = 0
y[order] = ya + (self.yd[1] - self.yd[0]) * dphi((t - self.t0) / self.dt) * 1 / self.dt ** order
return y
[docs]class FlatString(SimulationInput):
"""
Class that implements a flatness based control approach
for the "string with mass" model.
"""
def __init__(self, y0, y1, z0, z1, t0, dt, params):
SimulationInput.__init__(self)
# store params
self._tA = t0
self._dt = dt
self._dz = z1 - z0
self._m = params.m # []=kg mass at z=0
self._tau = params.tau # []=m/s speed of wave translation in string
self._sigma = params.sigma # []=kgm/s**2 pretension of string
# construct trajectory generator for yd
ts = max(t0, self._dz * self._tau) # never too early
self.trajectory_gen = SmoothTransition((y0, y1), (ts, ts + dt), method="poly", differential_order=2)
# create vectorized functions
self.control_input = np.vectorize(self._control_input, otypes=[np.float])
self.system_state = np.vectorize(self._system_sate, otypes=[np.float])
def _control_input(self, t):
"""
Control input for system gained through flatness based approach that will
satisfy the target trajectory for y.
Args:
t: time
Return:
input force f
"""
yd1 = self.trajectory_gen(t - self._dz * self._tau)
yd2 = self.trajectory_gen(t + self._dz * self._tau)
return 0.5 * self._m * (yd2[2] + yd1[2]) + self._sigma * self._tau / 2 * (yd2[1] - yd1[1])
def _system_sate(self, z, t):
"""
x(z, t) of string-mass system for given flat output y.
Args:
z: location
t: time
Return:
state (deflection of string)
"""
yd1 = self.trajectory_gen(t - z * self._tau)
yd2 = self.trajectory_gen(t + z * self._tau)
return self._m / (2 * self._sigma * self._tau) * (yd2[1] - yd1[1]) + .5 * (yd1[0] + yd2[0])
def _calc_output(self, **kwargs):
"""
Use time to calculate system input and return force.
Keyword Args:
time:
Return:
dict: Result is the value of key "output".
"""
return dict(output=self._control_input(kwargs["time"]))
# TODO: kwarg: t_step
[docs]def gevrey_tanh(T, n, sigma=sigma_tanh, K=K_tanh):
"""
Provide Gevrey function
.. math::
\\eta(t) = \\left\\{ \\begin{array}{lcl}
0 & \\forall & t<0 \\\\
\\frac{1}{2} + \\frac{1}{2}\\tanh \\left(K \\frac{2(2t-1)}{(4(t^2-t))^\\sigma} \\right) & \\forall & 0\\le t \\le T \\\\
1 & \\forall & t>T \\end{array} \\right.
with the Gevrey-order :math:`\\rho=1+\\frac{1}{\\sigma}` and the derivatives up to order n.
Note:
For details of the recursive calculation of the derivatives see:
Rudolph, J., J. Winkler und F. Woittennek: Flatness Based Control of Distributed Parameter \
Systems: Examples and Computer Exercises from Various Technological Domains (Berichte aus \
der Steuerungs- und Regelungstechnik). Shaker Verlag GmbH, Germany, 2003.
Args:
T (numbers.Number): End of the time domain=[0,T].
n (int): The derivatives will calculated up to order n.
sigma (numbers.Number): Constant :math:`\\sigma` to adjust the Gevrey order :math:`\\rho=1+\\frac{1}{\\sigma}` of :math:`\\varphi(t)`.
K (numbers.Number): Constant to adust the slope of :math:`\\varphi(t)`.
Return:
tuple: (numpy.array([[:math:`\\varphi(t)`], ... ,[:math:`\\varphi^{(n)}(t)`]]), numpy.array([0,...,T])
"""
t_init = t = np.linspace(0., T, int(0.5 * 10 ** (2 + np.log10(T))))
# pop
t = np.delete(t, 0, 0)
t = np.delete(t, -1, 0)
# main
tau = t / T
a = dict()
a[0] = K * (4 * tau * (1 - tau)) ** (1 - sigma) / (2 * (sigma - 1))
a[1] = (2 * tau - 1) * (sigma - 1) / (tau * (1 - tau)) * a[0]
for k in range(2, n + 2):
a[k] = (tau * (1 - tau)) ** -1 * (
(sigma - 2 + k) * (2 * tau - 1) * a[k - 1] + (k - 1) * (2 * sigma - 4 + k) * a[k - 2])
yy = dict()
yy[0] = np.tanh(a[1])
if n > 0:
yy[1] = a[2] * (1 - yy[0] ** 2)
z = dict()
z[0] = (1 - yy[0] ** 2)
for i in range(2, n + 1):
sum_yy = np.zeros(len(t))
for k in range(i):
if k == 0:
sum_z = np.zeros(len(t))
for j in range(i):
sum_z += -sm.comb(i - 1, j) * yy[j] * yy[i - 1 - j]
z[i - 1] = sum_z
sum_yy += sm.comb(i - 1, k) * a[k + 2] * z[i - 1 - k]
yy[i] = sum_yy
# push
phi = np.nan * np.zeros((n + 1, len(t) + 2))
for i in range(n + 1):
phi_temp = 0.5 * yy[i]
if i == 0:
phi_temp += 0.5
phi_temp = np.insert(phi_temp, 0, [0.], axis=0)
phi[i, :] = np.append(phi_temp, [1.])
else:
phi_temp = np.insert(phi_temp, 0, [0.], axis=0)
# attention divide by T^i
phi[i, :] = np.append(phi_temp, [0.]) / T ** i
return phi, t_init
def _power_series_flat_out(z, t, n, param, y, bound_cond_type):
"""
Provide the power series approximation for x(z,t) and x'(z,t).
Args:
z (numpy.ndarray): [0, ..., l]
t (numpy.ndarray): [0, ... , t_end]
n (int): Series termination index.
param (array_like): [a2, a1, a0, alpha, beta]
y (array_like): Flat output and derivatives np.array([[y],...,[y^(n/2)]]).
Return:
Field variable x(z,t) and spatial derivative x'(z,t).
"""
# TODO: documentation
a2, a1, a0, alpha, beta = param
shape = (len(t), len(z))
x = np.nan * np.ones(shape)
d_x = np.nan * np.ones(shape)
# Actually power_series() is designed for robin boundary condition by z=0.
# With the following modification it can also used for dirichlet boundary condition by z=0.
if bound_cond_type is 'robin':
is_robin = 1.
elif bound_cond_type is 'dirichlet':
alpha = 1.
is_robin = 0.
else:
raise ValueError("Selected Boundary condition {0} not supported! Use 'robin' or 'dirichlet'".format(
bound_cond_type))
# TODO: flip iteration order: z <--> t, result: one or two instead len(t) call's
for i in range(len(t)):
sum_x = np.zeros(len(z))
for j in range(n):
sum_b = np.zeros(len(z))
for k in range(j + 1):
sum_b += sm.comb(j, k) * (-a0) ** (j - k) * y[k, i]
sum_x += (is_robin + alpha * z / (2. * j + 1.)) * z ** (2 * j) / sm.factorial(2 * j) / a2 ** j * sum_b
x[i, :] = sum_x
for i in range(len(t)):
sum_x = np.zeros(len(z))
for j in range(n):
sum_b = np.zeros(len(z))
for k in range(j + 2):
sum_b += sm.comb(j + 1, k) * (-a0) ** (j - k + 1) * y[k, i]
if j == 0:
sum_x += alpha * y[0, i]
sum_x += (is_robin + alpha * z / (2. * (j + 1))) * z ** (2 * j + 1) / sm.factorial(2 * j + 1) / a2 ** (
j + 1) * sum_b
d_x[i, :] = sum_x
return x, d_x
[docs]def coefficient_recursion(c0, c1, param):
"""
Return to the recursion
.. math:: c_k(t) = \\frac{ \dot c_{k-2}(t) - a_1 c_{k-1}(t) - a_0 c_{k-2}(t) }{ a_2 }
with initial values
.. math::
c_0 = numpy.array([c_0^{(0)}, ... , c_0^{(N)}]) \\\\
c_1 = numpy.array([c_1^{(0)}, ... , c_1^{(N)}])
as much as computable subsequent coefficients
.. math::
c_2 = numpy.array&([c_2^{(0)}, ... , c_2^{(N-1)}]) \\\\
c_3 = numpy.array&([c_3^{(0)}, ... , c_3^{(N-1)}]) \\\\
&\\vdots \\\\
c_{2N-1} = numpy.array&([c_{2N-1}^{(0)}]) \\\\
c_{2N} = numpy.array&([c_{2N}^{(0)}])
Args:
c0 (array_like): :math:`c_0`
c1 (array_like): :math:`c_1`
param (array_like): (a_2, a_1, a_0, None, None)
Return:
dict: :math:`C = \\{0: c_0, 1: c_1, ..., 2N-1: c_{2N-1}, 2N: c_{2N}\\}`
"""
# TODO: documentation: only constant coefficients
if c0.shape != c1.shape:
raise ValueError
a2, a1, a0, _, _ = param
N = c0.shape[0]
C = dict()
C[0] = c0
C[1] = c1
for i in range(2, 2 * N):
reduced_derivative_order = int(i / 2.)
C[i] = np.nan * np.zeros((N - reduced_derivative_order, c0.shape[1]))
for j in range(N - reduced_derivative_order):
C[i][j, :] = (C[i - 2][j + 1, :] - a1 * C[i - 1][j, :] - a0 * C[i - 2][j, :]) / a2
return C
[docs]def temporal_derived_power_series(z, C, up_to_order, series_termination_index, spatial_der_order=0):
"""
Compute the temporal derivatives
.. math:: q^{(j,i)}(z=z^*,t) = \\sum_{k=0}^{N} \\underbrace{c_{k+j}^{(i)}}_{\\text{C[k+j][i,:]}} \\frac{{z^*}^k}{k!}, \\qquad i=0,...,n.
Args:
z (numbers.Number): Evaluation point :math:`z^*`.
C (dict):
Coeffient dictionary which keys correspond to the coefficient index. The values are 2D numpy.array's.
For example C[1] should provide a 2d-array with the coefficient :math:`c_1(t)` and at least :math:`n`
temporal derivatives
.. math:: \\text{np.array}([c_1^{(0)}(t), ... , c_1^{(i)}(t)]).
up_to_order (int): Max. temporal derivative order :math:`n` to compute.
series_termination_index (int): Series termination index :math:`N`.
spatial_der_order (int): Spatial derivativ order :math:`j`.
Return:
numpy.array( [:math:`q^{(j,0)}, ... , q^{(j,n)}`] )
"""
if not isinstance(z, Number):
raise TypeError
if any([C[i].shape[0] - 1 < up_to_order for i in range(series_termination_index + 1)]):
raise ValueError
len_t = C[0].shape[1]
Q = np.nan * np.zeros((up_to_order + 1, len_t))
for i in range(up_to_order + 1):
sum_Q = np.zeros(len_t)
for j in range(series_termination_index + 1 - spatial_der_order):
sum_Q += C[j + spatial_der_order][i, :] * z ** (j) / sm.factorial(j)
Q[i, :] = sum_Q
return Q
[docs]def power_series(z, t, C, spatial_der_order=0, temporal_der_order=0):
"""
Compute the function values
.. math:: x^{(j,i)}(z,t)=\sum_{k=0}^{N} c_{k+j}^{(i)}(t) \\frac{z^k}{k!}.
Args:
z (array_like): Spatial steps to compute.
t (array like): Temporal steps to compute.
C (dict):
Coeffient dictionary which keys correspond to the coefficient index. The values are 2D numpy.array's.
For example C[1] should provide a 2d-array with the coefficient :math:`c_1(t)` and at least :math:`i`
temporal derivatives
.. math:: \\text{np.array}([c_1^{(0)}(t), ... , c_1^{(i)}(t)]).
spatial_der_order (int): Spatial derivative order :math:`j`.
temporal_der_order (int): Temporal derivative order :math:`i`.
Return:
numpy.array: Array of shape (len(t), len(z)).
"""
if not all([isinstance(item, (Number, np.ndarray)) for item in [z, t]]):
raise TypeError
z = np.atleast_1d(z)
t = np.atleast_1d(t)
if not all([len(item.shape) == 1 for item in [z, t]]):
raise ValueError
x = np.nan * np.zeros((len(t), len(z)))
for i in range(len(z)):
sum_x = np.zeros(t.shape[0])
for j in range(len(C) - spatial_der_order):
sum_x += C[j + spatial_der_order][temporal_der_order, :] * z[i] ** j / sm.factorial(j)
x[:, i] = sum_x
if any([dim == 1 for dim in x.shape]):
x = x.flatten()
return x
[docs]class InterpTrajectory(SimulationInput):
"""
Provides a system input through one-dimensional linear interpolation between
the given vectors :math:`u` and :math:`t`.
Args:
t (array_like): Vector :math:`t` with time steps.
u (array_like): Vector :math:`u` with function values, corresponding to the vector :math:`t`.
show_plot (boolean): A plot window with plot(t, u) will pop up if it is true.
"""
def __init__(self, t, u, show_plot=False):
SimulationInput.__init__(self)
self._t = t
self._T = t[-1]
self._u = u
self.scale = 1
if show_plot:
self.get_plot()
def _calc_output(self, **kwargs):
return dict(output=np.interp(kwargs["time"], self._t, self._u) * self.scale)
[docs] def get_plot(self):
pw = pg.plot(title="InterpTrajectory", labels=dict(left='u(t)', bottom='t'), pen='b')
pw.plot(self._t, self.__call__(time=self._t), pen='b')
# pw.plot([0, self._T], self.__call__(time=[0, self._T]), pen=None, symbolPen=pg.mkPen("g"))
pg.QtGui.QApplication.instance().exec_()
return pw
[docs]class SignalGenerator(InterpTrajectory):
"""
Signal generator that combines :py:mod:`scipy.signal.waveforms` and :py:class:`InterpTrajectory`.
Args:
waveform (str): A waveform which is provided from :py:mod:`scipy.signal.waveforms`.
t (array_like): Array with time steps or :py:class:`pyinduct.simulation.Domain` instance.
scale (numbers.Number): Scale factor: output = waveform_output*scale + offset.
offset (numbers.Number): Offset value: output = waveform_output*scale + offset.
kwargs: The corresponding keyword arguments to the desired :py:mod:`scipy.signal` waveform.
In addition to the kwargs of the desired waveform function from scipy.signal
(which will simply forwarded) the keyword arguments :py:obj:`frequency`
(for waveforms: 'sawtooth' and 'square') and :py:obj:`phase_shift`
(for all waveforms) provided.
"""
def __init__(self, waveform, t, scale=1, offset=0, **kwargs):
if waveform not in sig.waveforms.__all__:
raise ValueError('Desired waveform is not provided from scipy.signal module.')
if not any([isinstance(value, Number) for value in [scale, offset]]):
raise ValueError('scale and offset must be a Number')
self._signal = getattr(sig, waveform)
if waveform in {'sawtooth', 'square'}:
# pop not scipy.signal.waveform.__all__ kwarg
try:
frequency = kwargs.pop('frequency')
except KeyError:
warnings.warn('If keyword argument frequency is not provided, it is set to 1.')
frequency = 1
t_gen_sig = 2 * np.pi * frequency * t
else:
if 'frequency' in kwargs.keys():
raise NotImplementedError
t_gen_sig = t
# pop not scipy.signal.waveform.__all__ kwarg
try:
phase_shift = kwargs.pop('phase_shift')
except KeyError as e:
phase_shift = 0
u = self._signal(t_gen_sig - phase_shift, **kwargs) * scale + offset
InterpTrajectory.__init__(self, t, u)
[docs]class RadTrajectory(InterpTrajectory):
"""
Class that implements a flatness based control approach
for the reaction-advection-diffusion equation
.. math:: \\dot x(z,t) = a_2 x''(z,t) + a_1 x'(z,t) + a_0 x(z,t)
with the boundary condition
- :code:`bound_cond_type == "dirichlet"`: :math:`x(0,t)=0`
- A transition from :math:`x'(0,0)=0` to :math:`x'(0,T)=1` is considered.
- With :math:`x'(0,t) = y(t)` where :math:`y(t)` is the flat output.
- :code:`bound_cond_type == "robin"`: :math:`x'(0,t) = \\alpha x(0,t)`
- A transition from :math:`x(0,0)=0` to :math:`x(0,T)=1` is considered.
- With :math:`x(0,t) = y(t)` where :math:`y(t)` is the flat output.
and the actuation
- :code:`actuation_type == "dirichlet"`: :math:`x(l,t)=u(t)`
- :code:`actuation_type == "robin"`: :math:`x'(l,t) = -\\beta x(l,t) + u(t)`.
The flat output :math:`y(t)` will calculated with :py:func:`gevrey_tanh`.
"""
# TODO: kwarg: t_step
def __init__(self, l, T, param_original, bound_cond_type, actuation_type, n=80, sigma=sigma_tanh, K=K_tanh,
show_plot=False):
cases = {'dirichlet', 'robin'}
if bound_cond_type not in cases:
raise TypeError('Type of boundary condition by z=0 is not understood.')
if actuation_type not in cases:
raise TypeError('Type of actuation_type is not understood.')
self._l = l
self._T = T
self._a1_original = param_original[1]
self._param = ef.transform2intermediate(param_original)
self._bound_cond_type = bound_cond_type
self._actuation_type = actuation_type
self._n = n
self._sigma = sigma
self._K = K
self._z = np.array([self._l])
y, t = gevrey_tanh(self._T, self._n + 2, self._sigma, self._K)
x, d_x = _power_series_flat_out(self._z, t, self._n, self._param, y, bound_cond_type)
a2, a1, a0, alpha, beta = self._param
if self._actuation_type is 'dirichlet':
u = x[:, -1]
elif self._actuation_type is 'robin':
u = d_x[:, -1] + beta * x[:, -1]
else:
raise NotImplementedError
# actually the algorithm consider the pde
# d/dt x(z,t) = a_2 x''(z,t) + a_0 x(z,t)
# with the following back transformation are also
# pde's with advection term a_1 x'(z,t) considered
u *= np.exp(-self._a1_original / 2. / a2 * l)
InterpTrajectory.__init__(self, t, u, show_plot=show_plot)