From f080286c3d840c4f93d912bd0a36f998757dd981 Mon Sep 17 00:00:00 2001 From: Gabriele Gilardi Date: Sun, 21 Jun 2020 20:04:02 +0900 Subject: [PATCH] add functions value2diff and diff2value and re-organize --- Code_Python/filters.py | 347 +++++++++++++++---------- Code_Python/{utils.py => synthetic.py} | 134 +++++----- Code_Python/test.py | 52 ++-- 3 files changed, 309 insertions(+), 224 deletions(-) rename Code_Python/{utils.py => synthetic.py} (60%) diff --git a/Code_Python/filters.py b/Code_Python/filters.py index b211bef..de79e20 100644 --- a/Code_Python/filters.py +++ b/Code_Python/filters.py @@ -24,6 +24,7 @@ Notes: - the filter is applied starting from index. - non filtered data are set equal to the original input, i.e. Y[0:idx-1,:] = X[0:idx-1,:] +- if n_series = 1 then must be ( ..., 1) Filters: @@ -56,28 +57,59 @@ delta Band centered in P and in fraction (30% of P => 0.3, = 0.3*P, if P = 12 => 0.3*12 = 4) K Coefficient/gain Vn Look back sample (for the momentum) + +correction = update = measurement +prediction = motion + +X (n_states, 1) State estimate +P (n_states, n_states) Covariance estimate +F (n_states, n_states) State transition model +Z (n_obs, 1) Observations +H (n_obs, n_states) Observation model +R (n_obs, n_obs) Covariance of the observation noise +S (n_obs, n_obs) Covariance of the observation residual +K (n_states, n_obs) Optimal Kalman gain +Q (n_states, n_states) Covariance of the process noise matrix +Y (n_obs, 1) Observation residual (innovation) """ -import sys import numpy as np -import utils as utl +from scipy import signal -def filter_data(X, b, a): +def plot_signals(signals, idx_start=0, idx_end=None): + """ + signals must be all same lenght + """ + if (idx_end is None): + idx_end = len(signals[0]) + t = np.arange(idx_start, idx_end) + names = [] + count = 0 + for signal in signals: + plt.plot(t, signal[idx_start:idx_end]) + names.append(str(count)) + count += 1 + plt.grid(b=True) + plt.legend(names) + plt.show() + + +def filter_data(data, b, a): """ Applies a filter with transfer response coefficients and . """ - n_samples, n_series = X.shape + n_samples, n_series = data.shape nb = len(b) na = len(a) - idx = np.amax([0, nb - 1, na - 1]) - Y = X.copy() + idx = np.amax([0, nb-1, na-1]) + Y = data.copy() for i in range(idx, n_samples): tmp = np.zeros(n_series) for j in range(nb): - tmp = tmp + b[j] * X[i-j, :] # Numerator term + tmp = tmp + b[j] * data[i-j, :] # Numerator term for j in range(1, na): tmp = tmp - a[j] * Y[i-j, :] # Denominator term @@ -93,300 +125,334 @@ class Filter: """ """ self.data = np.asarray(data) - self.n_samples, self.n_series = data.shape - self.idx = 0 def Generic(self, b=1.0, a=1.0): """ Filter with generic transfer response coefficients and . """ - b = np.asarray(b) - a = np.asarray(a) - Y, self.idx = filter_data(self.data, b, a) + self.b = np.asarray(b) + self.a = np.asarray(a) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def SMA(self, N=10): """ Simple moving average (?? order, FIR, ?? band). """ - b = np.ones(N) / N - a = np.array([1.0]) - Y, self.idx = filter_data(self.data, b, a) + self.b = np.ones(N) / N + self.a = np.array([1.0]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def EMA(self, N=10, alpha=None): """ Exponential moving average (?? order, IIR, pass ??). - If not given, is determined as equivalent to a N-SMA. 
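Example: N = 9 --> alpha = 2.0 / (9.0 + 1.0) = 0.2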
""" if (alpha is None): alpha = 2.0 / (N + 1.0) - b = np.array([alpha]) - a = np.array([1.0, -(1.0 - alpha)]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = np.array([alpha]) + self.a = np.array([1.0, - (1.0 - alpha)]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def WMA(self, N=10): """ Weighted moving average (?? order, FIR, pass ??). - Example: N = 5 --> [5.0, 4.0, 3.0, 2.0, 1.0] / 15.0 """ w = np.arange(N, 0, -1) - b = w / np.sum(w) - a = np.array([1.0]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = w / np.sum(w) + self.a = np.array([1.0]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def MSMA(self, N=10): """ Modified simple moving average (?? order, FIR, pass ??). - Example: N = 4 --> [0.5, 1.0, 1.0, 1.0, 0.5] / 4.0 """ w = np.ones(N+1) w[0] = 0.5 w[N] = 0.5 - b = w / N - a = np.array([1.0]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = w / N + self.a = np.array([1.0]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def MLSQ(self, N=5): """ Modified simple moving average (?? order, FIR, pass ??). - Only N = 5, 7, 9, and 11 are implemented. If not returns the unfiltered dataset. """ if (N == 5): - b = np.array([7.0, 24.0, 34.0, 24.0, 7.0]) / 96.0 + w = np.array([7.0, 24.0, 34.0, 24.0, 7.0]) / 96.0 + elif (N == 7): - b = np.array([1.0, 6.0, 12.0, 14.0, 12.0, 6.0, 1.0]) / 52.0 + w = np.array([1.0, 6.0, 12.0, 14.0, 12.0, 6.0, 1.0]) / 52.0 + elif (N == 9): - b = np.array([-1.0, 28.0, 78.0, 108.0, 118.0, 108.0, 78.0, 28.0, + w = np.array([-1.0, 28.0, 78.0, 108.0, 118.0, 108.0, 78.0, 28.0, -1.0]) / 544.0 + elif (N == 11): - b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, - 138.0, 88.0, 18.0, -11.0]) / 980.0 + w = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, 138.0, + 88.0, 18.0, -11.0]) / 980.0 + else: print("Warning: data returned unfiltered (wrong N)") self.idx = 0 return self.data - a = np.array([1.0]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = w + self.a = np.array([1.0]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def ButterOrig(self, N=2, P=10): """ Butterworth original version (?? order, IIR, pass ??). - Only N = 2 and 3 are implemented. If not returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) - b = np.array([1.0, 2.0, 1.0]) * (1.0 - alpha + beta ** 2.0) / 4.0 - a = np.array([1.0, -alpha, beta ** 2.0]) + wb = np.array([1.0, 2.0, 1.0]) * (1.0 - alpha + beta ** 2.0) / 4.0 + wa = np.array([1.0, - alpha, beta ** 2.0]) + elif (N == 3): beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) - b = np.array([1.0, 3.0, 3.0, 1.0]) \ - * (1.0 - alpha + beta ** 2.0) * (1.0 - beta ** 2.0) / 8.0 - a = np.array([1.0, - (alpha + beta ** 2.0), - (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + wb = np.array([1.0, 3.0, 3.0, 1.0]) * (1.0 - beta ** 2.0) \ + * (1.0 - alpha + beta ** 2.0) / 8.0 + wa = np.array([1.0, - (alpha + beta ** 2.0), + (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + else: print("Warning: data returned unfiltered (wrong N)") self.idx = 0 return self.data - Y, self.idx = filter_data(self.data, b, a) + + self.b = wb + self.a = wa + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def ButterMod(self, N=2, P=10): """ Butterworth modified version (?? order, IIR, pass ??). - Only N = 2 and 3 are implemented. If not returns the unfiltered dataset. 
""" if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) - b = np.array([1.0 - alpha + beta ** 2.0]) - a = np.array([1.0, -alpha, beta ** 2.0]) + wb = np.array([1.0 - alpha + beta ** 2.0]) + wa = np.array([1.0, - alpha, beta ** 2.0]) + elif (N == 3): beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) - b = np.array([1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0]) - a = np.array([1.0, - (alpha + beta ** 2.0), - (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + wb = np.array([1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0]) + wa = np.array([1.0, - (alpha + beta ** 2.0), + (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + else: print("Warning: data returned unfiltered (wrong N)") self.idx = 0 return self.data - Y, self.idx = filter_data(self.data, b, a) + + self.b = wb + self.a = wa + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def SuperSmooth(self, N=2, P=10): """ SuperSmooth (?? order, IIR, pass ??). - Only N = 2 and 3 are implemented. If not returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) - w = 1.0 - alpha + beta ** 2.0 - b = np.array([w, w]) / 2.0 - a = np.array([1.0, - alpha, beta ** 2.0]) + wb = np.array([1.0, 1.0]) * (1.0 - alpha + beta ** 2.0) / 2.0 + wa = np.array([1.0, - alpha, beta ** 2.0]) + elif (N == 3): beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(1.738 * np.pi / P) - w = 1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0 - b = np.array([w, w]) / 2.0 - a = np.array([1.0, - (alpha + beta ** 2.0), - (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + wb = np.array([1.0, 1.0]) \ + * (1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0) / 2.0 + wa = np.array([1.0, - (alpha + beta ** 2.0), + (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) + else: print("Warning: data returned unfiltered (wrong N)") self.idx = 0 return self.data - Y, self.idx = filter_data(self.data, b, a) + + self.b = wb + self.a = wa + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def GaussLow(self, N=1, P=2): """ Gauss low pass (IIR, N-th order, low pass). - Must be P > 1. If not returns the unfiltered dataset. """ if (P < 2): print("Warning: data returned unfiltered (P < 2)") self.idx = 0 return self.data + A = 2.0 ** (1.0 / N) - 1.0 B = 4.0 * np.sin(np.pi / P) ** 2.0 C = 2.0 * (np.cos(2.0 * np.pi / P) - 1.0) alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) - b = np.array([alpha]) - a = np.array([1.0, - (1.0 - alpha)]) + + self.b = np.array([alpha]) + self.a = np.array([1.0, - (1.0 - alpha)]) Y = self.data.copy() for i in range(N): - Y, self.idx = filter_data(Y, b, a) + Y, self.idx = filter_data(Y, self.b, self.a) + return Y def GaussHigh(self, N=1, P=5): """ Gauss high pass (IIR, Nth order, high pass). - Must be P > 4. If not returns the unfiltered dataset. 
""" if (P < 5): print("Warning: data returned unfiltered (P < 5)") self.idx = 0 return self.data + A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0 B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0) C = - B alpha = (-B - np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) - b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)]) - a = np.array([1.0, - (1.0 - alpha)]) + + self.b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)]) + self.a = np.array([1.0, - (1.0 - alpha)]) Y = self.data - self.data[0, :] for i in range(N): - Y, self.idx = filter_data(Y, b, a) + Y, self.idx = filter_data(Y, self.b, self.a) + return Y def BandPass(self, P=5, delta=0.3): """ Band-pass (type, order, IIR). - Example: delta = 0.3, P = 12 (30% of P => 0.3, = 0.3*P, if P = 12 => 0.3*12 = 4) """ beta = np.cos(2.0 * np.pi / P) gamma = np.cos(4.0 * np.pi * delta / P) alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) - b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0]) - a = np.array([1.0, - beta * (1.0 + alpha), alpha]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0]) + self.a = np.array([1.0, - beta * (1.0 + alpha), alpha]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def BandStop(self, P=5, delta=0.3): """ - band-stop (type, order, IIR) - + Band-stop (type, order, IIR) Example: delta = 0.3, P = 12 (30% of P => 0.3, = 0.3*P, if P = 12 => 0.3*12 = 4) """ beta = np.cos(2.0 * np.pi / P) gamma = np.cos(4.0 * np.pi * delta / P) alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) - b = np.array([(1.0 + alpha) / 2.0, - beta * (1.0 + alpha), - (1.0 + alpha) / 2.0]) - a = np.array([1.0, -beta * (1.0 + alpha), alpha]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = np.array([1.0, - 2.0 * beta, 1.0]) * (1.0 + alpha) / 2.0 + self.a = np.array([1.0, - beta * (1.0 + alpha), alpha]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5): """ Zero lag Exponential Moving Average (type 1). - If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) - b = np.zeros(Vn+1) - b[0] = alpha * (1.0 + K) - b[Vn] = - alpha * K - a = np.array([1.0, - (1.0 - alpha)]) - Y, self.idx = filter_data(self.data, b, a) + + w = np.zeros(Vn+1) + w[0] = alpha * (1.0 + K) + w[Vn] = - alpha * K + + self.b = w + self.a = np.array([1.0, - (1.0 - alpha)]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def ZEMA2(self, N=10, alpha=None, K=1.0): """ Zero lag Exponential Moving Average (type 2). - If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) - b = np.array([alpha * (1.0 + K)]) - a = np.array([1.0, alpha * K - (1.0 - alpha)]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = np.array([alpha * (1.0 + K)]) + self.a = np.array([1.0, alpha * K - (1.0 - alpha)]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def InstTrend(self, N=10, alpha=None): """ Instantaneous Trendline (2nd order, IIR, low pass). - If not given, is determined as equivalent to a N-SMA. 
""" if (alpha is None): alpha = 2.0 / (N + 1.0) - b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, - - alpha + 3.0 * alpha ** 2.0 / 4.0]) - a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) - Y, self.idx = filter_data(self.data, b, a) + + self.b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, + - alpha + 3.0 * alpha ** 2.0 / 4.0]) + self.a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def SincFunction(self, N=10, nel=10): """ Sinc function (order, FIR, pass). - (N > 1, cut off at 0.5/N) """ - b = np.zeros(nel) - b[0] = 1.0 / N - k = np.arange(1, nel) - b[1:] = np.sin(np.pi * k / N) / (np.pi * k) - a = np.array([1.0]) - Y, self.idx = filter_data(self.data, b, a) + K = np.arange(1, nel) + w = np.zeros(nel) + w[0] = 1.0 / N + w[1:] = np.sin(np.pi * K / N) / (np.pi * K) + + self.b = w + self.a = np.array([1.0]) + Y, self.idx = filter_data(self.data, self.b, self.a) + return Y def Decycler(self, P=10): """ Decycler (?? order, IIR, pass ??). Gauss,HP,1st,P - Built subtracting high pass Gauss filter from 1 (order 1) Must be P > 4. If not returns the unfiltered dataset. """ @@ -394,6 +460,7 @@ class Filter: print("Warning: data returned unfiltered (P < 5)") self.idx = 0 return self.data + Y = self.data - self.GaussHigh(N=1, P=P) return Y @@ -407,17 +474,18 @@ class Filter: """ P_low = np.amin([P1, P2]) P_high = np.amax([P1, P2]) + if (P_low < 5): print("Warning: data returned unfiltered (P_low < 5)") self.idx = 0 return self.data + Y = self.GaussHigh(N=2, P=P_low) - self.GaussHigh(N=2, P=P_high) return Y def ABG(self, alpha=0.0, beta=0.0, gamma=0.0, dt=1.0): """ alpha-beta-gamma - For numerical stability: 0 < alpha, beta < 1 """ # If necessary change scalars to arrays @@ -427,17 +495,18 @@ class Filter: beta = np.ones(self.n_samples) * beta if (np.ndim(gamma) == 0): gamma = np.ones(self.n_samples) * gamma - + # Initialize Y_corr = self.data.copy() Y_pred = self.data.copy() - x0 = self.data[0,:] + x0 = self.data[0, :] v0 = np.zeros(self.n_series) a0 = np.zeros(self.n_series) + for i in range(1, self.n_samples): # Predictor (predicts state in ) - x_pred = x0 + dt * v0 + 0.5 * a0 * dt ** 2 + x_pred = x0 + dt * v0 + 0.5 * a0 * dt ** 2.0 v_pred = v0 + dt * a0 a_pred = a0 Y_pred[i, :] = x_pred @@ -448,70 +517,78 @@ class Filter: # Corrector (corrects state in ) x_corr = x_pred + alpha[i] * r v_corr = v_pred + (beta[i] / dt) * r - a_corr = a_pred + (2.0 * gamma[i] / dt ** 2) *r + a_corr = a_pred + (2.0 * gamma[i] / dt ** 2.0) * r # Save value and prepare next iteration Y_corr[i, :] = x_corr x0 = x_corr v0 = v_corr a0 = a_corr - + self.idx = 1 return Y_corr, Y_pred - def Kalman(self, sigma_x, sigma_v, dt, abg_type="abg"): """ Steady-state Kalman filter (also limited to one-dimension) """ - L = (sigma_x / sigma_v) * dt ** 2 + L = (sigma_x / sigma_v) * dt ** 2.0 # Alpha filter if (abg_type == 'a'): - alpha = (-L ** 2 + np.sqrt(L ** 4 + 16.0 * L ** 2)) / 8.0 + alpha = (-L ** 2.0 + np.sqrt(L ** 4.0 + 16.0 * L ** 2.0)) / 8.0 beta = 0.0 gamma = 0.0 # Alpha-Beta filter elif(abg_type == 'ab'): - r = (4.0 + L - np.sqrt(8.0 * L + L ** 2)) / 4.0 - alpha = 1.0 - r ** 2 + r = (4.0 + L - np.sqrt(8.0 * L + L ** 2.0)) / 4.0 + alpha = 1.0 - r ** 2.0 beta = 2.0 * (2.0 - alpha) - 4.0 * np.sqrt(1.0 - alpha) gamma = 0.0 - - #Alpha-Beta-Gamma filter + + # Alpha-Beta-Gamma filter else: b = (L / 2.0) - 3.0 c = (L / 2.0) + 3.0 - d = -1.0 - p = c - b **2 /3.0 - q = 2.0 * b **3 /27.0 - b * c /3.0 + d - v = 
np.sqrt(q ** 2 + 4.0 * p ** 3 / 27.0) + d = - 1.0 + p = c - b ** 2.0 / 3.0 + q = (2.0 * b ** 3.0) / 27.0 - (b * c) / 3.0 + d + v = np.sqrt(q ** 2.0 + (4.0 * p ** 3.0) / 27.0) z = - (q + v / 2.0) ** (1.0 / 3.0) s = z - p / (3.0 * z) - b / 3.0 - alpha = 1.0 - s ** 2 - beta = 2.0 * (1 - s) ** 2 - gamma = beta ** 2 / (2.0 * alpha) + alpha = 1.0 - s ** 2.0 + beta = 2.0 * (1 - s) ** 2.0 + gamma = (beta ** 2.0) / (2.0 * alpha) # Apply filter Y = self.abg(alpha=alpha, beta=beta, gamma=gamma, dt=dt) return Y -""" -correction = update = measurement -prediction = motion + def plot_frequency(self): + """ + """ + w, h = signal.freqz(self.b, self.a) + h_db = 20.0 * np.log10(np.abs(h)) + wf = w / (2.0 * np.pi) + plt.plot(wf, h_db) + plt.axhline(-3.0, lw=1.5, ls='--', C='r') + plt.grid(b=True) + plt.xlim(np.amin(wf), np.amax(wf)) + plt.xlabel('$omega$ [rad/sample]') + plt.ylabel('$h$ [db]') + plt.show() -X (n_states, 1) State estimate -P (n_states, n_states) Covariance estimate -F (n_states, n_states) State transition model -Z (n_obs, 1) Observations -H (n_obs, n_states) Observation model -R (n_obs, n_obs) Covariance of the observation noise -S (n_obs, n_obs) Covariance of the observation residual -K (n_states, n_obs) Optimal Kalman gain -Q (n_states, n_states) Covariance of the process noise matrix -Y (n_obs, 1) Observation residual (innovation) - -""" \ No newline at end of file + def plot_lag(self): + """ + """ + w, gd = signal.group_delay((self.b, self.a)) + wf = w / (2.0 * np.pi) + plt.plot(wf, gd) + plt.grid(b=True) + plt.xlim(np.amin(wf), np.amax(wf)) + plt.xlabel('$omega$ [rad/sample]') + plt.ylabel('$gd$ [samples]') + plt.show() diff --git a/Code_Python/utils.py b/Code_Python/synthetic.py similarity index 60% rename from Code_Python/utils.py rename to Code_Python/synthetic.py index 23a6fa7..a172e84 100644 --- a/Code_Python/utils.py +++ b/Code_Python/synthetic.py @@ -4,9 +4,7 @@ Utility functions for ????. Copyright (c) 2020 Gabriele Gilardi """ -from scipy import signal import numpy as np -import matplotlib.pyplot as plt def normalize_data(X, param=(), ddof=0): @@ -63,61 +61,61 @@ def scale_data(X, param=()): return Xs, param -def plot_signals(signals, idx_start=0, idx_end=None): +def value2diff(X, mode=None): """ + from value to difference in abs or % + diff in value first element is zero + diff in % first element is one """ - if (idx_end is None): - idx_end = len(signals[0]) - t = np.arange(idx_start, idx_end) - names = [] - count = 0 - for signal in signals: - plt.plot(t, signal[idx_start:idx_end]) - names.append(str(count)) - count += 1 - plt.grid(b=True) - plt.legend(names) - plt.show() + + # Difference in value + if (mode == 'V'): + dX = np.zeros_like(X) + dX[1:, :] = X[1:, :] - X[:-1, :] + + # Difference in percent + else: + dX = np.ones_like(X) + dX[1:, :] = X[1:, :] / X[:-1, :] - 1.0 + + return dX -def plot_frequency_response(b, a=1.0): +def diff2value(dX, mode=None): """ + from difference in abs or % to value (first row should be all zeros but + will be over-written + + Reference X[0,:] is assumed to be zero. If X0[0,:] is the desired + reference, the actual vector X can be determined as X0+X + + Reference X[0,:] is assumed to be one. 
If X0[0,:] is the desired + reference, the actual vector X can be determined as X0*X """ - b = np.asarray(b) - a = np.asarray(a) - - w, h = signal.freqz(b, a) - h_db = 20.0 * np.log10(abs(h)) - wf = w / (2.0 * np.pi) - - plt.plot(wf, h_db) - plt.axhline(-3.0, lw=1.5, ls='--', C='r') - plt.grid(b=True) - plt.xlim(np.amin(wf), np.amax(wf)) - # plt.ylim(-40.0, 0.0) - plt.xlabel('$\omega$ [rad/sample]') - plt.ylabel('$h$ [db]') - plt.show() + # Value from the difference (first row equal to zero) + # X[0, :] = 0 + # X[1, :] = X[0, :] + dX[1, :] = dX[1, :] + # X[2, :] = X[0, :] + dX[1, :] + dX[2, :] = dX[1, :] + dX[2, :] + # .... + if (mode == 'V'): + X = np.zeros_like(dX) + X[1:, :] = np.cumsum(dX[1:, :], axis=0) + + # Value from percent (first row equal to 1) + # X[0, :] = 1 + # X[1, :] = X[0, :] * (1 + dX[1, :]) = (1 + dX[1, :]) + # X[2, :] = X[1, :] * (1 + dX[2, :]) + # = X[0, :] * (1 + dX[1, :]) * (1 + dX[2, :]) + # = (1 + dX[1, :]) * (1 + dX[2, :]) + # .... + else: + X = np.ones_like(dX) + X[1:, :] = np.cumprod((1.0 + dX), axis=0) + + return X -def plot_lag_response(b, a=1.0): - """ - """ - b = np.asarray(b) - a = np.asarray(a) - - w, gd = signal.group_delay((b, a)) - wf = w / (2.0 * np.pi) - - plt.plot(wf, gd) - plt.grid(b=True) - plt.xlim(np.amin(wf), np.amax(wf)) - plt.xlabel('$\omega$ [rad/sample]') - plt.ylabel('$gd$ [samples]') - plt.show() - - -def synthetic_wave(P, A=None, PH=None, num=1000): +def synthetic_wave(per, amp=None, pha=None, num=1000): """ Generates a multi-sinewave. P = [ P1 P2 ... Pn ] Periods @@ -128,29 +126,29 @@ def synthetic_wave(P, A=None, PH=None, num=1000): Default phases are zeros Time is from 0 to largest period (default 1000 steps) """ - n_waves = len(P) - P = np.asarray(P) + n_waves = len(per) + per = np.asarray(per) # Define amplitudes and phases - if (A is None): - A = np.ones(n_waves) + if (amp is None): + amp = np.ones(n_waves) else: - A = np.asarray(A) - if (PH is None): - PH = np.zeros(n_waves) + amp = np.asarray(amp) + if (pha is None): + pha = np.zeros(n_waves) else: - PH = np.asarray(PH) + pha = np.asarray(pha) # Add all the waves - t = np.linspace(0.0, np.amax(P), num=num) + t = np.linspace(0.0, np.amax(per), num=num) f = np.zeros(len(t)) for i in range(n_waves): - f = f + A[i] * np.sin(2.0 * np.pi * t / P[i] + PH[i]) + f = f + amp[i] * np.sin(2.0 * np.pi * t / per[i] + pha[i]) return t, f -def synthetic_series(data, multivariate=False): +def synthetic_series(X, multiv=False): """ """ n_samples, n_series = data.shape @@ -159,10 +157,10 @@ def synthetic_series(data, multivariate=False): if ((n_samples % 2) == 0): print("Warning: data reduced by one (even number of samples)") n_samples = n_samples - 1 - data = data[0:n_samples, :] + X = X[0:n_samples, :] # FFT of the original data - data_fft = np.fft.fft(data, axis=0) + X_fft = np.fft.fft(X, axis=0) # Parameters half_len = (n_samples - 1) // 2 @@ -170,7 +168,7 @@ def synthetic_series(data, multivariate=False): idx2 = np.arange(half_len+1, n_samples, dtype=int) # If multivariate the random phases is the same - if (multivariate): + if (multiv): phases = np.random.rand(half_len, 1) phases1 = np.tile(np.exp(2.0 * np.pi * 1j * phases), (1, n_series)) phases2 = np.conj(np.flipud(phases1)) @@ -182,11 +180,11 @@ def synthetic_series(data, multivariate=False): phases2 = np.conj(np.flipud(phases1)) # FFT of the synthetic data - synt_fft = data_fft.copy() - synt_fft[idx1, :] = data_fft[idx1, :] * phases1 - synt_fft[idx2, :] = data_fft[idx2, :] * phases2 + synt_fft = X_fft.copy() + synt_fft[idx1, :] = 
X_fft[idx1, :] * phases1 + synt_fft[idx2, :] = X_fft[idx2, :] * phases2 # Inverse FFT of the synthetic data - synt_data = np.real(np.fft.ifft(synt_fft, axis=0)) + X_synt = np.real(np.fft.ifft(synt_fft, axis=0)) - return synt_data + return X_synt diff --git a/Code_Python/test.py b/Code_Python/test.py index 0215ca2..b1b889b 100644 --- a/Code_Python/test.py +++ b/Code_Python/test.py @@ -13,14 +13,19 @@ ToDo: - example for alpha-beta-gamma using variable sigma as in financial time series (see Ehler) - example using noisy multi-sine-waves +- synt: boot, paper Vinod (as a class?) +- vectors must be ( .., 1) +- reduce the vector diff by one and pass initial value + (with zero/one as default) """ import sys import numpy as np -import filters as flt -import utils as utl import matplotlib.pyplot as plt +import filters as flt +import synthetic as syn + # Read data to filter if len(sys.argv) != 2: print("Usage: python test.py ") @@ -34,7 +39,7 @@ data = data.reshape(n_samples, -1) np.random.seed(1294404794) -spx = flt.Filter(data) +# spx = flt.Filter(data) # res, bb, aa = spx.SincFunction(2, 50) # print(bb) @@ -55,21 +60,26 @@ spx = flt.Filter(data) # plt.plot(t,f) # plt.show() aa = np.array([ - [0.8252, 0.2820], - [1.3790, 0.0335], - [-1.0582, -1.3337], - [-0.4686, 1.1275], - [-0.2725, 0.3502], - [1.0984, -0.2991], - [-0.2779, 0.0229], - [0.7015, -0.2620], - [-2.0518, -1.7502], - [-0.3538, -0.2857], - [-0.8236, -0.8314], - [-1.5771, -0.9792], - [0.5080, -1.1564]]) -synt_aa = utl.synthetic_series(data, False) -plt.plot(synt_aa) -plt.plot(data) -plt.show() - + [ 0.8252, 0.2820], + [ 1.3790, 0.0335], + [-1.0582, -1.3337], + [-0.4686, 1.1275], + [-0.2725, 0.3502], + [ 1.0984, -0.2991], + [-0.2779, 0.0229], + [ 0.7015, -0.2620], + [-2.0518, -1.7502], + [-0.3538, -0.2857], + [-0.8236, -0.8314], + [-1.5771, -0.9792], + [ 0.5080, -1.1564]]) +# synt_aa = utl.synthetic_series(data, False) +# plt.plot(synt_aa) +# plt.plot(data) +# plt.show() +print(data[0:10, :]) +bb = syn.value2diff(data, mode='V') +print(bb[0:10, :]) +bb[0, 0] = 1399.48 +cc = syn.diff2value(bb, mode='V') +print(cc[0:10, :]) \ No newline at end of file
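A minimal usage sketch for the modules changed above, assuming filters.py and synthetic.py are importable as in this diff; the periods, amplitudes, and noise level below are arbitrary illustration values, and only the 'V' (value-difference) mode already exercised in test.py is used.

import numpy as np
import filters as flt
import synthetic as syn

# Noisy two-period sinewave, reshaped to (n_samples, 1) because the
# filters expect 2-D input even for a single series (see the note at
# the top of filters.py).
t, f = syn.synthetic_wave([20.0, 50.0], amp=[1.0, 0.5], num=500)
data = (f + 0.1 * np.random.randn(len(f))).reshape(-1, 1)

# Smooth with a 10-sample simple moving average.
spx = flt.Filter(data)
smoothed = spx.SMA(N=10)

# Round trip value -> difference -> value in 'V' mode. diff2value()
# returns the series relative to a zero first row, so the original
# reference data[0, :] is added back to recover the input.
dX = syn.value2diff(data, mode='V')
rec = data[0, :] + syn.diff2value(dX, mode='V')
print(np.allclose(rec, data))      # True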