Skip to content

Commit

Permalink
Adds UKF implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
leocencetti committed Dec 14, 2020
1 parent 0a48c85 commit d63ad34
Show file tree
Hide file tree
Showing 3 changed files with 337 additions and 45 deletions.
1 change: 1 addition & 0 deletions FILTERS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#Filters
109 changes: 64 additions & 45 deletions aukf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# file created by Leonardo Cencetti on 11/24/20
import numpy as np
from scipy.linalg import block_diag, sqrtm

import models.actrv_model as actrv


Expand Down Expand Up @@ -37,9 +38,11 @@ def __init__(self, state_dimension: int, noise_dimension: int, output_dimension:
self.state_mean = None
self.state_covariance = None

# initialize output transition
self._output_initialized = False
# initialize transition function
self._io_initialized = False
self._output_transition = None
self._state_transition = None
self._postprocessing = lambda x: x

# initialize noise
self.noise_mean = None
Expand Down Expand Up @@ -79,19 +82,42 @@ def init_state(self, initial_state_mean: np.ndarray, initial_state_covariance: n

self._state_initialized = True

def init_io(self, output_transition: callable):
def init_io(self, state_transition: callable, output_transition: callable, postprocessing: callable = lambda x: x):
"""
Initializes the output transition
:param callable output_transition: output transition function, input: state vector
:param callable state_transition: state transition function, equivalent to A matrix. Must have the following
signature:
```python
state_transition(
delta_t: float,
current_augmented_state: numpy.ndarray[augmented_state_dimension, 1]
) -> numpy.ndarray[state_dimension, 1]
```
:param callable output_transition: output transition function, equivalent to C matrix. Takes the current state
as input and returns the current output (Y). Must have the following signature:
```python
output_transition(
current_state: numpy.ndarray[state_dimension, 1]
) -> numpy.ndarray[output_dimension, 1]
```
:param callable postprocessing: postprocessing function to be called after each call to the previous functions
(e.g. to wrap the values to a specific range). Takes the state as input and returns the processed state.
Must have the following signature:
```python
postprocessing(
current_state: numpy.ndarray[state_dimension, 1]
) -> numpy.ndarray[state_dimension, 1]
"""
self._state_transition = state_transition
self._output_transition = output_transition
self._output_initialized = True
self._postprocessing = postprocessing
self._io_initialized = True

def reset(self):
"""
Resets the filter to its initial conditions
"""
self._state_initialized = self._output_initialized = False
self._state_initialized = self._io_initialized = False
self.state_covariance = self.noise_mean = self.noise_covariance = \
self.aug_state_mean = self.aug_state_covariance = None

Expand All @@ -105,16 +131,15 @@ def step(self, deltaT: float, measurement_mean: np.ndarray, measurement_covarian
"""
if not self._state_initialized:
raise ValueError('Initial conditions not initialized. Call init_state(...) before running.')
if not self._output_initialized:
if not self._io_initialized:
raise ValueError('Output transition not initialized. Call init_io(...) before running.')

# run aukf
predicted_mean, predicted_covariance, predicted_sigma = self.predict(deltaT, self.aug_state_mean,
self.aug_state_covariance)

# TODO: update this for current problem formulation
self.state_mean, self.state_covariance = self.correct(predicted_mean, predicted_covariance,
measurement_mean[3:6], measurement_covariance[3:6, 3:6])
measurement_mean, measurement_covariance)
# update augmented state
self.aug_state_mean = np.concatenate((self.state_mean, self.noise_mean))
self.state_covariance = block_diag(self.state_covariance, self.noise_covariance)
Expand All @@ -134,16 +159,12 @@ def predict(self, deltaT: float, aug_mean: np.ndarray, aug_covariance: np.ndarra
# time update
next_sigma_points = np.zeros([self._state_dimension, self._sigma_dimension])
for ii in range(self._sigma_dimension):
next_sigma_points[:, ii] = actrv.actrv(deltaT,
sigma_points[0:self._state_dimension, ii, None],
sigma_points[-self._noise_dimension:, ii, None],
False).squeeze()
next_sigma_points[:, ii] = self._state_transition(deltaT, sigma_points[:, ii, None]).squeeze()

next_state_mean = next_sigma_points @ self._m_weights
next_state_mean = self._postprocessing(next_sigma_points @ self._m_weights)

temp = next_sigma_points - next_state_mean
next_state_covariance = np.multiply(temp, self._c_weights.T) @ temp.T

return next_state_mean, next_state_covariance, next_sigma_points

def correct(self, predicted_state_mean: np.ndarray, predicted_state_covariance: np.ndarray,
Expand All @@ -166,7 +187,7 @@ def correct(self, predicted_state_mean: np.ndarray, predicted_state_covariance:
output_sigma_points = np.zeros((self._output_dimension, self._sigma_dimension))

for ii in range(self._sigma_dimension):
output_sigma_points[:, ii] = self._output_transition(next_sigma_points[:, ii], 'velocity')
output_sigma_points[:, ii] = self._output_transition(next_sigma_points[:, ii])

output_mean = output_sigma_points @ self._m_weights

Expand All @@ -180,7 +201,8 @@ def correct(self, predicted_state_mean: np.ndarray, predicted_state_covariance:
# compute Kalman gain
kalman_gain = np.linalg.solve(output_covariance.T, cross_covariance.T).T

state_mean_estimate = predicted_state_mean + kalman_gain @ (measurement_mean - output_mean)
state_mean_estimate = self._postprocessing(
predicted_state_mean + kalman_gain @ (measurement_mean - output_mean))
state_covariance_estimate = predicted_state_covariance - kalman_gain @ output_covariance @ kalman_gain.T

return state_mean_estimate, state_covariance_estimate
Expand Down Expand Up @@ -209,44 +231,41 @@ def compute_sigma_point(mean: np.ndarray, covariance: np.ndarray, coefficient: f

kf = AUKF(state_dim, noise_dim, output_dim)

initialState = np.array([[1, 1, 0, 1, 0.5, 0.05]]).T # initial state
initialCovariance = 1e-5 * np.eye(state_dim) # initial state covariance
initialState = np.array([[10, 12, 0, 0, 1.5, 0.35]]).T # initial state
initialCovariance = 1e-4 * np.eye(state_dim) # initial state covariance

gt_0 = np.array([[1, 1, 0, 1, 0.5, 0.05]]).T

r = 1e-5 # std of measurement noise
r = 1e-4 # std of measurement noise
measurementNoise = r ** 2 * np.eye(output_dim) # covariance of measurement noise
processNoise = 1e-20 * np.eye(noise_dim)
processNoise = 1e-5 * np.eye(noise_dim)

N = 150 # number of steps
dt = 1 # sampling time
R = 1 # number of iterations

# store states
X_est = np.zeros([R, state_dim, N])
X_est = np.zeros([state_dim, N])
X_gt = np.zeros([state_dim, N])
X_gt[:, 0] = initialState.squeeze()
X_gt[:, 0] = gt_0.squeeze()

# GroundTruth data generation and UKF estimation
for s in range(R):
X_est[s, :, 0] = initialState.squeeze()
kf.init_state(initialState, initialCovariance, np.zeros([3, 1]), processNoise)
kf.init_io(actrv.ctrv_output_transition)
for i in range(1, N):
meas_mean, meas_covariance = actrv.get_next_measurement(kf.aug_state_mean, dt, r)
m_estMean, _ = kf.step(dt, meas_mean, meas_covariance)
X_est[s, :, i] = m_estMean.squeeze()
kf.reset()

# compute ground truth
X_est[:, 0] = initialState.squeeze()
kf.init_state(initialState, initialCovariance, np.zeros([3, 1]), processNoise)
kf.init_io(actrv.actrv, actrv.actrv_output_transition, actrv.wrapState)

for i in range(1, N):
X_gt[:, i] = actrv.actrv(dt, X_gt[:, i - 1, None], np.array([[0, 0, 0]]).T, False).squeeze()
X_gt[:, i] = actrv.actrv(dt, np.concatenate((X_gt[:, i - 1, None], np.array([[0, 0, 0]]).T)), True).squeeze()
pose_gt = X_gt[:3, i, None]
pose_noise = r * np.eye(3)

m_estMean, _ = kf.step(dt, pose_gt, pose_noise)
X_est[:, i] = m_estMean.squeeze()

# Compute Root Mean Square Error between estimate and GT
RMSE = np.zeros([state_dim, 1])
for s in range(R):
E = np.squeeze(X_est[s, :, :]) - X_gt
SE = E ** 2
MSE = np.mean(SE, axis=1)
RMSE[:, 0] += np.sqrt(MSE) / R
E = np.squeeze(X_est[:, :]) - X_gt
SE = E ** 2
MSE = np.mean(SE, axis=1)
RMSE = np.sqrt(MSE)
print('RMSE =', RMSE.squeeze())

# Plots
Expand All @@ -261,15 +280,15 @@ def compute_sigma_point(mean: np.ndarray, covariance: np.ndarray, coefficient: f

for i in range(3):
fig_A.add_trace(go.Scatter(y=X_gt[i, :], name='{}_GT'.format(titles[i])), row=1, col=i + 1)
fig_A.add_trace(go.Scatter(y=X_est[0, i, :], name='{}_EST'.format(titles[i])), row=1, col=i + 1)
fig_A.add_trace(go.Scatter(y=X_est[i, :], name='{}_EST'.format(titles[i])), row=1, col=i + 1)
fig_A.add_trace(go.Scatter(y=X_gt[i, :], name='{}_GT'.format(titles[i + 3])), row=2, col=i + 1)
fig_A.add_trace(go.Scatter(y=X_est[0, i, :], name='{}_EST'.format(titles[i + 3])), row=2, col=i + 1)
fig_A.add_trace(go.Scatter(y=X_est[i, :], name='{}_EST'.format(titles[i + 3])), row=2, col=i + 1)

fig_A.show()

fig_B = go.Figure()
fig_B.add_trace(go.Scatter(x=X_gt[0, :], y=X_gt[1, :], name='GT'))
fig_B.add_trace(go.Scatter(x=X_est[0, 0, :], y=X_est[0, 1, :], name='EST'))
fig_B.add_trace(go.Scatter(x=X_est[0, :], y=X_est[1, :], name='EST'))
fig_B.update_layout(title_text='Ground truth vs estimated trajectory')
fig_B.update_yaxes(scaleanchor="x", scaleratio=1)
fig_B.show()
Loading

0 comments on commit d63ad34

Please sign in to comment.