From d986665244202f88b37ba436af84d48d3ec29b1b Mon Sep 17 00:00:00 2001 From: Gabriele Gilardi Date: Fri, 19 Jun 2020 20:21:18 +0900 Subject: [PATCH] add: kalman filter, ABG filters, synthetic wave --- Code_Python/KalmanCourse.py | 183 ------------------------ Code_Python/KalmanFilter.py | 106 -------------- Code_Python/filters.py | 182 ++++++++++++++++++----- Code_Python/test.py | 38 ++--- Code_Python/utils.py | 277 +++++++----------------------------- README.md | 4 +- 6 files changed, 218 insertions(+), 572 deletions(-) delete mode 100644 Code_Python/KalmanCourse.py delete mode 100644 Code_Python/KalmanFilter.py diff --git a/Code_Python/KalmanCourse.py b/Code_Python/KalmanCourse.py deleted file mode 100644 index ef63cdb..0000000 --- a/Code_Python/KalmanCourse.py +++ /dev/null @@ -1,183 +0,0 @@ - -# import numpy as np - -# measurements = np.array([5., 6., 7., 9., 10.]) -# motion = np.array([1., 1., 2., 1., 1.]) -# measurement_sigma = 4. -# motion_sigma = 2. -# mu = 0. -# sigma = 1000. - -# # Measurement -# def Update( mean1, var1, mean2, var2 ): -# mean = (var2*mean1 + var1*mean2) / (var1 + var2) -# var = 1.0 / (1.0/var1 + 1.0/var2) -# return [mean, var] - -# # Motion -# def Predict( mean1, var1, U, varU ): -# mean = mean1 + U -# var = var1 + varU -# return [mean, var] - -# for n in range(len(measurements)): -# [mu, sigma] = Update(mu, sigma, measurements[n], measurement_sigma) -# print('Update : ', n, [mu, sigma]) -# [mu, sigma] = Predict(mu, sigma, motion[n],motion_sigma) -# print('Predict: ', n, [mu, sigma]) - -# print(' ') -# print(Update(1,1,3,1)) - -# ------------------------------------------------------- - -# import numpy as np - -# measurements = [ 1., 2., 3. ] -# dt = 1. - -# # Initial state (location and velocity) -# x = np.array([[ 0. ], -# [ 0. ]]) -# # Initial uncertainty -# P = np.array([[ 1000., 0. ], -# [ 0., 1000. ]]) -# # External motion -# U = np.array([[ 0. ], -# [ 0. ]]) -# # Next state function -# F = np.array([[ 1., dt ], -# [ 0., 1. ]]) -# # Measurement function -# H = np.array([[ 1., 0. ]]) -# # Measurement uncertainty -# R = np.array([[ 1. ]]) -# # Identity matrix -# I = np.eye(2) - - -# def filter(x, P): - -# step = 0 -# for z in (measurements): -# step += 1 -# print("step = ", step, " meas. = ", z) - -# # Measurement -# Htra = H.T -# S = H.dot(P.dot(Htra)) + R -# Sinv = np.linalg.inv(S) -# K = P.dot(Htra.dot(Sinv)) -# y = z - H.dot(x) -# xp = x +K.dot(y) -# Pp = P - K.dot(H.dot(P)) - -# # Prediction -# x = F.dot(xp) + U -# Ftra = F.T -# P = F.dot(Pp.dot(Ftra)) - -# print('x =') -# print(x) -# print('P =') -# print(P) - -# filter(x, P) - -# # ------------------------------------------------------- - -import numpy as np - -x0 = 4. -y0 = 12. -measurements = np.array([[ 5., 10. ], - [ 6., 8. ], - [ 7., 6. ], - [ 8., 4. ], - [ 9., 2. ], - [ 10., 0. ]]) -# x0 = -4. -# y0 = 8. -# measurements = np.array([[ 1., 4. ], -# [ 6., 0. ], -# [ 11., -4. ], -# [ 16., -8. ]]) -# x0 = 1. -# y0 = 19. -# measurements = np.array([[ 1., 17. ], -# [ 1., 15. ], -# [ 1., 13. ], -# [ 1., 11. ]]) -# x0 = 1. -# y0 = 19. -# measurements = np.array([[ 2., 17. ], -# [ 0., 15. ], -# [ 2., 13. ], -# [ 0., 11. ]]) -# Time step -dt = 0.1 -# Initial state (location and velocity) -x = np.array([[ x0 ], - [ y0 ], - [ 0. ], - [ 0. ]]) -# Initial uncertainty -P = np.array([[ 0., 0., 0., 0. ], - [ 0., 0., 0., 0. ], - [ 0., 0., 1000., 0. ], - [ 0., 0., 0., 1000. ]]) -# External motion -U = np.array([[ 0. ], - [ 0. ], - [ 0. ], - [ 0. ]]) -# Next state function -F = np.array([[ 1., 0., dt, 0. ], - [ 0., 1., 0., dt ], - [ 0., 0., 1., 0. ], - [ 0., 0., 0., 1. ]]) -# Measurement function -H = np.array([[ 1., 0., 0., 0. ], - [ 0., 1., 0., 0. ]]) -# Measurement uncertainty -R = np.array([[ 0.1, 0. ], - [ 0. , 0.1 ]]) -# Measurement vector -z = np.zeros((2,1)) - - -def filter(x, P): - - for n in range(len(measurements)): - - z[0][0] = measurements[n][0] - z[1][0] = measurements[n][1] - - # Prediction - xp = F.dot(x) + U - Ftra = F.T - Pp = F.dot(P.dot(Ftra)) - - # Measurement - Htra = H.T - S = H.dot(Pp.dot(Htra)) + R - Sinv = np.linalg.inv(S) - K = Pp.dot(Htra.dot(Sinv)) - y = z - H.dot(xp) - x = xp +K.dot(y) - P = Pp - K.dot(H.dot(Pp)) - # print(z) - # print('x = ') - # print(x) - # print('P = ') - # print(P) - # print(' ') - - return x, P - - -x_final, P_final = filter(x, P) -print('x = ') -print(x_final) -print('P = ') -print(P_final) diff --git a/Code_Python/KalmanFilter.py b/Code_Python/KalmanFilter.py deleted file mode 100644 index a82bb68..0000000 --- a/Code_Python/KalmanFilter.py +++ /dev/null @@ -1,106 +0,0 @@ -""" - -correction = update = measurement -prediction = motion - -X (n_states, 1) State vector -P (n_states, n_states) Covariance matrix of X -F (n_states, n_states) State transition matrix -U (n_states, 1) Input/control/drift vector -Z (n_meas, 1) Measurament vector -H (n_meas, n_states) Measurament matrix -R (n_meas, n_meas) Covariance matrix of Z -S (n_meas, n_meas) Covariance matrix (?) -K (n_states, m) Kalman gain -Q (n_states, n_states) Covariance matrix (?) - -Data (n_meas, n_samples) Measurements -Fext (n_states, n_samples) External driver -X0 (n_states, 1) Initial state vector -P0 (n_states, n_states) Initial covariance matrix of X -""" - -import numpy as np - - -class KalmanFilter: - - def __init__(self, F, H, Q, R): - """ - """ - self.F = F - self.Q = Q - self.H = H - self.R = R - - - def prediction(self, X, P, U): - X = self.F @ X + U - P = self.F @ P @ self.F.T + self.Q - return X, P - - - def update(self, X, P, Z): - """ - """ - S = self.H @ P @ self.H.T + self.R - K = P @ self.H.T @ np.linalg.inv(S) - X = X + K @ (Z - self.H @ X) - P = P - K @ self.H @ P - return X, P - - - def applyFilter(self, Data, Fext, X0, P0): - """ - """ - pass - - -# Measurements -data = np.array([[5., 6., 7., 8., 9., 10.], - [10., 8., 6., 4., 2., 0.]]) - -# Initial state vector -X0 = np.array([[4. ], - [12.], - [0. ], - [0. ]]) - -# Initial covariance matrix of X -P0 = np.array([[0., 0., 0., 0.], - [0., 0., 0., 0.], - [0., 0., 1000., 0.], - [0., 0., 0., 1000.]]) - -# External motion -Fext = np.zeros_like(data) - -# Next state function -dt = 0.1 -F = np.array([[ 1., 0., dt, 0. ], - [ 0., 1., 0., dt ], - [ 0., 0., 1., 0. ], - [ 0., 0., 0., 1. ]]) -# Measurement function -H = np.array([[ 1., 0., 0., 0. ], - [ 0., 1., 0., 0. ]]) -# Measurement uncertainty -R = np.array([[ 0.1, 0. ], - [ 0. , 0.1 ]]) - -def filter(x, P): - - step = 0 - for z in (measurements): - step += 1 - print("step = ", step, " meas. = ", z) - - # Update - - - print('x =') - print(x) - print('P =') - print(P) - -filter(x, P) diff --git a/Code_Python/filters.py b/Code_Python/filters.py index ec5d223..b211bef 100644 --- a/Code_Python/filters.py +++ b/Code_Python/filters.py @@ -46,6 +46,8 @@ InstTrend N/alpha Instantaneous trendline SincFunction N Sinc function Decycler P Decycler, 1-GaussHigh (P>=5) DecyclerOsc P1,P2 Decycle oscillator, GH(P1) - GH(P2), (P1>=5) +ABG +Kalman N Order/smoothing factor/number of previous samples alpha Damping term @@ -87,12 +89,12 @@ def filter_data(X, b, a): class Filter: - def __init__(self, X): + def __init__(self, data): """ """ - self.X = np.asarray(X) + self.data = np.asarray(data) - self.n_samples, self.n_series = X.shape + self.n_samples, self.n_series = data.shape self.idx = 0 def Generic(self, b=1.0, a=1.0): @@ -101,7 +103,7 @@ class Filter: """ b = np.asarray(b) a = np.asarray(a) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def SMA(self, N=10): @@ -110,7 +112,7 @@ class Filter: """ b = np.ones(N) / N a = np.array([1.0]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def EMA(self, N=10, alpha=None): @@ -123,7 +125,7 @@ class Filter: alpha = 2.0 / (N + 1.0) b = np.array([alpha]) a = np.array([1.0, -(1.0 - alpha)]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def WMA(self, N=10): @@ -135,7 +137,7 @@ class Filter: w = np.arange(N, 0, -1) b = w / np.sum(w) a = np.array([1.0]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def MSMA(self, N=10): @@ -149,7 +151,7 @@ class Filter: w[N] = 0.5 b = w / N a = np.array([1.0]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def MLSQ(self, N=5): @@ -170,11 +172,11 @@ class Filter: b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, 138.0, 88.0, 18.0, -11.0]) / 980.0 else: - Y = self.X.copy() + print("Warning: data returned unfiltered (wrong N)") self.idx = 0 - return Y + return self.data a = np.array([1.0]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def ButterOrig(self, N=2, P=10): @@ -196,10 +198,10 @@ class Filter: a = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: - Y = self.X.copy() + print("Warning: data returned unfiltered (wrong N)") self.idx = 0 - return Y - Y, self.idx = filter_data(self.X, b, a) + return self.data + Y, self.idx = filter_data(self.data, b, a) return Y def ButterMod(self, N=2, P=10): @@ -220,10 +222,10 @@ class Filter: a = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: - Y = self.X.copy() + print("Warning: data returned unfiltered (wrong N)") self.idx = 0 - return Y - Y, self.idx = filter_data(self.X, b, a) + return self.data + Y, self.idx = filter_data(self.data, b, a) return Y def SuperSmooth(self, N=2, P=10): @@ -246,10 +248,10 @@ class Filter: a = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: - Y = self.X.copy() + print("Warning: data returned unfiltered (wrong N)") self.idx = 0 - return Y - Y, self.idx = filter_data(self.X, b, a) + return self.data + Y, self.idx = filter_data(self.data, b, a) return Y def GaussLow(self, N=1, P=2): @@ -259,16 +261,16 @@ class Filter: Must be P > 1. If not returns the unfiltered dataset. """ if (P < 2): - Y = self.X.copy() + print("Warning: data returned unfiltered (P < 2)") self.idx = 0 - return Y + return self.data A = 2.0 ** (1.0 / N) - 1.0 B = 4.0 * np.sin(np.pi / P) ** 2.0 C = 2.0 * (np.cos(2.0 * np.pi / P) - 1.0) alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) b = np.array([alpha]) a = np.array([1.0, - (1.0 - alpha)]) - Y = self.X.copy() + Y = self.data.copy() for i in range(N): Y, self.idx = filter_data(Y, b, a) return Y @@ -280,16 +282,16 @@ class Filter: Must be P > 4. If not returns the unfiltered dataset. """ if (P < 5): - Y = self.X.copy() + print("Warning: data returned unfiltered (P < 5)") self.idx = 0 - return Y + return self.data A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0 B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0) C = - B alpha = (-B - np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)]) a = np.array([1.0, - (1.0 - alpha)]) - Y = self.X - self.X[0, :] + Y = self.data - self.data[0, :] for i in range(N): Y, self.idx = filter_data(Y, b, a) return Y @@ -306,7 +308,7 @@ class Filter: alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0]) a = np.array([1.0, - beta * (1.0 + alpha), alpha]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def BandStop(self, P=5, delta=0.3): @@ -322,7 +324,7 @@ class Filter: b = np.array([(1.0 + alpha) / 2.0, - beta * (1.0 + alpha), (1.0 + alpha) / 2.0]) a = np.array([1.0, -beta * (1.0 + alpha), alpha]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5): @@ -337,7 +339,7 @@ class Filter: b[0] = alpha * (1.0 + K) b[Vn] = - alpha * K a = np.array([1.0, - (1.0 - alpha)]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def ZEMA2(self, N=10, alpha=None, K=1.0): @@ -350,7 +352,7 @@ class Filter: alpha = 2.0 / (N + 1.0) b = np.array([alpha * (1.0 + K)]) a = np.array([1.0, alpha * K - (1.0 - alpha)]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def InstTrend(self, N=10, alpha=None): @@ -364,7 +366,7 @@ class Filter: b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, - alpha + 3.0 * alpha ** 2.0 / 4.0]) a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) - Y, self.idx = filter_data(self.X, b, a) + Y, self.idx = filter_data(self.data, b, a) return Y def SincFunction(self, N=10, nel=10): @@ -378,8 +380,8 @@ class Filter: k = np.arange(1, nel) b[1:] = np.sin(np.pi * k / N) / (np.pi * k) a = np.array([1.0]) - Y, self.idx = filter_data(self.X, b, a) - return Y, b, a + Y, self.idx = filter_data(self.data, b, a) + return Y def Decycler(self, P=10): """ @@ -389,10 +391,10 @@ class Filter: Must be P > 4. If not returns the unfiltered dataset. """ if (P < 5): - Y = self.X.copy() + print("Warning: data returned unfiltered (P < 5)") self.idx = 0 - return Y - Y = self.X - self.GaussHigh(N=1, P=P) + return self.data + Y = self.data - self.GaussHigh(N=1, P=P) return Y def DecyclerOsc(self, P1=5, P2=10): @@ -405,9 +407,111 @@ class Filter: """ P_low = np.amin([P1, P2]) P_high = np.amax([P1, P2]) - if (P1 < 5): - Y = self.X.copy() + if (P_low < 5): + print("Warning: data returned unfiltered (P_low < 5)") self.idx = 0 - return Y + return self.data Y = self.GaussHigh(N=2, P=P_low) - self.GaussHigh(N=2, P=P_high) return Y + + def ABG(self, alpha=0.0, beta=0.0, gamma=0.0, dt=1.0): + """ + alpha-beta-gamma + + For numerical stability: 0 < alpha, beta < 1 + """ + # If necessary change scalars to arrays + if (np.ndim(alpha) == 0): + alpha = np.ones(self.n_samples) * alpha + if (np.ndim(beta) == 0): + beta = np.ones(self.n_samples) * beta + if (np.ndim(gamma) == 0): + gamma = np.ones(self.n_samples) * gamma + + # Initialize + Y_corr = self.data.copy() + Y_pred = self.data.copy() + x0 = self.data[0,:] + v0 = np.zeros(self.n_series) + a0 = np.zeros(self.n_series) + for i in range(1, self.n_samples): + + # Predictor (predicts state in ) + x_pred = x0 + dt * v0 + 0.5 * a0 * dt ** 2 + v_pred = v0 + dt * a0 + a_pred = a0 + Y_pred[i, :] = x_pred + + # Residual (innovation) + r = self.data[i, :] - x_pred + + # Corrector (corrects state in ) + x_corr = x_pred + alpha[i] * r + v_corr = v_pred + (beta[i] / dt) * r + a_corr = a_pred + (2.0 * gamma[i] / dt ** 2) *r + + # Save value and prepare next iteration + Y_corr[i, :] = x_corr + x0 = x_corr + v0 = v_corr + a0 = a_corr + + self.idx = 1 + + return Y_corr, Y_pred + + + def Kalman(self, sigma_x, sigma_v, dt, abg_type="abg"): + """ + Steady-state Kalman filter (also limited to one-dimension) + """ + L = (sigma_x / sigma_v) * dt ** 2 + + # Alpha filter + if (abg_type == 'a'): + alpha = (-L ** 2 + np.sqrt(L ** 4 + 16.0 * L ** 2)) / 8.0 + beta = 0.0 + gamma = 0.0 + + # Alpha-Beta filter + elif(abg_type == 'ab'): + r = (4.0 + L - np.sqrt(8.0 * L + L ** 2)) / 4.0 + alpha = 1.0 - r ** 2 + beta = 2.0 * (2.0 - alpha) - 4.0 * np.sqrt(1.0 - alpha) + gamma = 0.0 + + #Alpha-Beta-Gamma filter + else: + b = (L / 2.0) - 3.0 + c = (L / 2.0) + 3.0 + d = -1.0 + p = c - b **2 /3.0 + q = 2.0 * b **3 /27.0 - b * c /3.0 + d + v = np.sqrt(q ** 2 + 4.0 * p ** 3 / 27.0) + z = - (q + v / 2.0) ** (1.0 / 3.0) + s = z - p / (3.0 * z) - b / 3.0 + alpha = 1.0 - s ** 2 + beta = 2.0 * (1 - s) ** 2 + gamma = beta ** 2 / (2.0 * alpha) + + # Apply filter + Y = self.abg(alpha=alpha, beta=beta, gamma=gamma, dt=dt) + + return Y + +""" +correction = update = measurement +prediction = motion + +X (n_states, 1) State estimate +P (n_states, n_states) Covariance estimate +F (n_states, n_states) State transition model +Z (n_obs, 1) Observations +H (n_obs, n_states) Observation model +R (n_obs, n_obs) Covariance of the observation noise +S (n_obs, n_obs) Covariance of the observation residual +K (n_states, n_obs) Optimal Kalman gain +Q (n_states, n_states) Covariance of the process noise matrix +Y (n_obs, 1) Observation residual (innovation) + +""" \ No newline at end of file diff --git a/Code_Python/test.py b/Code_Python/test.py index a0ae1e5..36456db 100644 --- a/Code_Python/test.py +++ b/Code_Python/test.py @@ -4,22 +4,22 @@ Filters for time series. Copyright (c) 2020 Gabriele Gilardi ToDo: -- use NaN/input values for points not filtered? -- return idx? -- util to test filter (impulse, utils) -- warning in filter when wrong order? or save flag with true/false if computed -- use self.a and self.b -- remove a and b from plots - in comments write what filters do - is necessary to copy X for Y untouched? - decide default values in functions - check conditions on P and N +- why lag plot gives errors +- fix plotting function +- example for alpha-beta-gamma using variable sigma as in financial time series + (see Ehler) +- example using noisy multi-sine-waves """ import sys import numpy as np import filters as flt import utils as utl +import matplotlib.pyplot as plt # Read data to filter if len(sys.argv) != 2: @@ -42,15 +42,19 @@ spx = flt.Filter(data) # aa = np.array([1.0, alpha - 1.0]) -res, bb, aa = spx.SincFunction(2, 50) -print(bb) -print(aa) -utl.plot_frequency_response(bb, aa) -utl.plot_lag_response(bb, aa) +# res, bb, aa = spx.SincFunction(2, 50) +# print(bb) +# print(aa) +# utl.plot_frequency_response(bb, aa) +# utl.plot_lag_response(bb, aa) +# sigma_x = 0.1 +# sigma_v = 0.1 * np.ones(n_samples) +# res = spx.Kalman(sigma_x=sigma_x, sigma_v=sigma_v, dt=1.0, abg_type="abg") +alpha = 0.5 +beta = 0.005 +gamma = 0.0 +Yc, Yp = spx.ABG(alpha=alpha, beta=beta, gamma=gamma, dt=1.0) +signals = (spx.data[:, 0], Yc[:, 0], Yp[:, 0]) +utl.plot_signals(signals, 0, 50) + -# res = spx.DecyclerOsc(30, 60) -# print(res[0:10, :]) -signals = (spx.X, res) -print(spx.idx) -utl.plot_signals(signals) -# print(spx.X[0:20]) \ No newline at end of file diff --git a/Code_Python/utils.py b/Code_Python/utils.py index 8a47d0d..1abad4a 100644 --- a/Code_Python/utils.py +++ b/Code_Python/utils.py @@ -63,61 +63,20 @@ def scale_data(X, param=()): return Xs, param -def calc_rmse(a, b): - """ - Calculates the root-mean-square-error of arrays and . If the arrays - are multi-column, the RMSE is calculated as all the columns are one single - vector. - """ - # Convert to (n, ) dimension - a = a.flatten() - b = b.flatten() - - # Root-mean-square-error - rmse = np.sqrt(((a - b) ** 2).sum() / len(a)) - - return rmse - - -def calc_corr(a, b): - """ - Calculates the correlation between arrays and . If the arrays are - multi-column, the correlation is calculated as all the columns are one - single vector. - """ - # Convert to (n, ) dimension - a = a.flatten() - b = b.flatten() - - # Correlation - corr = np.corrcoef(a, b)[0, 1] - - return corr - - -def calc_accu(a, b): - """ - Calculates the accuracy (in %) between arrays and . The two arrays - must be column/row vectors. - """ - # Convert to (n, ) dimension - a = a.flatten() - b = b.flatten() - - # Correlation - accu = 100.0 * (a == b).sum() / len(a) - - return accu - - -def plot_signals(signals): +def plot_signals(signals, idx_start=0, idx_end=None): """ """ + if (idx_end is None): + idx_end = len(signals[0]) + t = np.arange(idx_start, idx_end) + names = [] + count = 0 for signal in signals: - plt.plot(signal) - + plt.plot(t, signal[idx_start:idx_end]) + names.append(str(count)) + count += 1 plt.grid(b=True) - plt.xlim(0, 100) + plt.legend(names) plt.show() @@ -138,8 +97,6 @@ def plot_frequency_response(b, a=1.0): # plt.ylim(-40.0, 0.0) plt.xlabel('$\omega$ [rad/sample]') plt.ylabel('$h$ [db]') - plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \ - + ', a = ' + np.array_str(np.around(a, decimals=2))) plt.show() @@ -157,128 +114,53 @@ def plot_lag_response(b, a=1.0): plt.xlim(np.amin(wf), np.amax(wf)) plt.xlabel('$\omega$ [rad/sample]') plt.ylabel('$gd$ [samples]') - plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \ - + ', a = ' + np.array_str(np.around(a, decimals=2))) plt.show() +def synthetic_wave(per, amp=None, pha=None, tim=None): + """ + Generates a multi-sinewave. + P = [ P1 P2 ... Pn ] Periods + varargin = A, T, PH Amplitudes, time, phases + A = [ A1 A2 ... An ] Amplitudes + T = [ ts tf dt] Time info: from ts to tf in dt steps + PH = [PH1 PH2 ... PHn] Phases (rad) -# function [y,t] = interpSignal(x,n1,n2,varargin) -# if (nargin == 4) -# dt = varargin{1}; -# else -# dt = 20; -# end -# N = length(x); -# Ts = (n2-n1)/(N-1); -# n = n1:Ts:n2; -# t = n1:Ts/dt:n2; -# nt = length(t); -# y = zeros(1,nt); -# for i = 1:nt -# a = x.*sinc( (t(i)-n)/Ts ); -# y(i) = sum(a); -# end -# end - - -# function [f] = plotFn(type,func,varargin) -# % 0 Plot real function -# % 1 Plot real/imag values -# % 2 Plot real/imag values in polar form -# % 3 Plot magnitude/phase - -# clf -# tiny = 1e-7; - -# % Check if func is a function or data -# if ( isa(func,"function_handle") ) -# N = varargin{1}; -# n = (0:N-1)'; -# f = func(n); -# else -# N = length(func); -# n = (0:N-1)'; -# f = func; -# end - -# % Clean data -# xRe = real(f); -# xIm = imag(f); -# xRe( abs(xRe) < tiny ) = 0; -# xIm( abs(xIm) < tiny ) = 0; - -# switch (type) - -# % Plot real function -# case 0 - -# stem(n,xRe,"b","filled") -# xlim([n(1) n(N)]) -# grid on -# xlabel("n") -# ylabel("f") -# box on - -# % Plot real/imag function -# case 1 -# subplot(2,1,1) -# stem(n,xRe,"b","filled") -# xlim([n(1) n(N)]) -# grid on -# xlabel("n") -# ylabel("Re") -# box on -# subplot(2,1,2) -# stem(n,xIm,"b","filled") -# xlim([n(1) n(N)]) -# grid on -# xlabel("n") -# ylabel("Im") -# box on - -# % Plot real/imag function in polar form -# case 2 -# scatter(xRe,xIm,"b","filled") -# maxRe = max( abs(xRe) ); -# maxIm = max( abs(xIm) ); -# m = max(maxRe,maxIm); -# dx = 2*m/50; -# text(xRe+dx,xIm,num2str(n)) -# xlim( [-m +m ]) -# ylim( [-m +m ]) -# axis("square") -# grid on -# hold on -# plot([-m 0; +m 0],[0 -m; 0 +m],"k") -# hold off -# xlabel("Real") -# ylabel("Imag") -# box on - -# % Plot magnitude/phase -# case 3 -# xMa = sqrt( xRe.^2 + xIm.^2 ); -# xAr = atan2(xIm,xRe); -# subplot(2,1,1) -# stem(n,xMa,"b","filled") -# xlim([n(1) n(N)]) -# grid on -# xlabel("n") -# ylabel("Magnitude") -# box on -# subplot(2,1,2) -# stem(n,xAr,"b","filled") -# xlim([n(1) n(N)]) -# ylim([-pi pi]) -# grid on -# xlabel("n") -# ylabel("Phase [rad]") -# box on + Av = SUM(1 to n) of [ A*sin(2*pi*f*t + PH) ] + Tv Time (ts to tf with step dt) -# end - -# end + Default amplitudes are ones + Default time is from 0 to largest period (1000 steps) + Default phases are zeros + """ + n_waves = len(per) + per = np.asarray(per) + + # Check for amplitudes, times, and phases + if (amp is None): + amp = np.ones(n_waves) + else: + amp = np.asarray(amp) + if (tim is None): + t_start = 0.0 + t_end = np.amax(per) + n_steps = 500 + else: + t_start = tim[0] + t_end = tim[1] + n_steps = int(tim[2]) + if (pha is None): + pha = np.zeros(n_waves) + else: + pha = np.asarray(pha) + + # Add all the waves + t = np.linspace(t_start, t_end, num=n_steps) + f = np.zeros(len(t)) + for i in range(n_waves): + f = f + amp[i] * np.sin(2.0 * np.pi * t / per[i] + pha[i]) + + return t, f # function sP = SyntQT(P,type) @@ -406,60 +288,5 @@ def plot_lag_response(b, a=1.0): # end % End of function -# function [Tv,Av] = SyntWave(P,varargin) -# % Function: generate a multi-sinewave -# % -# % Inputs -# % ------ -# % P = [ P1 P2 ... Pn ] Periods -# % varargin = A, T, PH Amplitudes, time, phases -# % -# % A = [ A1 A2 ... An ] Amplitudes -# % T = [ ts tf dt] Time info: from ts to tf with step dt -# % PH = [PH1 PH2 ... PHn] Phases (rad) -# % -# % Outputs -# % ------- -# % Av = SUM(1 to n) of [ A*sin(2*pi*f*t + PH) ] -# % Tv Time (ts to tf with step dt) -# % -# % Default amplitudes are ones -# % Default time is from 0 to largest period (1000 steps) -# % Default phases are zeros - -# % Check for arguments -# if (nargin == 1) -# np = length(P); -# A = ones(1,np); -# T = [0 max(P) max(P)/1000]; -# PH = zeros(1,np); -# elseif (nargin == 2) -# np = length(P); -# A = varargin{1}; -# T = [0 max(P) max(P)/1000]; -# PH = zeros(1,np); -# elseif (nargin == 3) -# np = length(P); -# A = varargin{1}; -# T = varargin{2}; -# PH = zeros(1,np); -# elseif (nargin == 4) -# np = length(P); -# A = varargin{1}; -# T = varargin{2}; -# PH = varargin{3}; -# else -# fprintf(1,'\n'); -# error('Wrong number of arguments'); -# end - -# % Add all waves -# Tv = T(1):T(3):T(2); -# Av = zeros(1,length(Tv)); -# for j = 1:np -# Av = Av + A(j)*sin(2*pi*Tv/P(j)+PH(j)); -# end - -# end % End of function diff --git a/README.md b/README.md index 34eb85e..cc4cfce 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ ## Reference -- [1] John F. Ehlers, "[Cycle Analytics for Traders: Advanced Technical Trading Concepts](http://www.mesasoftware.com/ehlers_books.htm)". +- John F. Ehlers, "[Cycle Analytics for Traders: Advanced Technical Trading Concepts](http://www.mesasoftware.com/ehlers_books.htm)". -- [2] John F. Ehlers, "[Signal Analysis, Filters And Trading Strategies](http://www.mesasoftware.com/ehlers_technical_papers.htm)". +- Wikipedia, "[Alpha beta filter](https://en.wikipedia.org/wiki/Alpha_beta_filter)". ## Characteristics