kopia lustrzana https://github.com/gabrielegilardi/SignalFilters
add Kalman filter
rodzic
1f8e47a34f
commit
8fef296eba
|
@ -31,153 +31,153 @@
|
||||||
|
|
||||||
# -------------------------------------------------------
|
# -------------------------------------------------------
|
||||||
|
|
||||||
import numpy as np
|
# import numpy as np
|
||||||
|
|
||||||
measurements = [ 1., 2., 3. ]
|
# measurements = [ 1., 2., 3. ]
|
||||||
dt = 1.
|
# dt = 1.
|
||||||
|
|
||||||
# Initial state (location and velocity)
|
# # Initial state (location and velocity)
|
||||||
x = np.array([[ 0. ],
|
# x = np.array([[ 0. ],
|
||||||
[ 0. ]])
|
# [ 0. ]])
|
||||||
# Initial uncertainty
|
# # Initial uncertainty
|
||||||
P = np.array([[ 1000., 0. ],
|
# P = np.array([[ 1000., 0. ],
|
||||||
[ 0., 1000. ]])
|
# [ 0., 1000. ]])
|
||||||
# External motion
|
# # External motion
|
||||||
U = np.array([[ 0. ],
|
# U = np.array([[ 0. ],
|
||||||
[ 0. ]])
|
# [ 0. ]])
|
||||||
# Next state function
|
# # Next state function
|
||||||
F = np.array([[ 1., dt ],
|
# F = np.array([[ 1., dt ],
|
||||||
[ 0., 1. ]])
|
# [ 0., 1. ]])
|
||||||
# Measurement function
|
# # Measurement function
|
||||||
H = np.array([[ 1., 0. ]])
|
# H = np.array([[ 1., 0. ]])
|
||||||
# Measurement uncertainty
|
# # Measurement uncertainty
|
||||||
R = np.array([[ 1. ]])
|
# R = np.array([[ 1. ]])
|
||||||
# Identity matrix
|
# # Identity matrix
|
||||||
I = np.eye(2)
|
# I = np.eye(2)
|
||||||
|
|
||||||
|
|
||||||
def filter(x, P):
|
# def filter(x, P):
|
||||||
|
|
||||||
step = 0
|
# step = 0
|
||||||
for z in (measurements):
|
# for z in (measurements):
|
||||||
step += 1
|
# step += 1
|
||||||
print("step = ", step, " meas. = ", z)
|
# print("step = ", step, " meas. = ", z)
|
||||||
|
|
||||||
# Measurement
|
# # Measurement
|
||||||
Htra = H.T
|
# Htra = H.T
|
||||||
S = H.dot(P.dot(Htra)) + R
|
# S = H.dot(P.dot(Htra)) + R
|
||||||
Sinv = np.linalg.inv(S)
|
# Sinv = np.linalg.inv(S)
|
||||||
K = P.dot(Htra.dot(Sinv))
|
# K = P.dot(Htra.dot(Sinv))
|
||||||
y = z - H.dot(x)
|
# y = z - H.dot(x)
|
||||||
xp = x +K.dot(y)
|
# xp = x +K.dot(y)
|
||||||
Pp = P - K.dot(H.dot(P))
|
# Pp = P - K.dot(H.dot(P))
|
||||||
|
|
||||||
# Prediction
|
# # Prediction
|
||||||
x = F.dot(xp) + U
|
# x = F.dot(xp) + U
|
||||||
Ftra = F.T
|
# Ftra = F.T
|
||||||
P = F.dot(Pp.dot(Ftra))
|
# P = F.dot(Pp.dot(Ftra))
|
||||||
|
|
||||||
print('x =')
|
# print('x =')
|
||||||
print(x)
|
# print(x)
|
||||||
print('P =')
|
# print('P =')
|
||||||
print(P)
|
# print(P)
|
||||||
|
|
||||||
filter(x, P)
|
# filter(x, P)
|
||||||
|
|
||||||
# # -------------------------------------------------------
|
# # -------------------------------------------------------
|
||||||
|
|
||||||
# import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
# # x0 = 4.
|
x0 = 4.
|
||||||
# # y0 = 12.
|
y0 = 12.
|
||||||
# # measurements = np.array([[ 5., 10. ],
|
measurements = np.array([[ 5., 10. ],
|
||||||
# # [ 6., 8. ],
|
[ 6., 8. ],
|
||||||
# # [ 7., 6. ],
|
[ 7., 6. ],
|
||||||
# # [ 8., 4. ],
|
[ 8., 4. ],
|
||||||
# # [ 9., 2. ],
|
[ 9., 2. ],
|
||||||
# # [ 10., 0. ]])
|
[ 10., 0. ]])
|
||||||
# # x0 = -4.
|
# x0 = -4.
|
||||||
# # y0 = 8.
|
# y0 = 8.
|
||||||
# # measurements = np.array([[ 1., 4. ],
|
# measurements = np.array([[ 1., 4. ],
|
||||||
# # [ 6., 0. ],
|
# [ 6., 0. ],
|
||||||
# # [ 11., -4. ],
|
# [ 11., -4. ],
|
||||||
# # [ 16., -8. ]])
|
# [ 16., -8. ]])
|
||||||
# # x0 = 1.
|
# x0 = 1.
|
||||||
# # y0 = 19.
|
# y0 = 19.
|
||||||
# # measurements = np.array([[ 1., 17. ],
|
# measurements = np.array([[ 1., 17. ],
|
||||||
# # [ 1., 15. ],
|
# [ 1., 15. ],
|
||||||
# # [ 1., 13. ],
|
# [ 1., 13. ],
|
||||||
# # [ 1., 11. ]])
|
# [ 1., 11. ]])
|
||||||
# x0 = 1.
|
# x0 = 1.
|
||||||
# y0 = 19.
|
# y0 = 19.
|
||||||
# measurements = np.array([[ 2., 17. ],
|
# measurements = np.array([[ 2., 17. ],
|
||||||
# [ 0., 15. ],
|
# [ 0., 15. ],
|
||||||
# [ 2., 13. ],
|
# [ 2., 13. ],
|
||||||
# [ 0., 11. ]])
|
# [ 0., 11. ]])
|
||||||
# # Time step
|
# Time step
|
||||||
# dt = 0.1
|
dt = 0.1
|
||||||
# # Initial state (location and velocity)
|
# Initial state (location and velocity)
|
||||||
# x = np.array([[ x0 ],
|
x = np.array([[ x0 ],
|
||||||
# [ y0 ],
|
[ y0 ],
|
||||||
# [ 0. ],
|
[ 0. ],
|
||||||
# [ 0. ]])
|
[ 0. ]])
|
||||||
# # Initial uncertainty
|
# Initial uncertainty
|
||||||
# P = np.array([[ 0., 0., 0., 0. ],
|
P = np.array([[ 0., 0., 0., 0. ],
|
||||||
# [ 0., 0., 0., 0. ],
|
[ 0., 0., 0., 0. ],
|
||||||
# [ 0., 0., 1000., 0. ],
|
[ 0., 0., 1000., 0. ],
|
||||||
# [ 0., 0., 0., 1000. ]])
|
[ 0., 0., 0., 1000. ]])
|
||||||
# # External motion
|
# External motion
|
||||||
# U = np.array([[ 0. ],
|
U = np.array([[ 0. ],
|
||||||
# [ 0. ],
|
[ 0. ],
|
||||||
# [ 0. ],
|
[ 0. ],
|
||||||
# [ 0. ]])
|
[ 0. ]])
|
||||||
# # Next state function
|
# Next state function
|
||||||
# F = np.array([[ 1., 0., dt, 0. ],
|
F = np.array([[ 1., 0., dt, 0. ],
|
||||||
# [ 0., 1., 0., dt ],
|
[ 0., 1., 0., dt ],
|
||||||
# [ 0., 0., 1., 0. ],
|
[ 0., 0., 1., 0. ],
|
||||||
# [ 0., 0., 0., 1. ]])
|
[ 0., 0., 0., 1. ]])
|
||||||
# # Measurement function
|
# Measurement function
|
||||||
# H = np.array([[ 1., 0., 0., 0. ],
|
H = np.array([[ 1., 0., 0., 0. ],
|
||||||
# [ 0., 1., 0., 0. ]])
|
[ 0., 1., 0., 0. ]])
|
||||||
# # Measurement uncertainty
|
# Measurement uncertainty
|
||||||
# R = np.array([[ 0.1, 0. ],
|
R = np.array([[ 0.1, 0. ],
|
||||||
# [ 0. , 0.1 ]])
|
[ 0. , 0.1 ]])
|
||||||
# # Measurement vector
|
# Measurement vector
|
||||||
# z = np.zeros((2,1))
|
z = np.zeros((2,1))
|
||||||
|
|
||||||
|
|
||||||
# def filter(x, P):
|
def filter(x, P):
|
||||||
|
|
||||||
# for n in range(len(measurements)):
|
for n in range(len(measurements)):
|
||||||
|
|
||||||
# z[0][0] = measurements[n][0]
|
z[0][0] = measurements[n][0]
|
||||||
# z[1][0] = measurements[n][1]
|
z[1][0] = measurements[n][1]
|
||||||
|
|
||||||
# # Prediction
|
# Prediction
|
||||||
# xp = F.dot(x) + U
|
xp = F.dot(x) + U
|
||||||
# Ftra = F.T
|
Ftra = F.T
|
||||||
# Pp = F.dot(P.dot(Ftra))
|
Pp = F.dot(P.dot(Ftra))
|
||||||
|
|
||||||
# # Measurement
|
# Measurement
|
||||||
# Htra = H.T
|
Htra = H.T
|
||||||
# S = H.dot(Pp.dot(Htra)) + R
|
S = H.dot(Pp.dot(Htra)) + R
|
||||||
# Sinv = np.linalg.inv(S)
|
Sinv = np.linalg.inv(S)
|
||||||
# K = Pp.dot(Htra.dot(Sinv))
|
K = Pp.dot(Htra.dot(Sinv))
|
||||||
# y = z - H.dot(xp)
|
y = z - H.dot(xp)
|
||||||
# x = xp +K.dot(y)
|
x = xp +K.dot(y)
|
||||||
# P = Pp - K.dot(H.dot(Pp))
|
P = Pp - K.dot(H.dot(Pp))
|
||||||
# # print(z)
|
# print(z)
|
||||||
# # print('x = ')
|
# print('x = ')
|
||||||
# # print(x)
|
# print(x)
|
||||||
# # print('P = ')
|
# print('P = ')
|
||||||
# # print(P)
|
# print(P)
|
||||||
# # print(' ')
|
# print(' ')
|
||||||
|
|
||||||
# return x, P
|
return x, P
|
||||||
|
|
||||||
|
|
||||||
# x_final, P_final = filter(x, P)
|
x_final, P_final = filter(x, P)
|
||||||
# print('x = ')
|
print('x = ')
|
||||||
# print(x_final)
|
print(x_final)
|
||||||
# print('P = ')
|
print('P = ')
|
||||||
# print(P_final)
|
print(P_final)
|
||||||
|
|
|
@ -1,45 +1,92 @@
|
||||||
"""
|
"""
|
||||||
- generalize to N and M
|
|
||||||
|
|
||||||
M = 1
|
correction = update = measurement
|
||||||
N = 2
|
prediction = motion
|
||||||
|
|
||||||
|
X (n_states, 1) State vector
|
||||||
|
P (n_states, n_states) Covariance matrix of X
|
||||||
|
F (n_states, n_states) State transition matrix
|
||||||
|
U (n_states, 1) Input/control/drift vector
|
||||||
|
Z (n_meas, 1) Measurament vector
|
||||||
|
H (n_meas, n_states) Measurament matrix
|
||||||
|
R (n_meas, n_meas) Covariance matrix of Z
|
||||||
|
S (n_meas, n_meas) Covariance matrix (?)
|
||||||
|
K (n_states, m) Kalman gain
|
||||||
|
Q (n_states, n_states) Covariance matrix (?)
|
||||||
|
|
||||||
|
Data (n_meas, n_samples) Measurements
|
||||||
|
Fext (n_states, n_samples) External driver
|
||||||
|
X0 (n_states, 1) Initial state vector
|
||||||
|
P0 (n_states, n_states) Initial covariance matrix of X
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
dt = 1.
|
|
||||||
|
|
||||||
measurements = np.array([ 1., 2., 3., 4., 5., 6., 7., 8, 9, 10])
|
class KalmanFilter:
|
||||||
|
|
||||||
# State vector
|
def __init__(self, F, H, Q, R):
|
||||||
# (N, 1)
|
"""
|
||||||
x = np.array([[ 0. ],
|
"""
|
||||||
[ 0. ]])
|
self.F = F
|
||||||
|
self.Q = Q
|
||||||
|
self.H = H
|
||||||
|
self.R = R
|
||||||
|
|
||||||
# Prediction uncertainty (covariance matrix of x)
|
|
||||||
# (N, N)
|
def prediction(self, X, P, U):
|
||||||
P = np.array([[ 1000., 0. ],
|
X = self.F @ X + U
|
||||||
[ 0., 1000. ]])
|
P = self.F @ P @ self.F.T + self.Q
|
||||||
|
return X, P
|
||||||
|
|
||||||
|
|
||||||
|
def update(self, X, P, Z):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
S = self.H @ P @ self.H.T + self.R
|
||||||
|
K = P @ self.H.T @ np.linalg.inv(S)
|
||||||
|
X = X + K @ (Z - self.H @ X)
|
||||||
|
P = P - K @ self.H @ P
|
||||||
|
return X, P
|
||||||
|
|
||||||
|
|
||||||
|
def applyFilter(self, Data, Fext, X0, P0):
|
||||||
|
"""
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Measurements
|
||||||
|
data = np.array([[5., 6., 7., 8., 9., 10.],
|
||||||
|
[10., 8., 6., 4., 2., 0.]])
|
||||||
|
|
||||||
|
# Initial state vector
|
||||||
|
X0 = np.array([[4. ],
|
||||||
|
[12.],
|
||||||
|
[0. ],
|
||||||
|
[0. ]])
|
||||||
|
|
||||||
|
# Initial covariance matrix of X
|
||||||
|
P0 = np.array([[0., 0., 0., 0.],
|
||||||
|
[0., 0., 0., 0.],
|
||||||
|
[0., 0., 1000., 0.],
|
||||||
|
[0., 0., 0., 1000.]])
|
||||||
|
|
||||||
# External motion
|
# External motion
|
||||||
# (N, 1)
|
Fext = np.zeros_like(data)
|
||||||
U = np.array([[ 0. ],
|
|
||||||
[ 0. ]])
|
|
||||||
|
|
||||||
# Update matrix (state transition matrix)
|
# Next state function
|
||||||
# (N, N)
|
dt = 0.1
|
||||||
F = np.array([[ 1., dt ],
|
F = np.array([[ 1., 0., dt, 0. ],
|
||||||
[ 0., 1. ]])
|
[ 0., 1., 0., dt ],
|
||||||
|
[ 0., 0., 1., 0. ],
|
||||||
# Measurement function (extraction matrix)
|
[ 0., 0., 0., 1. ]])
|
||||||
# (M, N)
|
# Measurement function
|
||||||
H = np.array([[ 1., 0. ]])
|
H = np.array([[ 1., 0., 0., 0. ],
|
||||||
|
[ 0., 1., 0., 0. ]])
|
||||||
# Measurement uncertainty/noise (covariance matrix of z)
|
# Measurement uncertainty
|
||||||
# (M, M)
|
R = np.array([[ 0.1, 0. ],
|
||||||
R = np.array([[ 1. ]])
|
[ 0. , 0.1 ]])
|
||||||
|
|
||||||
# z = measurament vector
|
|
||||||
# (M, 1)
|
|
||||||
|
|
||||||
def filter(x, P):
|
def filter(x, P):
|
||||||
|
|
||||||
|
@ -48,16 +95,8 @@ def filter(x, P):
|
||||||
step += 1
|
step += 1
|
||||||
print("step = ", step, " meas. = ", z)
|
print("step = ", step, " meas. = ", z)
|
||||||
|
|
||||||
# Measurement
|
# Update
|
||||||
S = H @ P @ H.T + R # (M, M)
|
|
||||||
K = P @ H.T @ np.linalg.inv(S) # (N, M)
|
|
||||||
y = z - H @ x
|
|
||||||
xp = x + K @ y
|
|
||||||
Pp = P - K @ H @ P
|
|
||||||
|
|
||||||
# Prediction
|
|
||||||
x = F @ xp + U
|
|
||||||
P = F @ Pp @ F.T
|
|
||||||
|
|
||||||
print('x =')
|
print('x =')
|
||||||
print(x)
|
print(x)
|
||||||
|
|
Ładowanie…
Reference in New Issue