""" Signal Filtering and Generation of Synthetic Time-Series. Copyright (c) 2020 Gabriele Gilardi X (n_samples, ) Dataset to filter (input) b (n_b, ) Transfer response coefficients (numerator) a (n_a, ) Transfer response coefficients (denominator) Y (n_samples, ) Filtered dataset (output) idx scalar First filtered element in Y n_samples Number of samples in the input dataset nb Number of coefficients in array na Number of coefficients in array Notes: - the filter is applied starting from index idx = MAX(0, nb-1, na-1). - non filtered data are set equal to the input, i.e. Y[0:idx-1] = X[0:idx-1] - X must be a 1D array. Filter list: ----------- Generic b, a Generic filter SMA N Simple moving average EMA N/alpha Exponential moving average WMA N Weighted moving average MSMA N Modified simple moving average MLSQ N Modified least-squares quadratic (N = 5, 7, 9, 11) ButterOrig P, N Butterworth original filter (N = 2, 3) ButterMod P, N Butterworth modified filter (N = 2, 3) SuperSmooth P, N Supersmoother filter (N = 2, 3) GaussLow P, N Gauss low pass filter (P > 1) GaussHigh P, N Gauss high pass filter (P > 4) BandPass P, delta Band-pass filter BandStop P, delta Band-stop filter ZEMA1 N/alpha, K, Vn Zero-lag EMA (type 1) ZEMA2 N/alpha, K Zero-lag EMA (type 2) InstTrend N/alpha Instantaneous trendline SincFilter P, nel Sinc function filter (N > 1) Decycler P De-cycler filter (P >= 4) DecyclerOsc P1, P2 De-cycle oscillator (P >= 4) ABG alpha, beta, Alpha-beta-gamma filter (0 < alpha, beta < 1) gamma, dt Kalman sigma_x, dt One-dimensional steady-state Kalman filter sigma_v N Order/smoothing factor/number of previous samples alpha Damping term P, P1, P2 Cut-off/critical period (50% power loss, -3 dB) delta Semi-band centered in P K Coefficient/gain Vn Look-back sample nel Number of frequencies in the sinc function alpha Parameter(s) to correct the position in the ABG filter beta Parameter(s) to correct the velocity in the ABG filter gamma Parameter(s) to correct the acceleration in the ABG filter dt Sampling interval in the ABG and Kalman filters sigma_x Process variance in the Kalman filter sigma_v Noise variance in the Kalman filter """ import numpy as np from scipy import signal import matplotlib.pyplot as plt def plot_signals(signals, names=None, start=0, end=None): """ Plot the signals specified in list with their names specified in list . Each signal is plotted in its full length unless differently specified. """ # Identify the signals by index if their name is not specified if (names is None): legend = [] count = 0 else: legend = names # Loop over the signals t_max = 0 for signal in signals: signal = signal.flatten() t_max = np.amax([t_max, len(signal)]) plt.plot(signal) # If no name is given use the list index to identify the signals if (names is None): legend.append('Signal [' + str(count) + ']') count += 1 # Plot and format plt.xlabel('Index') plt.ylabel('Value') plt.grid(b=True) plt.legend(legend) if (end is None): plt.xlim(start, t_max) else: plt.xlim(start, end) plt.show() def filter_data(data, b, a): """ Applies a filter with transfer response coefficients (numerator) and (denominator). """ n_samples = len(data) nb = len(b) na = len(a) idx = np.amax([0, nb-1, na-1]) # Index of the 1st filtered sample Y = data.copy() # Loop over the samples for i in range(idx, n_samples): tmp = 0 # Contribution from the numerator term (input samples) for j in range(nb): tmp += b[j] * data[i-j] # Contribution from the denominator term (previous output samples) for j in range(1, na): tmp -= a[j] * Y[i-j] Y[i] = tmp / a[0] return Y, idx class Filter: def __init__(self, data): """ Initialize the filter object. """ self.data = np.asarray(data).flatten() self.idx = 0 self.b = 0.0 self.a = 0.0 def Generic(self, b=1.0, a=1.0): """ Filter with generic transfer response coefficients and . """ self.b = np.asarray(b) self.a = np.asarray(a) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def SMA(self, N=5): """ Simple moving average. """ self.b = np.ones(N) / N self.a = np.array([1.0]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def EMA(self, N=5, alpha=None): """ Exponential moving average. If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) self.b = np.array([alpha]) self.a = np.array([1.0, - (1.0 - alpha)]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def WMA(self, N=5): """ Weighted moving average. Example: N = 5 --> [5, 4, 3, 2, 1] / 15. """ w = np.arange(N, 0, -1) self.b = w / np.sum(w) self.a = np.array([1.0]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def MSMA(self, N=5): """ Modified simple moving average. Example: N = 5 --> [1/2, 1, 1, 1, 1, 1/2] / 5. """ w = np.ones(N+1) w[0] = 0.5 w[N] = 0.5 self.b = w / N self.a = np.array([1.0]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def MLSQ(self, N=5): """ Modified least-squares quadratic. Must be N = 5, 7, 9, or 11. If wrong N, prints a warning and returns the unfiltered dataset. """ if (N == 5): w = np.array([7, 24, 34, 24, 7]) / 96 elif (N == 7): w = np.array([1, 6, 12, 14, 12, 6, 1]) / 52 elif (N == 9): w = np.array([-1, 28, 78, 108, 118, 108, 78, 28, -1]) / 544 elif (N == 11): w = np.array([-11, 18, 88, 138, 168, 178, 168, 138, 88, 18, -11]) / 980 else: print("Warning: data returned unfiltered (MLSQ - Wrong N)") self.idx = 0 return self.data self.b = w self.a = np.array([1.0]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def ButterOrig(self, N=2, P=10): """ Butterworth original filter. Must be N = 2 or 3. If wrong N, prints a warning and returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) wb = np.array([1.0, 2.0, 1.0]) * (1.0 - alpha + beta ** 2.0) / 4.0 wa = np.array([1.0, - alpha, beta ** 2.0]) elif (N == 3): beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) wb = np.array([1.0, 3.0, 3.0, 1.0]) * (1.0 - beta ** 2.0) \ * (1.0 - alpha + beta ** 2.0) / 8.0 wa = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: print("Warning: data returned unfiltered (ButterOrig - Wrong N)") self.idx = 0 return self.data self.b = wb self.a = wa Y, self.idx = filter_data(self.data, self.b, self.a) return Y def ButterMod(self, N=2, P=10): """ Butterworth modified filter. It is derived from the Butterworth original filter deleting all but the constant term at the numerator. Must be N = 2 or 3. If wrong N, prints a warning and returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) wb = np.array([1.0 - alpha + beta ** 2.0]) wa = np.array([1.0, - alpha, beta ** 2.0]) elif (N == 3): beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) wb = np.array([1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0]) wa = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: print("Warning: data returned unfiltered (ButterMod - Wrong N)") self.idx = 0 return self.data self.b = wb self.a = wa Y, self.idx = filter_data(self.data, self.b, self.a) return Y def SuperSmooth(self, N=2, P=10): """ Supersmoother filter. It is derived from the Butterworth modified filter adding a two-element moving average at the numerator. Must be N = 2 or 3. If wrong N, prints a warning and returns the unfiltered dataset. """ if (N == 2): beta = np.exp(-np.sqrt(2.0) * np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(2.0) * np.pi / P) wb = np.array([1.0, 1.0]) * (1.0 - alpha + beta ** 2.0) / 2.0 wa = np.array([1.0, - alpha, beta ** 2.0]) elif (N == 3): beta = np.exp(-np.pi / P) alpha = 2.0 * beta * np.cos(np.sqrt(3.0) * np.pi / P) wb = np.array([1.0, 1.0]) \ * (1.0 - alpha * (1.0 - beta ** 2.0) - beta ** 4.0) / 2.0 wa = np.array([1.0, - (alpha + beta ** 2.0), (1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) else: print("Warning: data returned unfiltered (SuperSmooth - Wrong N)") self.idx = 0 return self.data self.b = wb self.a = wa Y, self.idx = filter_data(self.data, self.b, self.a) return Y def GaussLow(self, N=1, P=10): """ Gauss low pass filter. Must be P > 1. If wrong P, prints a warning and returns the unfiltered dataset. """ if (P <= 1): print("Warning: data returned unfiltered (GaussLow - Wrong P)") self.idx = 0 return self.data A = 2.0 ** (1.0 / N) - 1.0 B = 4.0 * np.sin(np.pi / P) ** 2.0 C = 2.0 * (np.cos(2.0 * np.pi / P) - 1.0) alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) self.b = np.array([alpha]) self.a = np.array([1.0, - (1.0 - alpha)]) Y = self.data.copy() for i in range(N): Y, self.idx = filter_data(Y, self.b, self.a) return Y def GaussHigh(self, N=1, P=10): """ Gauss high pass filter. Must be P > 4. If wrong P, prints a warning and returns the unfiltered dataset. """ if (P <= 4): print("Warning: data returned unfiltered (GaussHigh - Wrong P)") self.idx = 0 return self.data A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0 B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0) C = - B alpha = (-B - np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) self.b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)]) self.a = np.array([1.0, - (1.0 - alpha)]) Y = self.data - self.data[0] # Shift to zero for i in range(N): Y, self.idx = filter_data(Y, self.b, self.a) return Y def BandPass(self, P=10, delta=0.3): """ Band-pass filter. Example: delta = 0.3, P = 10 --> 0.3 * 10 = 3 --> band is [7, 13] """ beta = np.cos(2.0 * np.pi / P) gamma = np.cos(4.0 * np.pi * delta / P) alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) self.b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0]) self.a = np.array([1.0, - beta * (1.0 + alpha), alpha]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def BandStop(self, P=5, delta=0.3): """ Band-stop filter. Example: delta = 0.3, P = 10 --> 0.3 * 10 = 3 --> band is [7, 13] """ beta = np.cos(2.0 * np.pi / P) gamma = np.cos(4.0 * np.pi * delta / P) alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) self.b = np.array([1.0, - 2.0 * beta, 1.0]) * (1.0 + alpha) / 2.0 self.a = np.array([1.0, - beta * (1.0 + alpha), alpha]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5): """ Zero-lag EMA (type 1). It is an alpha-beta type filter with sub-optimal parameters. If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) w = np.zeros(Vn + 1) w[0] = alpha * (1.0 + K) w[Vn] = - alpha * K self.b = w self.a = np.array([1.0, - (1.0 - alpha)]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def ZEMA2(self, N=10, alpha=None, K=1.0): """ Zero-lag EMA (type 2). It is derived from the type 1 ZEMA removing the look-back term Vn. If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) self.b = np.array([alpha * (1.0 + K)]) self.a = np.array([1.0, alpha * K - (1.0 - alpha)]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def InstTrend(self, N=10, alpha=None): """ Instantaneous Trendline. It is created by removing the dominant cycle from the signal. If not given, is determined as equivalent to a N-SMA. """ if (alpha is None): alpha = 2.0 / (N + 1.0) self.b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, - alpha + 3.0 * alpha ** 2.0 / 4.0]) self.a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def SincFilter(self, P=10, nel=10): """ Sinc function filter. The cut off point is at 0.5/P. Must be P > 1. If wrong P, prints a warning and returns the unfiltered dataset. """ if (P <= 1): print("Warning: data returned unfiltered (SincFilter - Wrong P)") self.idx = 0 return self.data K = np.arange(1, nel) w = np.zeros(nel) w[0] = 1.0 / P w[1:] = np.sin(np.pi * K / P) / (np.pi * K) self.b = w self.a = np.array([1.0]) Y, self.idx = filter_data(self.data, self.b, self.a) return Y def Decycler(self, P=10): """ De-cycler filter. It is derived subtracting a 1st order high pass Gauss filter from 1. Must be P > 4. If wrong P, prints a warning and returns the unfiltered dataset. """ if (P <= 4): print("Warning: data returned unfiltered (Decycler - Wrong P)") self.idx = 0 return self.data Y = self.data - self.GaussHigh(N=1, P=P) return Y def DecyclerOsc(self, P1=5, P2=10): """ De-cycler oscillator. It is derived subtracting a 2nd order high pass Gauss filter with higher cut-off period from a 2nd order high pass Gauss filter with higher cut-off period. Must be P > 4. If wrong P, prints a warning and returns the unfiltered dataset. """ P_low = np.amin([P1, P2]) P_high = np.amax([P1, P2]) if (P_low <= 4): print("Warning: data returned unfiltered (DecyclerOsc - Wrong P)") self.idx = 0 return self.data Y = self.GaussHigh(N=2, P=P_low) - self.GaussHigh(N=2, P=P_high) return Y def ABG(self, alpha=0.0, beta=0.0, gamma=0.0, dt=1.0): """ Alpha-beta-gamma filter. It is a predictor-corrector type of filter. Arguments alpha, beta, and gamma can be a scalar (used for all samples) or an array with one value for each sample. For numerical stability it should be 0 < alpha, beta < 1. """ n_samples = len(self.data) Y = np.zeros(n_samples) # Change scalar arguments to arrays if necessary if (np.ndim(alpha) == 0): alpha = np.ones(n_samples) * alpha if (np.ndim(beta) == 0): beta = np.ones(n_samples) * beta if (np.ndim(gamma) == 0): gamma = np.ones(n_samples) * gamma # Initialize x0 = self.data[0] v0 = 0.0 a0 = 0.0 Y[0] = x0 for i in range(1, n_samples): # Predictor (predicts state in ) x_pred = x0 + dt * v0 + 0.5 * a0 * dt ** 2.0 v_pred = v0 + dt * a0 a_pred = a0 # Residual (innovation) r = self.data[i] - x_pred # Corrector (corrects state in ) x_corr = x_pred + alpha[i] * r v_corr = v_pred + (beta[i] / dt) * r a_corr = a_pred + (2.0 * gamma[i] / dt ** 2.0) * r # Save value and prepare next iteration x0 = x_corr v0 = v_corr a0 = a_corr Y[i] = x_corr self.idx = 1 return Y def Kalman(self, sigma_x, sigma_v, dt, abg_type="abg"): """ One-dimensional steady-state Kalman filter. It is obtained from the alpha-beta-gamma filter using the process variance, the noise variance and optimizing the three parameters. Arguments sigma_x and sigma_v can be a scalar (used for all samples) or an array with one value for each sample. """ L = (sigma_x / sigma_v) * dt ** 2.0 # Alpha filter if (abg_type == 'a'): alpha = (-L ** 2.0 + np.sqrt(L ** 4.0 + 16.0 * L ** 2.0)) / 8.0 beta = 0.0 gamma = 0.0 # Alpha-Beta filter elif(abg_type == 'ab'): r = (4.0 + L - np.sqrt(8.0 * L + L ** 2.0)) / 4.0 alpha = 1.0 - r ** 2.0 beta = 2.0 * (2.0 - alpha) - 4.0 * np.sqrt(1.0 - alpha) gamma = 0.0 # Alpha-Beta-Gamma filter else: b = (L / 2.0) - 3.0 c = (L / 2.0) + 3.0 d = - 1.0 p = c - b ** 2.0 / 3.0 q = (2.0 * b ** 3.0) / 27.0 - (b * c) / 3.0 + d v = np.sqrt(q ** 2.0 + (4.0 * p ** 3.0) / 27.0) z = - (q + v / 2.0) ** (1.0 / 3.0) s = z - p / (3.0 * z) - b / 3.0 alpha = 1.0 - s ** 2.0 beta = 2.0 * (1 - s) ** 2.0 gamma = (beta ** 2.0) / (2.0 * alpha) # Apply the alpha-beta-gamma filter Y = self.ABG(alpha=alpha, beta=beta, gamma=gamma, dt=dt) return Y def plot_response(self): """ Plots the frequency response (in decibels) and the lag (group delay) of the filter with coefficients and . """ # Frequency response w, h = signal.freqz(self.b, self.a) h_db = 20.0 * np.log10(np.abs(h)) # Convert to decibels # Lag / Group delay w, gd = signal.group_delay((self.b, self.a)) # Scale frequency to [0, 0.5] wf = w / (2.0 * np.pi) # Plot and format frequency response plt.subplot(1, 2, 1) plt.plot(wf, h_db) plt.axhline(-3.0, lw=1.5, ls='--', C='r') # -3 dB (50% power loss) plt.grid(b=True) plt.xlim(np.amin(wf), np.amax(wf)) plt.xlabel(r'$\omega$ [rad/sample]') plt.ylabel('$h$ [db]') legend = ['Filter', '-3dB'] plt.legend(legend) plt.title('Frequency Response') # Plot and format lag/group delay plt.subplot(1, 2, 2) plt.plot(wf, gd) plt.grid(b=True) plt.xlim(np.amin(wf), np.amax(wf)) plt.xlabel(r'$\omega$ [rad/sample]') plt.ylabel('$gd$ [samples]') plt.title('Lag / Group Delay') # Show plots plt.suptitle('Example "Response"') plt.show()