add: kalman filter, ABG filters, synthetic wave

master
Gabriele Gilardi 2020-06-19 20:21:18 +09:00
rodzic 8fef296eba
commit d986665244
6 zmienionych plików z 218 dodań i 572 usunięć

Wyświetl plik

@ -1,183 +0,0 @@
# import numpy as np
# measurements = np.array([5., 6., 7., 9., 10.])
# motion = np.array([1., 1., 2., 1., 1.])
# measurement_sigma = 4.
# motion_sigma = 2.
# mu = 0.
# sigma = 1000.
# # Measurement
# def Update( mean1, var1, mean2, var2 ):
# mean = (var2*mean1 + var1*mean2) / (var1 + var2)
# var = 1.0 / (1.0/var1 + 1.0/var2)
# return [mean, var]
# # Motion
# def Predict( mean1, var1, U, varU ):
# mean = mean1 + U
# var = var1 + varU
# return [mean, var]
# for n in range(len(measurements)):
# [mu, sigma] = Update(mu, sigma, measurements[n], measurement_sigma)
# print('Update : ', n, [mu, sigma])
# [mu, sigma] = Predict(mu, sigma, motion[n],motion_sigma)
# print('Predict: ', n, [mu, sigma])
# print(' ')
# print(Update(1,1,3,1))
# -------------------------------------------------------
# import numpy as np
# measurements = [ 1., 2., 3. ]
# dt = 1.
# # Initial state (location and velocity)
# x = np.array([[ 0. ],
# [ 0. ]])
# # Initial uncertainty
# P = np.array([[ 1000., 0. ],
# [ 0., 1000. ]])
# # External motion
# U = np.array([[ 0. ],
# [ 0. ]])
# # Next state function
# F = np.array([[ 1., dt ],
# [ 0., 1. ]])
# # Measurement function
# H = np.array([[ 1., 0. ]])
# # Measurement uncertainty
# R = np.array([[ 1. ]])
# # Identity matrix
# I = np.eye(2)
# def filter(x, P):
# step = 0
# for z in (measurements):
# step += 1
# print("step = ", step, " meas. = ", z)
# # Measurement
# Htra = H.T
# S = H.dot(P.dot(Htra)) + R
# Sinv = np.linalg.inv(S)
# K = P.dot(Htra.dot(Sinv))
# y = z - H.dot(x)
# xp = x +K.dot(y)
# Pp = P - K.dot(H.dot(P))
# # Prediction
# x = F.dot(xp) + U
# Ftra = F.T
# P = F.dot(Pp.dot(Ftra))
# print('x =')
# print(x)
# print('P =')
# print(P)
# filter(x, P)
# # -------------------------------------------------------
import numpy as np
x0 = 4.
y0 = 12.
measurements = np.array([[ 5., 10. ],
[ 6., 8. ],
[ 7., 6. ],
[ 8., 4. ],
[ 9., 2. ],
[ 10., 0. ]])
# x0 = -4.
# y0 = 8.
# measurements = np.array([[ 1., 4. ],
# [ 6., 0. ],
# [ 11., -4. ],
# [ 16., -8. ]])
# x0 = 1.
# y0 = 19.
# measurements = np.array([[ 1., 17. ],
# [ 1., 15. ],
# [ 1., 13. ],
# [ 1., 11. ]])
# x0 = 1.
# y0 = 19.
# measurements = np.array([[ 2., 17. ],
# [ 0., 15. ],
# [ 2., 13. ],
# [ 0., 11. ]])
# Time step
dt = 0.1
# Initial state (location and velocity)
x = np.array([[ x0 ],
[ y0 ],
[ 0. ],
[ 0. ]])
# Initial uncertainty
P = np.array([[ 0., 0., 0., 0. ],
[ 0., 0., 0., 0. ],
[ 0., 0., 1000., 0. ],
[ 0., 0., 0., 1000. ]])
# External motion
U = np.array([[ 0. ],
[ 0. ],
[ 0. ],
[ 0. ]])
# Next state function
F = np.array([[ 1., 0., dt, 0. ],
[ 0., 1., 0., dt ],
[ 0., 0., 1., 0. ],
[ 0., 0., 0., 1. ]])
# Measurement function
H = np.array([[ 1., 0., 0., 0. ],
[ 0., 1., 0., 0. ]])
# Measurement uncertainty
R = np.array([[ 0.1, 0. ],
[ 0. , 0.1 ]])
# Measurement vector
z = np.zeros((2,1))
def filter(x, P):
for n in range(len(measurements)):
z[0][0] = measurements[n][0]
z[1][0] = measurements[n][1]
# Prediction
xp = F.dot(x) + U
Ftra = F.T
Pp = F.dot(P.dot(Ftra))
# Measurement
Htra = H.T
S = H.dot(Pp.dot(Htra)) + R
Sinv = np.linalg.inv(S)
K = Pp.dot(Htra.dot(Sinv))
y = z - H.dot(xp)
x = xp +K.dot(y)
P = Pp - K.dot(H.dot(Pp))
# print(z)
# print('x = ')
# print(x)
# print('P = ')
# print(P)
# print(' ')
return x, P
x_final, P_final = filter(x, P)
print('x = ')
print(x_final)
print('P = ')
print(P_final)

Wyświetl plik

@ -1,106 +0,0 @@
"""
correction = update = measurement
prediction = motion
X (n_states, 1) State vector
P (n_states, n_states) Covariance matrix of X
F (n_states, n_states) State transition matrix
U (n_states, 1) Input/control/drift vector
Z (n_meas, 1) Measurament vector
H (n_meas, n_states) Measurament matrix
R (n_meas, n_meas) Covariance matrix of Z
S (n_meas, n_meas) Covariance matrix (?)
K (n_states, m) Kalman gain
Q (n_states, n_states) Covariance matrix (?)
Data (n_meas, n_samples) Measurements
Fext (n_states, n_samples) External driver
X0 (n_states, 1) Initial state vector
P0 (n_states, n_states) Initial covariance matrix of X
"""
import numpy as np
class KalmanFilter:
def __init__(self, F, H, Q, R):
"""
"""
self.F = F
self.Q = Q
self.H = H
self.R = R
def prediction(self, X, P, U):
X = self.F @ X + U
P = self.F @ P @ self.F.T + self.Q
return X, P
def update(self, X, P, Z):
"""
"""
S = self.H @ P @ self.H.T + self.R
K = P @ self.H.T @ np.linalg.inv(S)
X = X + K @ (Z - self.H @ X)
P = P - K @ self.H @ P
return X, P
def applyFilter(self, Data, Fext, X0, P0):
"""
"""
pass
# Measurements
data = np.array([[5., 6., 7., 8., 9., 10.],
[10., 8., 6., 4., 2., 0.]])
# Initial state vector
X0 = np.array([[4. ],
[12.],
[0. ],
[0. ]])
# Initial covariance matrix of X
P0 = np.array([[0., 0., 0., 0.],
[0., 0., 0., 0.],
[0., 0., 1000., 0.],
[0., 0., 0., 1000.]])
# External motion
Fext = np.zeros_like(data)
# Next state function
dt = 0.1
F = np.array([[ 1., 0., dt, 0. ],
[ 0., 1., 0., dt ],
[ 0., 0., 1., 0. ],
[ 0., 0., 0., 1. ]])
# Measurement function
H = np.array([[ 1., 0., 0., 0. ],
[ 0., 1., 0., 0. ]])
# Measurement uncertainty
R = np.array([[ 0.1, 0. ],
[ 0. , 0.1 ]])
def filter(x, P):
step = 0
for z in (measurements):
step += 1
print("step = ", step, " meas. = ", z)
# Update
print('x =')
print(x)
print('P =')
print(P)
filter(x, P)

Wyświetl plik

@ -46,6 +46,8 @@ InstTrend N/alpha Instantaneous trendline
SincFunction N Sinc function SincFunction N Sinc function
Decycler P Decycler, 1-GaussHigh (P>=5) Decycler P Decycler, 1-GaussHigh (P>=5)
DecyclerOsc P1,P2 Decycle oscillator, GH(P1) - GH(P2), (P1>=5) DecyclerOsc P1,P2 Decycle oscillator, GH(P1) - GH(P2), (P1>=5)
ABG
Kalman
N Order/smoothing factor/number of previous samples N Order/smoothing factor/number of previous samples
alpha Damping term alpha Damping term
@ -87,12 +89,12 @@ def filter_data(X, b, a):
class Filter: class Filter:
def __init__(self, X): def __init__(self, data):
""" """
""" """
self.X = np.asarray(X) self.data = np.asarray(data)
self.n_samples, self.n_series = X.shape self.n_samples, self.n_series = data.shape
self.idx = 0 self.idx = 0
def Generic(self, b=1.0, a=1.0): def Generic(self, b=1.0, a=1.0):
@ -101,7 +103,7 @@ class Filter:
""" """
b = np.asarray(b) b = np.asarray(b)
a = np.asarray(a) a = np.asarray(a)
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def SMA(self, N=10): def SMA(self, N=10):
@ -110,7 +112,7 @@ class Filter:
""" """
b = np.ones(N) / N b = np.ones(N) / N
a = np.array([1.0]) a = np.array([1.0])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def EMA(self, N=10, alpha=None): def EMA(self, N=10, alpha=None):
@ -123,7 +125,7 @@ class Filter:
alpha = 2.0 / (N + 1.0) alpha = 2.0 / (N + 1.0)
b = np.array([alpha]) b = np.array([alpha])
a = np.array([1.0, -(1.0 - alpha)]) a = np.array([1.0, -(1.0 - alpha)])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def WMA(self, N=10): def WMA(self, N=10):
@ -135,7 +137,7 @@ class Filter:
w = np.arange(N, 0, -1) w = np.arange(N, 0, -1)
b = w / np.sum(w) b = w / np.sum(w)
a = np.array([1.0]) a = np.array([1.0])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def MSMA(self, N=10): def MSMA(self, N=10):
@ -149,7 +151,7 @@ class Filter:
w[N] = 0.5 w[N] = 0.5
b = w / N b = w / N
a = np.array([1.0]) a = np.array([1.0])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def MLSQ(self, N=5): def MLSQ(self, N=5):
@ -170,11 +172,11 @@ class Filter:
b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0, b = np.array([-11.0, 18.0, 88.0, 138.0, 168.0, 178.0, 168.0,
138.0, 88.0, 18.0, -11.0]) / 980.0 138.0, 88.0, 18.0, -11.0]) / 980.0
else: else:
Y = self.X.copy() print("Warning: data returned unfiltered (wrong N)")
self.idx = 0 self.idx = 0
return Y return self.data
a = np.array([1.0]) a = np.array([1.0])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def ButterOrig(self, N=2, P=10): def ButterOrig(self, N=2, P=10):
@ -196,10 +198,10 @@ class Filter:
a = np.array([1.0, - (alpha + beta ** 2.0), a = np.array([1.0, - (alpha + beta ** 2.0),
(1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) (1.0 + alpha) * beta ** 2.0, - beta ** 4.0])
else: else:
Y = self.X.copy() print("Warning: data returned unfiltered (wrong N)")
self.idx = 0 self.idx = 0
return Y return self.data
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def ButterMod(self, N=2, P=10): def ButterMod(self, N=2, P=10):
@ -220,10 +222,10 @@ class Filter:
a = np.array([1.0, - (alpha + beta ** 2.0), a = np.array([1.0, - (alpha + beta ** 2.0),
(1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) (1.0 + alpha) * beta ** 2.0, - beta ** 4.0])
else: else:
Y = self.X.copy() print("Warning: data returned unfiltered (wrong N)")
self.idx = 0 self.idx = 0
return Y return self.data
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def SuperSmooth(self, N=2, P=10): def SuperSmooth(self, N=2, P=10):
@ -246,10 +248,10 @@ class Filter:
a = np.array([1.0, - (alpha + beta ** 2.0), a = np.array([1.0, - (alpha + beta ** 2.0),
(1.0 + alpha) * beta ** 2.0, - beta ** 4.0]) (1.0 + alpha) * beta ** 2.0, - beta ** 4.0])
else: else:
Y = self.X.copy() print("Warning: data returned unfiltered (wrong N)")
self.idx = 0 self.idx = 0
return Y return self.data
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def GaussLow(self, N=1, P=2): def GaussLow(self, N=1, P=2):
@ -259,16 +261,16 @@ class Filter:
Must be P > 1. If not returns the unfiltered dataset. Must be P > 1. If not returns the unfiltered dataset.
""" """
if (P < 2): if (P < 2):
Y = self.X.copy() print("Warning: data returned unfiltered (P < 2)")
self.idx = 0 self.idx = 0
return Y return self.data
A = 2.0 ** (1.0 / N) - 1.0 A = 2.0 ** (1.0 / N) - 1.0
B = 4.0 * np.sin(np.pi / P) ** 2.0 B = 4.0 * np.sin(np.pi / P) ** 2.0
C = 2.0 * (np.cos(2.0 * np.pi / P) - 1.0) C = 2.0 * (np.cos(2.0 * np.pi / P) - 1.0)
alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) alpha = (-B + np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A)
b = np.array([alpha]) b = np.array([alpha])
a = np.array([1.0, - (1.0 - alpha)]) a = np.array([1.0, - (1.0 - alpha)])
Y = self.X.copy() Y = self.data.copy()
for i in range(N): for i in range(N):
Y, self.idx = filter_data(Y, b, a) Y, self.idx = filter_data(Y, b, a)
return Y return Y
@ -280,16 +282,16 @@ class Filter:
Must be P > 4. If not returns the unfiltered dataset. Must be P > 4. If not returns the unfiltered dataset.
""" """
if (P < 5): if (P < 5):
Y = self.X.copy() print("Warning: data returned unfiltered (P < 5)")
self.idx = 0 self.idx = 0
return Y return self.data
A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0 A = 2.0 ** (1.0 / N) * np.sin(np.pi / P) ** 2.0 - 1.0
B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0) B = 2.0 * (2.0 ** (1.0 / N) - 1.0) * (np.cos(2.0 * np.pi / P) - 1.0)
C = - B C = - B
alpha = (-B - np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A) alpha = (-B - np.sqrt(B ** 2.0 - 4.0 * A * C)) / (2.0 * A)
b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)]) b = np.array([1.0 - alpha / 2.0, -(1.0 - alpha / 2.0)])
a = np.array([1.0, - (1.0 - alpha)]) a = np.array([1.0, - (1.0 - alpha)])
Y = self.X - self.X[0, :] Y = self.data - self.data[0, :]
for i in range(N): for i in range(N):
Y, self.idx = filter_data(Y, b, a) Y, self.idx = filter_data(Y, b, a)
return Y return Y
@ -306,7 +308,7 @@ class Filter:
alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0) alpha = 1.0 / gamma - np.sqrt(1.0 / gamma ** 2 - 1.0)
b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0]) b = np.array([(1.0 - alpha) / 2.0, 0.0, - (1.0 - alpha) / 2.0])
a = np.array([1.0, - beta * (1.0 + alpha), alpha]) a = np.array([1.0, - beta * (1.0 + alpha), alpha])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def BandStop(self, P=5, delta=0.3): def BandStop(self, P=5, delta=0.3):
@ -322,7 +324,7 @@ class Filter:
b = np.array([(1.0 + alpha) / 2.0, - beta * (1.0 + alpha), b = np.array([(1.0 + alpha) / 2.0, - beta * (1.0 + alpha),
(1.0 + alpha) / 2.0]) (1.0 + alpha) / 2.0])
a = np.array([1.0, -beta * (1.0 + alpha), alpha]) a = np.array([1.0, -beta * (1.0 + alpha), alpha])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5): def ZEMA1(self, N=10, alpha=None, K=1.0, Vn=5):
@ -337,7 +339,7 @@ class Filter:
b[0] = alpha * (1.0 + K) b[0] = alpha * (1.0 + K)
b[Vn] = - alpha * K b[Vn] = - alpha * K
a = np.array([1.0, - (1.0 - alpha)]) a = np.array([1.0, - (1.0 - alpha)])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def ZEMA2(self, N=10, alpha=None, K=1.0): def ZEMA2(self, N=10, alpha=None, K=1.0):
@ -350,7 +352,7 @@ class Filter:
alpha = 2.0 / (N + 1.0) alpha = 2.0 / (N + 1.0)
b = np.array([alpha * (1.0 + K)]) b = np.array([alpha * (1.0 + K)])
a = np.array([1.0, alpha * K - (1.0 - alpha)]) a = np.array([1.0, alpha * K - (1.0 - alpha)])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def InstTrend(self, N=10, alpha=None): def InstTrend(self, N=10, alpha=None):
@ -364,7 +366,7 @@ class Filter:
b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0, b = np.array([alpha - alpha ** 2.0 / 4.0, alpha ** 2.0 / 2.0,
- alpha + 3.0 * alpha ** 2.0 / 4.0]) - alpha + 3.0 * alpha ** 2.0 / 4.0])
a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0]) a = np.array([1.0, - 2.0 * (1.0 - alpha), (1.0 - alpha) ** 2.0])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y return Y
def SincFunction(self, N=10, nel=10): def SincFunction(self, N=10, nel=10):
@ -378,8 +380,8 @@ class Filter:
k = np.arange(1, nel) k = np.arange(1, nel)
b[1:] = np.sin(np.pi * k / N) / (np.pi * k) b[1:] = np.sin(np.pi * k / N) / (np.pi * k)
a = np.array([1.0]) a = np.array([1.0])
Y, self.idx = filter_data(self.X, b, a) Y, self.idx = filter_data(self.data, b, a)
return Y, b, a return Y
def Decycler(self, P=10): def Decycler(self, P=10):
""" """
@ -389,10 +391,10 @@ class Filter:
Must be P > 4. If not returns the unfiltered dataset. Must be P > 4. If not returns the unfiltered dataset.
""" """
if (P < 5): if (P < 5):
Y = self.X.copy() print("Warning: data returned unfiltered (P < 5)")
self.idx = 0 self.idx = 0
return Y return self.data
Y = self.X - self.GaussHigh(N=1, P=P) Y = self.data - self.GaussHigh(N=1, P=P)
return Y return Y
def DecyclerOsc(self, P1=5, P2=10): def DecyclerOsc(self, P1=5, P2=10):
@ -405,9 +407,111 @@ class Filter:
""" """
P_low = np.amin([P1, P2]) P_low = np.amin([P1, P2])
P_high = np.amax([P1, P2]) P_high = np.amax([P1, P2])
if (P1 < 5): if (P_low < 5):
Y = self.X.copy() print("Warning: data returned unfiltered (P_low < 5)")
self.idx = 0 self.idx = 0
return Y return self.data
Y = self.GaussHigh(N=2, P=P_low) - self.GaussHigh(N=2, P=P_high) Y = self.GaussHigh(N=2, P=P_low) - self.GaussHigh(N=2, P=P_high)
return Y return Y
def ABG(self, alpha=0.0, beta=0.0, gamma=0.0, dt=1.0):
"""
alpha-beta-gamma
For numerical stability: 0 < alpha, beta < 1
"""
# If necessary change scalars to arrays
if (np.ndim(alpha) == 0):
alpha = np.ones(self.n_samples) * alpha
if (np.ndim(beta) == 0):
beta = np.ones(self.n_samples) * beta
if (np.ndim(gamma) == 0):
gamma = np.ones(self.n_samples) * gamma
# Initialize
Y_corr = self.data.copy()
Y_pred = self.data.copy()
x0 = self.data[0,:]
v0 = np.zeros(self.n_series)
a0 = np.zeros(self.n_series)
for i in range(1, self.n_samples):
# Predictor (predicts state in <i>)
x_pred = x0 + dt * v0 + 0.5 * a0 * dt ** 2
v_pred = v0 + dt * a0
a_pred = a0
Y_pred[i, :] = x_pred
# Residual (innovation)
r = self.data[i, :] - x_pred
# Corrector (corrects state in <i>)
x_corr = x_pred + alpha[i] * r
v_corr = v_pred + (beta[i] / dt) * r
a_corr = a_pred + (2.0 * gamma[i] / dt ** 2) *r
# Save value and prepare next iteration
Y_corr[i, :] = x_corr
x0 = x_corr
v0 = v_corr
a0 = a_corr
self.idx = 1
return Y_corr, Y_pred
def Kalman(self, sigma_x, sigma_v, dt, abg_type="abg"):
"""
Steady-state Kalman filter (also limited to one-dimension)
"""
L = (sigma_x / sigma_v) * dt ** 2
# Alpha filter
if (abg_type == 'a'):
alpha = (-L ** 2 + np.sqrt(L ** 4 + 16.0 * L ** 2)) / 8.0
beta = 0.0
gamma = 0.0
# Alpha-Beta filter
elif(abg_type == 'ab'):
r = (4.0 + L - np.sqrt(8.0 * L + L ** 2)) / 4.0
alpha = 1.0 - r ** 2
beta = 2.0 * (2.0 - alpha) - 4.0 * np.sqrt(1.0 - alpha)
gamma = 0.0
#Alpha-Beta-Gamma filter
else:
b = (L / 2.0) - 3.0
c = (L / 2.0) + 3.0
d = -1.0
p = c - b **2 /3.0
q = 2.0 * b **3 /27.0 - b * c /3.0 + d
v = np.sqrt(q ** 2 + 4.0 * p ** 3 / 27.0)
z = - (q + v / 2.0) ** (1.0 / 3.0)
s = z - p / (3.0 * z) - b / 3.0
alpha = 1.0 - s ** 2
beta = 2.0 * (1 - s) ** 2
gamma = beta ** 2 / (2.0 * alpha)
# Apply filter
Y = self.abg(alpha=alpha, beta=beta, gamma=gamma, dt=dt)
return Y
"""
correction = update = measurement
prediction = motion
X (n_states, 1) State estimate
P (n_states, n_states) Covariance estimate
F (n_states, n_states) State transition model
Z (n_obs, 1) Observations
H (n_obs, n_states) Observation model
R (n_obs, n_obs) Covariance of the observation noise
S (n_obs, n_obs) Covariance of the observation residual
K (n_states, n_obs) Optimal Kalman gain
Q (n_states, n_states) Covariance of the process noise matrix
Y (n_obs, 1) Observation residual (innovation)
"""

Wyświetl plik

@ -4,22 +4,22 @@ Filters for time series.
Copyright (c) 2020 Gabriele Gilardi Copyright (c) 2020 Gabriele Gilardi
ToDo: ToDo:
- use NaN/input values for points not filtered?
- return idx?
- util to test filter (impulse, utils)
- warning in filter when wrong order? or save flag with true/false if computed
- use self.a and self.b
- remove a and b from plots
- in comments write what filters do - in comments write what filters do
- is necessary to copy X for Y untouched? - is necessary to copy X for Y untouched?
- decide default values in functions - decide default values in functions
- check conditions on P and N - check conditions on P and N
- why lag plot gives errors
- fix plotting function
- example for alpha-beta-gamma using variable sigma as in financial time series
(see Ehler)
- example using noisy multi-sine-waves
""" """
import sys import sys
import numpy as np import numpy as np
import filters as flt import filters as flt
import utils as utl import utils as utl
import matplotlib.pyplot as plt
# Read data to filter # Read data to filter
if len(sys.argv) != 2: if len(sys.argv) != 2:
@ -42,15 +42,19 @@ spx = flt.Filter(data)
# aa = np.array([1.0, alpha - 1.0]) # aa = np.array([1.0, alpha - 1.0])
res, bb, aa = spx.SincFunction(2, 50) # res, bb, aa = spx.SincFunction(2, 50)
print(bb) # print(bb)
print(aa) # print(aa)
utl.plot_frequency_response(bb, aa) # utl.plot_frequency_response(bb, aa)
utl.plot_lag_response(bb, aa) # utl.plot_lag_response(bb, aa)
# sigma_x = 0.1
# sigma_v = 0.1 * np.ones(n_samples)
# res = spx.Kalman(sigma_x=sigma_x, sigma_v=sigma_v, dt=1.0, abg_type="abg")
alpha = 0.5
beta = 0.005
gamma = 0.0
Yc, Yp = spx.ABG(alpha=alpha, beta=beta, gamma=gamma, dt=1.0)
signals = (spx.data[:, 0], Yc[:, 0], Yp[:, 0])
utl.plot_signals(signals, 0, 50)
# res = spx.DecyclerOsc(30, 60)
# print(res[0:10, :])
signals = (spx.X, res)
print(spx.idx)
utl.plot_signals(signals)
# print(spx.X[0:20])

Wyświetl plik

@ -63,61 +63,20 @@ def scale_data(X, param=()):
return Xs, param return Xs, param
def calc_rmse(a, b): def plot_signals(signals, idx_start=0, idx_end=None):
"""
Calculates the root-mean-square-error of arrays <a> and <b>. If the arrays
are multi-column, the RMSE is calculated as all the columns are one single
vector.
"""
# Convert to (n, ) dimension
a = a.flatten()
b = b.flatten()
# Root-mean-square-error
rmse = np.sqrt(((a - b) ** 2).sum() / len(a))
return rmse
def calc_corr(a, b):
"""
Calculates the correlation between arrays <a> and <b>. If the arrays are
multi-column, the correlation is calculated as all the columns are one
single vector.
"""
# Convert to (n, ) dimension
a = a.flatten()
b = b.flatten()
# Correlation
corr = np.corrcoef(a, b)[0, 1]
return corr
def calc_accu(a, b):
"""
Calculates the accuracy (in %) between arrays <a> and <b>. The two arrays
must be column/row vectors.
"""
# Convert to (n, ) dimension
a = a.flatten()
b = b.flatten()
# Correlation
accu = 100.0 * (a == b).sum() / len(a)
return accu
def plot_signals(signals):
""" """
""" """
if (idx_end is None):
idx_end = len(signals[0])
t = np.arange(idx_start, idx_end)
names = []
count = 0
for signal in signals: for signal in signals:
plt.plot(signal) plt.plot(t, signal[idx_start:idx_end])
names.append(str(count))
count += 1
plt.grid(b=True) plt.grid(b=True)
plt.xlim(0, 100) plt.legend(names)
plt.show() plt.show()
@ -138,8 +97,6 @@ def plot_frequency_response(b, a=1.0):
# plt.ylim(-40.0, 0.0) # plt.ylim(-40.0, 0.0)
plt.xlabel('$\omega$ [rad/sample]') plt.xlabel('$\omega$ [rad/sample]')
plt.ylabel('$h$ [db]') plt.ylabel('$h$ [db]')
plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \
+ ', a = ' + np.array_str(np.around(a, decimals=2)))
plt.show() plt.show()
@ -157,128 +114,53 @@ def plot_lag_response(b, a=1.0):
plt.xlim(np.amin(wf), np.amax(wf)) plt.xlim(np.amin(wf), np.amax(wf))
plt.xlabel('$\omega$ [rad/sample]') plt.xlabel('$\omega$ [rad/sample]')
plt.ylabel('$gd$ [samples]') plt.ylabel('$gd$ [samples]')
plt.title('b = ' + np.array_str(np.around(b, decimals=2)) \
+ ', a = ' + np.array_str(np.around(a, decimals=2)))
plt.show() plt.show()
def synthetic_wave(per, amp=None, pha=None, tim=None):
"""
Generates a multi-sinewave.
P = [ P1 P2 ... Pn ] Periods
varargin = A, T, PH Amplitudes, time, phases
A = [ A1 A2 ... An ] Amplitudes
T = [ ts tf dt] Time info: from ts to tf in dt steps
PH = [PH1 PH2 ... PHn] Phases (rad)
# function [y,t] = interpSignal(x,n1,n2,varargin) Av = SUM(1 to n) of [ A*sin(2*pi*f*t + PH) ]
# if (nargin == 4) Tv Time (ts to tf with step dt)
# dt = varargin{1};
# else
# dt = 20;
# end
# N = length(x);
# Ts = (n2-n1)/(N-1);
# n = n1:Ts:n2;
# t = n1:Ts/dt:n2;
# nt = length(t);
# y = zeros(1,nt);
# for i = 1:nt
# a = x.*sinc( (t(i)-n)/Ts );
# y(i) = sum(a);
# end
# end
# function [f] = plotFn(type,func,varargin)
# % 0 Plot real function
# % 1 Plot real/imag values
# % 2 Plot real/imag values in polar form
# % 3 Plot magnitude/phase
# clf
# tiny = 1e-7;
# % Check if func is a function or data
# if ( isa(func,"function_handle") )
# N = varargin{1};
# n = (0:N-1)';
# f = func(n);
# else
# N = length(func);
# n = (0:N-1)';
# f = func;
# end
# % Clean data
# xRe = real(f);
# xIm = imag(f);
# xRe( abs(xRe) < tiny ) = 0;
# xIm( abs(xIm) < tiny ) = 0;
# switch (type)
# % Plot real function
# case 0
# stem(n,xRe,"b","filled")
# xlim([n(1) n(N)])
# grid on
# xlabel("n")
# ylabel("f")
# box on
# % Plot real/imag function
# case 1
# subplot(2,1,1)
# stem(n,xRe,"b","filled")
# xlim([n(1) n(N)])
# grid on
# xlabel("n")
# ylabel("Re")
# box on
# subplot(2,1,2)
# stem(n,xIm,"b","filled")
# xlim([n(1) n(N)])
# grid on
# xlabel("n")
# ylabel("Im")
# box on
# % Plot real/imag function in polar form
# case 2
# scatter(xRe,xIm,"b","filled")
# maxRe = max( abs(xRe) );
# maxIm = max( abs(xIm) );
# m = max(maxRe,maxIm);
# dx = 2*m/50;
# text(xRe+dx,xIm,num2str(n))
# xlim( [-m +m ])
# ylim( [-m +m ])
# axis("square")
# grid on
# hold on
# plot([-m 0; +m 0],[0 -m; 0 +m],"k")
# hold off
# xlabel("Real")
# ylabel("Imag")
# box on
# % Plot magnitude/phase
# case 3
# xMa = sqrt( xRe.^2 + xIm.^2 );
# xAr = atan2(xIm,xRe);
# subplot(2,1,1)
# stem(n,xMa,"b","filled")
# xlim([n(1) n(N)])
# grid on
# xlabel("n")
# ylabel("Magnitude")
# box on
# subplot(2,1,2)
# stem(n,xAr,"b","filled")
# xlim([n(1) n(N)])
# ylim([-pi pi])
# grid on
# xlabel("n")
# ylabel("Phase [rad]")
# box on
# end Default amplitudes are ones
Default time is from 0 to largest period (1000 steps)
# end Default phases are zeros
"""
n_waves = len(per)
per = np.asarray(per)
# Check for amplitudes, times, and phases
if (amp is None):
amp = np.ones(n_waves)
else:
amp = np.asarray(amp)
if (tim is None):
t_start = 0.0
t_end = np.amax(per)
n_steps = 500
else:
t_start = tim[0]
t_end = tim[1]
n_steps = int(tim[2])
if (pha is None):
pha = np.zeros(n_waves)
else:
pha = np.asarray(pha)
# Add all the waves
t = np.linspace(t_start, t_end, num=n_steps)
f = np.zeros(len(t))
for i in range(n_waves):
f = f + amp[i] * np.sin(2.0 * np.pi * t / per[i] + pha[i])
return t, f
# function sP = SyntQT(P,type) # function sP = SyntQT(P,type)
@ -406,60 +288,5 @@ def plot_lag_response(b, a=1.0):
# end % End of function # end % End of function
# function [Tv,Av] = SyntWave(P,varargin)
# % Function: generate a multi-sinewave
# %
# % Inputs
# % ------
# % P = [ P1 P2 ... Pn ] Periods
# % varargin = A, T, PH Amplitudes, time, phases
# %
# % A = [ A1 A2 ... An ] Amplitudes
# % T = [ ts tf dt] Time info: from ts to tf with step dt
# % PH = [PH1 PH2 ... PHn] Phases (rad)
# %
# % Outputs
# % -------
# % Av = SUM(1 to n) of [ A*sin(2*pi*f*t + PH) ]
# % Tv Time (ts to tf with step dt)
# %
# % Default amplitudes are ones
# % Default time is from 0 to largest period (1000 steps)
# % Default phases are zeros
# % Check for arguments
# if (nargin == 1)
# np = length(P);
# A = ones(1,np);
# T = [0 max(P) max(P)/1000];
# PH = zeros(1,np);
# elseif (nargin == 2)
# np = length(P);
# A = varargin{1};
# T = [0 max(P) max(P)/1000];
# PH = zeros(1,np);
# elseif (nargin == 3)
# np = length(P);
# A = varargin{1};
# T = varargin{2};
# PH = zeros(1,np);
# elseif (nargin == 4)
# np = length(P);
# A = varargin{1};
# T = varargin{2};
# PH = varargin{3};
# else
# fprintf(1,'\n');
# error('Wrong number of arguments');
# end
# % Add all waves
# Tv = T(1):T(3):T(2);
# Av = zeros(1,length(Tv));
# for j = 1:np
# Av = Av + A(j)*sin(2*pi*Tv/P(j)+PH(j));
# end
# end % End of function

Wyświetl plik

@ -2,9 +2,9 @@
## Reference ## Reference
- [1] John F. Ehlers, "[Cycle Analytics for Traders: Advanced Technical Trading Concepts](http://www.mesasoftware.com/ehlers_books.htm)". - John F. Ehlers, "[Cycle Analytics for Traders: Advanced Technical Trading Concepts](http://www.mesasoftware.com/ehlers_books.htm)".
- [2] John F. Ehlers, "[Signal Analysis, Filters And Trading Strategies](http://www.mesasoftware.com/ehlers_technical_papers.htm)". - Wikipedia, "[Alpha beta filter](https://en.wikipedia.org/wiki/Alpha_beta_filter)".
## Characteristics ## Characteristics