From cfa166d6d8d007aea941c7cb378928b70501283b Mon Sep 17 00:00:00 2001 From: Gabriele Gilardi Date: Sun, 14 Jun 2020 20:24:04 +0900 Subject: [PATCH] save copy --- Code_Python/KalmanFilter.py | 178 +++++++++++++++ Code_Python/filters.py | 418 ++++++++++++++++++++++++++++-------- Code_Python/test.py | 27 ++- Code_Python/utils.py | 344 +++++++++++++++++++++++++++++ 4 files changed, 864 insertions(+), 103 deletions(-) create mode 100644 Code_Python/KalmanFilter.py diff --git a/Code_Python/KalmanFilter.py b/Code_Python/KalmanFilter.py new file mode 100644 index 0000000..99e8f07 --- /dev/null +++ b/Code_Python/KalmanFilter.py @@ -0,0 +1,178 @@ + +import numpy as np + +measurements = np.array([5., 6., 7., 9., 10.]) +motion = np.array([1., 1., 2., 1., 1.]) +measurement_sigma = 4. +motion_sigma = 2. +mu = 0. +sigma = 1000. + +# Measurement +def Update( mean1, var1, mean2, var2 ): + mean = (var2*mean1 + var1*mean2) / (var1 + var2) + var = 1.0 / (1.0/var1 + 1.0/var2) + return [mean, var] + +# Motion +def Predict( mean1, var1, U, varU ): + mean = mean1 + U + var = var1 + varU + return [mean, var] + +for n in range(len(measurements)): + [mu, sigma] = Update(mu, sigma, measurements[n], measurement_sigma) + print('Update : ', n, [mu, sigma]) + [mu, sigma] = Predict(mu, sigma, motion[n],motion_sigma) + print('Predict: ', n, [mu, sigma]) + +print(' ') +print(Update(1,1,3,1)) + +# ------------------------------------------------------- + +import numpy as np + +measurements = [ 1., 2., 3. ] +dt = 1. + +# Initial state (location and velocity) +x = np.array([[ 0. ], + [ 0. ]]) +# Initial uncertainty +P = np.array([[ 1000., 0. ], + [ 0., 1000. ]]) +# External motion +U = np.array([[ 0. ], + [ 0. ]]) +# Next state function +F = np.array([[ 1., dt ], + [ 0., 1. ]]) +# Measurement function +H = np.array([[ 1., 0. ]]) +# Measurement uncertainty +R = np.array([[ 1. ]]) +# Identity matrix +I = np.eye(2) + + +def filter(x, P): + + for z in (measurements): + + # Measurement + Htra = H.T + S = H.dot(P.dot(Htra)) + R + Sinv = np.linalg.inv(S) + K = P.dot(Htra.dot(Sinv)) + y = z - H.dot(x) + xp = x +K.dot(y) + Pp = P - K.dot(H.dot(P)) + + # Prediction + x = F.dot(xp) + U + Ftra = F.T + P = F.dot(Pp.dot(Ftra)) + + print('x = ',x) + print('P = ',P) + +filter(x, P) + +# ------------------------------------------------------- + +import numpy as np + +# x0 = 4. +# y0 = 12. +# measurements = np.array([[ 5., 10. ], +# [ 6., 8. ], +# [ 7., 6. ], +# [ 8., 4. ], +# [ 9., 2. ], +# [ 10., 0. ]]) +# x0 = -4. +# y0 = 8. +# measurements = np.array([[ 1., 4. ], +# [ 6., 0. ], +# [ 11., -4. ], +# [ 16., -8. ]]) +# x0 = 1. +# y0 = 19. +# measurements = np.array([[ 1., 17. ], +# [ 1., 15. ], +# [ 1., 13. ], +# [ 1., 11. ]]) +x0 = 1. +y0 = 19. +measurements = np.array([[ 2., 17. ], + [ 0., 15. ], + [ 2., 13. ], + [ 0., 11. ]]) +# Time step +dt = 0.1 +# Initial state (location and velocity) +x = np.array([[ x0 ], + [ y0 ], + [ 0. ], + [ 0. ]]) +# Initial uncertainty +P = np.array([[ 0., 0., 0., 0. ], + [ 0., 0., 0., 0. ], + [ 0., 0., 1000., 0. ], + [ 0., 0., 0., 1000. ]]) +# External motion +U = np.array([[ 0. ], + [ 0. ], + [ 0. ], + [ 0. ]]) +# Next state function +F = np.array([[ 1., 0., dt, 0. ], + [ 0., 1., 0., dt ], + [ 0., 0., 1., 0. ], + [ 0., 0., 0., 1. ]]) +# Measurement function +H = np.array([[ 1., 0., 0., 0. ], + [ 0., 1., 0., 0. ]]) +# Measurement uncertainty +R = np.array([[ 0.1, 0. ], + [ 0. , 0.1 ]]) +# Measurement vector +z = np.zeros((2,1)) + + +def filter(x, P): + + for n in range(len(measurements)): + + z[0][0] = measurements[n][0] + z[1][0] = measurements[n][1] + + # Prediction + xp = F.dot(x) + U + Ftra = F.T + Pp = F.dot(P.dot(Ftra)) + + # Measurement + Htra = H.T + S = H.dot(Pp.dot(Htra)) + R + Sinv = np.linalg.inv(S) + K = Pp.dot(Htra.dot(Sinv)) + y = z - H.dot(xp) + x = xp +K.dot(y) + P = Pp - K.dot(H.dot(Pp)) + # print(z) + # print('x = ') + # print(x) + # print('P = ') + # print(P) + # print(' ') + + return x, P + + +x_final, P_final = filter(x, P) +print('x = ') +print(x_final) +print('P = ') +print(P_final) diff --git a/Code_Python/filters.py b/Code_Python/filters.py index abbc32c..1c0e2e4 100644 --- a/Code_Python/filters.py +++ b/Code_Python/filters.py @@ -4,83 +4,95 @@ Class for filter/smooth data. Copyright (c) 2020 Gabriele Gilardi -N = order/smoothing factor/number of past bars - alpha = damping term +References (both from John F. Ehlers): +[1] "Cycle Analytics for Traders: Advanced Technical Trading Concepts". +[2] "Signal Analysis, Filters And Trading Strategies". -General b,a Generic case (param = [b; a]) -SMA N Simple Moving Average -EMA N Exponential Moving Average -PassBand P,delta Pass band filter -StopBand P,delta Stop band filter -InstTrendline alpha Instantaneous trendline -GaussLow P,N Gauss, low pass (must be P > 1) -ZEMA1 N,K,Vn Zero-lag EMA (type 1) -ZEMA2 N,K Zero-lag EMA (type 2) -LWMA N Linearly Weighted Moving Average -MSMA N Modified Simple Moving Average -MLSQ N Modified Least-Squares Quadratic -GaussHigh P,N Gauss, high pass (must be P > 4) -ButterOrig P,N Butterworth original, order N (2 or 3) -ButterMod P,N Butterworth modified, order N (2 or 3) -SuperSmoother P, N Super smoother -SincFunction N Sinc function (N > 1, cut off at 0.5/N) +X (n_samples, n_series) Dataset to filter +b (n_b, ) Numerator coefficients +a (n_a, ) Denominator coefficients +Y (n_samples, n_series) Filtered dataset +idx scalar First filtered element in Y + +n_samples Number of data to filter +n_series Number of series to filter +nb Number of coefficients (numerator) +na Number of coefficients (denominator) + +Notes: +- the filter is applied starting from index. +- non filtered data are set equal to the original input, i.e. + Y[0:idx-1,:] = X[0:idx-1,:] + + +Filters (ref. [1]) +------------------ + +Generic b, a Generic case +SMA N Simple Moving Average +EMA N Exponential Moving Average +WMA N Weighted moving average +MSMA N Modified Simple Moving Average +MLSQ N Modified Least-Squares Quadratic (N = 5, 7, 9, 11) +ButterOrig P, N Butterworth original (N = 2, 3) +ButterMod P, N Butterworth modified (N = 2, 3) +SuperSmoother P, N Super smoother (N = 2, 3) +GaussLow P, N Gauss, low pass (P > 1) +GaussHigh P, N Gauss, high pass (P > 4) +Decycler P Decycler +DecyclerOsc P1, P2 Decycle oscillator + + + +ZEMA1 N, K, Vn Zero-lag EMA (type 1) +ZEMA2 N, K Zero-lag EMA (type 2) +MEMA N, Ns Modified EMA (with cubic velocity extimation) +PassBand P, delta Pass band filter +StopBand P, delta Stop band filter +InstTrendline alpha Instantaneous trendline +SincFunction N Sinc function (N > 1, cut off at 0.5/N) +Roofing P1, P2 Gauss,HP,2nd,P1 + Supersmoother,P2 + +N Order/smoothing factor/number of previous samples +alpha Damping term +P, P1, P2 Cut-off/critical period (50% power loss, -3 dB) + -b Coefficients at the numerator -a Coefficients at the denominator -P Cut of period (50% power loss, -3 dB) -N Order/smoothing factor K Coefficient/gain Vn Look back bar for the momentum delta Band centered in P and in percent (0.3 => 30% of P, = 0.3*P, if P = 10 => 0.3*10 = 3) -alpha Damping term nt Times the filter is called (order) +Ns Look back bar/skip factor """ import sys import numpy as np +import utils as utl def filter_data(X, b, a): """ - Applies a generic filter. - - Inputs: - X (n_samples, n_series) Data to filter - b Transfer response coefficients (numerator) - a Transfer response coefficients (denominator) - - Outputs: - Y Filtered data - idx Index of the first element in Y filtered - - Notes: - - the filter is applied from element 0 to len(X). - - elements from 0 to (idx-1) are set equal to the original input. + Applies a filter with transfer response coefficients and . """ n_samples, n_series = X.shape - Nb = len(b) - Na = len(a) - idx = np.amax([0, Nb-1, Na-1]) + nb = len(b) + na = len(a) + idx = np.amax([0, nb - 1, na - 1]) Y = X.copy() - # Apply filter for i in range(idx, n_samples): - tmp = np.zeros(n_series) - # Contribution from [b] (numerator) - for j in range(Nb): - tmp = tmp + b[j] * X[i-j,:] + for j in range(nb): + tmp = tmp + b[j] * X[i-j,:] # Numerator term - # Contribution from [a] (denominator) - for j in range(1, Na): - tmp = tmp - a[j] * Y[i-j, :] + for j in range(1, na): + tmp = tmp - a[j] * Y[i-j, :] # Denominator term - # Filtered value Y[i,:] = tmp / a[0] return Y, idx @@ -90,15 +102,24 @@ class Filter: def __init__(self, X): """ - X (nel_X, ) Data to filter """ self.X = np.asarray(X) + self.n_samples, self.n_series = X.shape self.idx = 0 + def Generic(self, b=1.0, a=1.0): + """ + Filter with generic transfer response coefficients and . + """ + b = np.asarray(b) + a = np.asarray(a) + Y, self.idx = filter_data(self.X, b, a) + return Y + def SMA(self, N=10): """ - Simple moving average (type ???). + Simple moving average (?? order, FIR, ?? band). """ b = np.ones(N) / N a = np.array([1.0]) @@ -107,20 +128,217 @@ class Filter: def EMA(self, N=10, alpha=None): """ - Exponential moving average (type ???). + Exponential moving average (?? order, IIR, pass ??). - If is not given it is determined as equivalent to a N-SMA. + If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) b = np.array([alpha]) - a = np.array([1.0, alpha - 1.0]) + a = np.array([1.0, -(1.0 - alpha)]) Y, self.idx = filter_data(self.X, b, a) return Y + def WMA(self, N=10): + """ + Weighted moving average (?? order, FIR, pass ??). + + Example: N = 5 --> [5.0, 4.0, 3.0, 2.0, 1.0] / 15.0 + """ + w = np.arange(N,0,-1) + b = w / np.sum(w) + a = np.array([1.0]) + Y, self.idx = filter_data(self.X, b, a) + return Y + + def MSMA(self, N=10): + """ + Modified simple moving average (?? order, FIR, pass ??). + + Example: N = 4 --> [0.5, 1.0, 1.0, 1.0, 0.5] / 4.0 + """ + w = np.ones(N+1) + w[0] = 0.5 + w[N] = 0.5 + b = w / N + a = np.array([1.0]) + Y, self.idx = filter_data(self.X, b, a) + return Y + + def MLSQ(self, N=5): + """ + Modified simple moving average (?? order, FIR, pass ??). + + Only N = 5, 7, 9, and 11 are implemented. If not return the unfiltered + dataset. + """ + if (N == 5): + b = np.array([7.0, 24.0, 34.0, 24.0, 7.0]) / 96.0 + elif (N == 7): + b = np.array([1.0, 6.0, 12.0, 14.0, 12.0, 6.0, 1.0]) / 52.0 + elif (N == 9): + b = np.array([-1.0, 28.0, 78.0, 108.0, 118.0, 108.0, 78.0, 28.0, \ + -1.0]) / 544.0 + elif (N == 11): + b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, \ + 138.0, 88.0, 18.0, -11.0]) / 980.0 + else: + Y = self.X.copy() + return Y + a = np.array([1.0]) + Y, self.idx = filter_data(self.X, b, a) + return Y + + def ButterOrig(self, N=2, P=10): + """ + Butterworth original version (?? order, IIR, pass ??). + + Only N = 2 and 3 are implemented. If not return the unfiltered dataset. + """ + if (N == 2): + beta = np.exp(-np.sqrt(2.0) * np.pi / P) + alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) + b = np.array([1.0, 2.0, 1.0]) * (1.0 - alpha + beta ** 2.0) / 4.0 + a = np.array([1.0, -alpha, beta ** 2.0]) + elif (N == 3): + beta = np.exp(-np.pi / P) + alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) + b = np.array([1.0, 3.0, 3.0, 1.0]) \ + * (1.0 - alpha + beta ** 2.0) * (1.0 - beta ** 2.0) / 8.0 + a = np.array([1.0, - (alpha + beta ** 2.0), \ + (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + else: + Y = self.X.copy() + return Y + Y, self.idx = filter_data(self.X, b, a) + return Y + + + def ButterMod(self, N=2, P=10): + """ + Butterworth modified version (?? order, IIR, pass ??). + + Only N = 2 and 3 are implemented. If not return the unfiltered dataset. + """ + if (N == 2): + beta = np.exp(-np.sqrt(2.0) * np.pi / P) + alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) + b = np.array([1.0 - alpha + beta ** 2.0]) + a = np.array([1.0, -alpha, beta ** 2.0]) + elif (N == 3): + beta = np.exp(-np.pi / P) + alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) + b = np.array([1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0]) + a = np.array([1.0, - (alpha + beta ** 2.0), \ + (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + else: + Y = self.X.copy() + return Y + Y, self.idx = filter_data(self.X, b, a) + return Y + + + def SuperSmoother(self, N=2, P=10): + """ + SuperSmoother (?? order, IIR, pass ??). + + Only N = 2 and 3 are implemented. If not return the unfiltered dataset. + """ + if (N == 2): + beta = np.exp(-np.sqrt(2.0) * np.pi / P) + alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) + w = 1.0 - alpha + beta ** 2.0 + b = np.array([w, w]) / 2.0 + a = np.array([1.0, - alpha, beta ** 2.0]) + elif (N == 3): + beta = np.exp(-np.pi / P) + alpha = 2.0 * beta * np.cos(1.738 * np.pi / P) + w = 1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0 + b = np.array([w, w]) / 2.0 + a = np.array([1.0, - (alpha + beta ** 2.0), \ + (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + else: + Y = self.X.copy() + return Y + Y, self.idx = filter_data(self.X, b, a) + return Y + + def GaussLow(self, P=2, N=1): + """ + Gauss low pass (IIR, N-th order, low pass). + + Must be P > 1. If not return the unfiltered dataset. + """ + if (P < 2): + Y = self.X.copy() + return Y + A = 2.0 ** (1.0 / N) - 1.0 + B = 4.0 * np.sin(np.pi / P) ** 2.0 + C = 2.0 * (np.cos(2.0 * np.pi/ P) - 1.0) + alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) + b = np.array([alpha]) + a = np.array([1.0, - (1.0 - alpha)]) + Y = self.X.copy() + for i in range(N): + Y, self.idx = filter_data(Y, b, a) + return Y, b, a + + def GaussHigh(self, P=5, N=1): + """ + Gauss high pass (IIR, Nth order, high pass). + + Must be P > 4. If not return the unfiltered dataset. + """ + if (P < 5): + Y = self.X.copy() + return Y + A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0 + B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0) + C = - B + alpha = (-B - np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) + b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)]) + a = np.array([1.0, - (1.0 - alpha)]) + Y = self.X - self.X[0, :] + for i in range(N): + Y, self.idx = filter_data(Y, b, a) + return Y, b, a + + + + def Decycler(self, P=10): + """ + Decycler (?? order, IIR, pass ??). Gauss,HP,1st,P + """ + pass + + # % Built subtracting high pass Gauss filter from 1 (only order 1) + # case 'Decycler' + # P = param(1); + # [TMP,nLast] = FiltersBasic(IN,'GaussHigh',[P 1]); + # OUT = IN - TMP; + + def DecyclerOsc(self, Pmin=1, Pmax=5): + """ + Decycler (?? order, IIR, pass ??). Gauss,HP,2nd,Pmax - Gauss,HP,2nd,Pmin + """ + pass + + # % (Gauss, HP, 2nd, Pmax - Gauss, HP, 2nd Pmin) + # % Pmax = larger cut off period + # % Pmin = smaller cut off period + # case 'DecyclerOsc' + # P1 = min(param); + # P2 = max(param); + # [TMP1,nLast] = FiltersBasic(IN,'GaussHigh',[P1 2]); + # [TMP2,nLast] = FiltersBasic(IN,'GaussHigh',[P2 2]); + # OUT = TMP2 - TMP1; + + + + def InstTrend(self, alpha=0.5): """ - Instantaneous Trendline (2nd order, IIR, low pass, Ehlers.) + Instantaneous Trendline (2nd order, IIR, low pass, Ehlers.). """ b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, - alpha + 3.0 * alpha ** 2.0 / 4.0]) @@ -131,10 +349,6 @@ class Filter: def PassBand(self, P=5, delta=0.3): """ Pass Band (type ???). - - P = cut-off period (50% power loss, -3 dB) - delta = band centered in P and in percent - (Example: 0.3 => 30% of P => 0.3*P, if P = 10 => 0.3*10 = 3) """ beta = np.cos(2.0 * np.pi / P) gamma = np.cos(4.0 * np.pi * delta) / P @@ -147,9 +361,6 @@ class Filter: def StopBand(self, P=5, delta=0.3): """ Stop Band - P = cut-off period (50% power loss, -3 dB) - delta = band centered in P and in percent - (Example: 0.3 => 30% of P => 0.3*P, if P = 10 => 0.3*10 = 3) """ beta = cos(2.0*pi/float(P)) gamma = cos(2.0*pi*(2.0*delta)/float(P)) @@ -160,33 +371,15 @@ class Filter: Y, self.idx = Generalized(self.X, b, a) return Y - def GaussLow(self, P=2, N=1): - """ - Gauss Low (low pass, IIR, N-th order, must be P > 1) - P = cut-off period (50% power loss, -3 dB) - N = times the filter is called (order) - """ - P = np.array([2, P], dtype=int).max() # or error? warning? - A = 2.0**(1.0/float(N)) - 1.0 - B = 4.0*sin(pi/float(P))**2.0 - C = 2.0*(cos(2.0*pi/float(P)) - 1.0) - delta = sqrt(B**2.0 - 4.0*A*C) - alpha = (-B + delta)/(2.0*A) - b = np.array([alpha]) - a = np.array([1.0, -(1.0-alpha)]) - Y = np.copy(self.X) - for i in range(N): - Y, self.idx = Generalized(Y, b, a) - return Y - def ZEMA1(self, N=10, K=1.0, Vn=5): + def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5): """ - Zero lag Exponential Moving Average (type 1) - N = order/smoothing factor - K = coefficient/gain - Vn = look back bar for the momentum - The damping term is determined as equivalent to a N-SMA + Zero lag Exponential Moving Average (type 1). + + If not given, is determined as equivalent to a N-SMA. """ + if (alpha is None): + alpha = 2.0 / (N + 1.0) alpha = 2.0 / (float(N) + 1.0) b = np.zeros(Vn+1) b[0] = alpha * (1.0 + K) @@ -195,12 +388,47 @@ class Filter: Y, self.idx = Generalized(self.X, b, a) return Y - def Generalized(self, a, b): - """ - Generic filter with transfer response coefficients and - """ - b = np.asarray(b) - a = np.asarray(a) - Y, self.idx = filter_data(self.X, b, a) - return Y + +# % Zero lag Exponential Moving Average (type 2) +# % alpha is determined as equivalent to a N-SMA +# case 'ZEMA2' +# N = param(1); +# alpha = 2/(N+1); +# K = param(2); +# b = alpha*(1+K); +# a = [1 alpha*K-(1-alpha)]; +# [OUT,nLast] = GeneralizedFilter(IN,b,a,3); + + + + + +# % Sinc function +# case 'SincFunction' +# N = param(1); +# nel = 50; +# k=1:nel-1; +# b = [ 1/N sin(pi*k/N)./(pi*k) ]; +# a = 1; +# [OUT,nLast] = GeneralizedFilter(IN,b,a,1); + + # % Roofing filter: Gauss high pass 2nd order filter + SuperSmoother + # % P1 = cut off period for GaussHigh + # % P2 = cut off period for SuperSmoother + # case 'Roofing' + # P1 = param(1); + # P2 = param(2); + # [TMP,nLast] = FiltersBasic(IN,'GaussHigh',[P1 2]); + # [OUT,nLast] = FiltersBasic(TMP,'SuperSmoother',[P2 1]); + + + # % MEMA - Uses cubic velocity extimation + # case 'MEMA' + # N = param(1); + # Ns = param(2); + # alpha = 2/(N+1); + # [Vel,Acc,nLast] = FiltersMak(IN,'Cubic',Ns); + # for i = nLast:-1:1 + # OUT(i) = alpha*(IN(i)+Vel(i)/Ns) + (1-alpha)*OUT(i+1); + # end diff --git a/Code_Python/test.py b/Code_Python/test.py index 0793c0c..2223f87 100644 --- a/Code_Python/test.py +++ b/Code_Python/test.py @@ -3,15 +3,15 @@ Filters for time series. Copyright (c) 2020 Gabriele Gilardi - - ToDo: - use NaN/input values for points not filtered? -- what to use as previous Y in recursive filters? i.e. set Y=X or set Y=0? -- plot filtered data (utils, with generic number of arguments passed) -- plot filter characteristics (utils) - return idx? - util to test filter (impulse, utils) +- warning in filter when wrong order? +- use self.a and self.b +- remove a and b from plots +- in comments write what filters do +- is necessary to copy X for Y untouched? """ import sys @@ -31,7 +31,18 @@ n_samples = data.shape[0] data = data.reshape(n_samples, -1) spx = flt.Filter(data) -EMA = spx.EMA() -print(EMA[0:5,:]) + # args = (spx.X, spx.SMA(N=5), spx.EMA(alpha=0.7)) -# utl.plot_signals(args) \ No newline at end of file +# utl.plot_signals(args) + +# alpha = 0.8 +# bb = np.array([alpha]) +# aa = np.array([1.0, alpha - 1.0]) + + +res, bb, aa = spx.GaussHigh(N=3, P=10) +print(bb) +print(aa) + +utl.plot_frequency_response(bb, aa) +utl.plot_lag_response(bb, aa) diff --git a/Code_Python/utils.py b/Code_Python/utils.py index cfae932..9e2235c 100644 --- a/Code_Python/utils.py +++ b/Code_Python/utils.py @@ -4,6 +4,7 @@ Utility functions for ????. Copyright (c) 2020 Gabriele Gilardi """ +from scipy import signal import numpy as np import matplotlib.pyplot as plt @@ -117,3 +118,346 @@ def plot_signals(signals): plt.grid(b=True) plt.show() + + +def plot_frequency_response(b, a=1.0): + """ + """ + b = np.asarray(b) + a = np.asarray(a) + + w, h = signal.freqz(b, a) + h_db = 20.0 * np.log10(abs(h)) + wf = w / (2.0 * np.pi) + + plt.plot(wf, h_db) + plt.axhline(-3.0, lw=1.5, ls='--', C='r') + plt.grid(b=True) + plt.xlim(np.amin(wf), np.amax(wf)) + plt.xlabel('$\omega$ [rad/sample]') + plt.ylabel('$h$ [db]') + plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \ + + ', a = ' + np.array_str(np.around(a, decimals=2))) + plt.show() + + +def plot_lag_response(b, a=1.0): + """ + """ + b = np.asarray(b) + a = np.asarray(a) + + w, gd = signal.group_delay((b, a)) + wf = w / (2.0 * np.pi) + + plt.plot(wf, gd) + plt.grid(b=True) + plt.xlim(np.amin(wf), np.amax(wf)) + plt.xlabel('$\omega$ [rad/sample]') + plt.ylabel('$gd$ [samples]') + plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \ + + ', a = ' + np.array_str(np.around(a, decimals=2))) + plt.show() + + + +# function [y,t] = interpSignal(x,n1,n2,varargin) +# if (nargin == 4) +# dt = varargin{1}; +# else +# dt = 20; +# end +# N = length(x); +# Ts = (n2-n1)/(N-1); +# n = n1:Ts:n2; +# t = n1:Ts/dt:n2; +# nt = length(t); +# y = zeros(1,nt); +# for i = 1:nt +# a = x.*sinc( (t(i)-n)/Ts ); +# y(i) = sum(a); +# end +# end + + +# function [f] = plotFn(type,func,varargin) +# % 0 Plot real function +# % 1 Plot real/imag values +# % 2 Plot real/imag values in polar form +# % 3 Plot magnitude/phase + +# clf +# tiny = 1e-7; + +# % Check if func is a function or data +# if ( isa(func,"function_handle") ) +# N = varargin{1}; +# n = (0:N-1)'; +# f = func(n); +# else +# N = length(func); +# n = (0:N-1)'; +# f = func; +# end + +# % Clean data +# xRe = real(f); +# xIm = imag(f); +# xRe( abs(xRe) < tiny ) = 0; +# xIm( abs(xIm) < tiny ) = 0; + +# switch (type) + +# % Plot real function +# case 0 + +# stem(n,xRe,"b","filled") +# xlim([n(1) n(N)]) +# grid on +# xlabel("n") +# ylabel("f") +# box on + +# % Plot real/imag function +# case 1 +# subplot(2,1,1) +# stem(n,xRe,"b","filled") +# xlim([n(1) n(N)]) +# grid on +# xlabel("n") +# ylabel("Re") +# box on +# subplot(2,1,2) +# stem(n,xIm,"b","filled") +# xlim([n(1) n(N)]) +# grid on +# xlabel("n") +# ylabel("Im") +# box on + +# % Plot real/imag function in polar form +# case 2 +# scatter(xRe,xIm,"b","filled") +# maxRe = max( abs(xRe) ); +# maxIm = max( abs(xIm) ); +# m = max(maxRe,maxIm); +# dx = 2*m/50; +# text(xRe+dx,xIm,num2str(n)) +# xlim( [-m +m ]) +# ylim( [-m +m ]) +# axis("square") +# grid on +# hold on +# plot([-m 0; +m 0],[0 -m; 0 +m],"k") +# hold off +# xlabel("Real") +# ylabel("Imag") +# box on + +# % Plot magnitude/phase +# case 3 +# xMa = sqrt( xRe.^2 + xIm.^2 ); +# xAr = atan2(xIm,xRe); +# subplot(2,1,1) +# stem(n,xMa,"b","filled") +# xlim([n(1) n(N)]) +# grid on +# xlabel("n") +# ylabel("Magnitude") +# box on +# subplot(2,1,2) +# stem(n,xAr,"b","filled") +# xlim([n(1) n(N)]) +# ylim([-pi pi]) +# grid on +# xlabel("n") +# ylabel("Phase [rad]") +# box on + +# end + +# end + + +# function sP = SyntQT(P,type) +# % Function: generate a random (synthetic) price curve +# % +# % Inputs +# % ------ +# % P prices (for type equal to 'P' and 'R') +# % normal distribution data (for type equal to 'N') +# % P(1) = mean +# % P(2) = std +# % P(3) = length +# % type type of generation +# % P use returns from price +# % R use returns normal distribution +# % N use specified normal distribution +# % +# % Output +# % ------ +# % sP generated synthetic prices + +# % Check for number of arguments +# if (nargin ~= 2) +# fprintf(1,'\n'); +# error('Wrong number of arguments'); +# end + +# switch(type) + +# % Use actual returns from P to generate values +# case 'P' +# R = Price2ret(P,'S'); % "simple" method +# sR = phaseran(R,1); + +# % Use normal distribution to generate values +# % (mean and std are from the actual returns of P) +# case 'R' +# R = Price2ret(P,'S'); % "simple" method +# sR = normrnd(mean(R),std(R),length(R),1); + +# % Use defined normal distribution to generate values +# % P(1)=mean, P(2)=std, P(3)=length +# case 'N' +# sR = normrnd(P(1),P(2),P(3),1); + +# otherwise +# fprintf(1,'\n'); +# error('Type not recognized'); + +# end + +# % Use 'simple' method and P0 = 1 to determine price +# sP = Ret2price(sR,'S'); + +# end % End of function + + +# % Input data +# % ---------- +# % recblk: is a 2D array. Row: time sample. Column: recording. +# % An odd number of time samples (height) is expected. If that is not +# % the case, recblock is reduced by 1 sample before the surrogate +# % data is created. +# % The class must be double and it must be nonsparse. + +# % nsurr: is the number of image block surrogates that you want to +# % generate. + +# % Output data +# % --------------------- +# % surrblk: 3D multidimensional array image block with the surrogate +# % datasets along the third dimension + +# % Example 1 +# % --------- +# % x = randn(31,4); +# % x(:,4) = sum(x,2); % Create correlation in the data +# % r1 = corrcoef(x) +# % surr = phaseran(x,10); +# % r2 = corrcoef(surr(:,:,1)) % Check that the correlation is preserved + +# % Carlos Gias +# % Date: 21/08/2011 + +# % Reference: +# % Prichard, D., Theiler, J. Generating Surrogate Data for Time Series +# % with Several Simultaneously Measured Variables (1994) +# % Physical Review Letters, Vol 73, Number 7 + +# function surrblk = phaseran(recblk,nsurr) + +# % Get parameters +# [nfrms,nts] = size(recblk); +# if ( rem(nfrms,2) == 0 ) +# nfrms = nfrms-1; +# recblk = recblk(1:nfrms,:); +# end + +# % Get parameters +# len_ser = (nfrms-1)/2; +# interv1 = 2:len_ser+1; +# interv2 = len_ser+2:nfrms; + +# % Fourier transform of the original dataset +# fft_recblk = fft(recblk); + +# % Create the surrogate recording blocks one by one +# surrblk = zeros(nfrms,nts,nsurr); +# for k = 1:nsurr +# ph_rnd = rand([len_ser 1]); + +# % Create the random phases for all the time series +# ph_interv1 = repmat(exp(2*pi*1i*ph_rnd),1,nts); +# ph_interv2 = conj(flipud( ph_interv1)); + +# % Randomize all the time series simultaneously +# fft_recblk_surr = fft_recblk; +# fft_recblk_surr(interv1,:) = fft_recblk(interv1,:).*ph_interv1; +# fft_recblk_surr(interv2,:) = fft_recblk(interv2,:).*ph_interv2; + +# % Inverse transform +# surrblk(:,:,k)= real(ifft(fft_recblk_surr)); +# end + +# end % End of function + + +# function [Tv,Av] = SyntWave(P,varargin) +# % Function: generate a multi-sinewave +# % +# % Inputs +# % ------ +# % P = [ P1 P2 ... Pn ] Periods +# % varargin = A, T, PH Amplitudes, time, phases +# % +# % A = [ A1 A2 ... An ] Amplitudes +# % T = [ ts tf dt] Time info: from ts to tf with step dt +# % PH = [PH1 PH2 ... PHn] Phases (rad) +# % +# % Outputs +# % ------- +# % Av = SUM(1 to n) of [ A*sin(2*pi*f*t + PH) ] +# % Tv Time (ts to tf with step dt) +# % +# % Default amplitudes are ones +# % Default time is from 0 to largest period (1000 steps) +# % Default phases are zeros + +# % Check for arguments +# if (nargin == 1) +# np = length(P); +# A = ones(1,np); +# T = [0 max(P) max(P)/1000]; +# PH = zeros(1,np); +# elseif (nargin == 2) +# np = length(P); +# A = varargin{1}; +# T = [0 max(P) max(P)/1000]; +# PH = zeros(1,np); +# elseif (nargin == 3) +# np = length(P); +# A = varargin{1}; +# T = varargin{2}; +# PH = zeros(1,np); +# elseif (nargin == 4) +# np = length(P); +# A = varargin{1}; +# T = varargin{2}; +# PH = varargin{3}; +# else +# fprintf(1,'\n'); +# error('Wrong number of arguments'); +# end + +# % Add all waves +# Tv = T(1):T(3):T(2); +# Av = zeros(1,length(Tv)); +# for j = 1:np +# Av = Av + A(j)*sin(2*pi*Tv/P(j)+PH(j)); +# end + +# end % End of function + +