import matplotlib.pyplot as plt
import numpy as np
from scipy import linalg
6)
np.random.seed(
# initialize and set up all matrices
= 0.1 #0.01 # process noise
q = 0.1 #0.02 #0.1 # measurement noise
r # need to add the ensemble dimension
= 10 # number of ensemble members
Ne = 2 # state dimension
Nx = 1 # observation dimension
Ny = 500 # time dimension
Nt = 0.01 # time step
dt = 9.81 # gravitational acceleration
g
= np.array([1.5, 0.]) # Initial state
x_0
= np.array([1.6, 0.]) # Initial state estimate (slightly off)
m_0 #P_0 = np.array([[0.1, 0.],
# [0., 0.1]]) # Initial estimate covariance
= np.array([[r, 0.],
P_0 0., r]]) # Initial estimate covariance
[
= q # process noise
sig_w = r # measurement noise
sig_v
= sig_w**2 * np.array([[dt ** 3 / 3, dt ** 2 / 2],
Q ** 2 / 2, dt]])
[dt = sig_v**2 * np.eye(Ny)
R
# Observation operator (nonlinear)
def Hx(x):
return np.array([np.sin(x[0])])
# State dynamics (nonlinear)
def Ax(x, dt):
= np.array([x[0] + dt*x[1],
m 1] - g*dt* np.sin(x[0])])
x[return m
12 Example 1: noisy pendulum
Consider the nonlinear ODE model for the oscillations of a noisy pendulum with unit mass and length \(L,\)
\[ \frac{\mathrm{d}^{2}\theta}{\mathrm{d} t^{2}}+\frac{g}{L}\sin\theta+w(t)=0 \]
where \(\theta\) is the angular displacement of the pendulum, \(g\) is the gravitational constant, \(L\) is the pendulum’s length, and \(w(t)\) is a white noise process. This is rewritten in state space form,
\[ \dot{\mathbf{x}}+\mathcal{M}(\mathbf{x})+\mathbf{w}=0, \]
where
\[ \mathbf{x}=\left[\begin{array}{c} x_{1}\\ x_{2} \end{array}\right]=\left[\begin{array}{c} \theta\\ \dot{\theta} \end{array}\right],\quad \mathcal{M}(\mathbf{x})=\left[\begin{array}{c} x_{2}\\ -\dfrac{g}{L}\sin x_{1} \end{array}\right],\quad \mathbf{w}=\left[\begin{array}{c} 0\\ w(t) \end{array}\right]. \]
Suppose that we have discrete, noisy measurements of the horizontal component of the position, \(\sin (\theta).\) Then the measurement equation is scalar,
\[ y_k = \sin \theta_k + v_k, \]
where \(v_k\) is a zero-mean Gaussian random variable with variance \(R.\) The system is thus nonlinear in state and measurement and the state-space system is of the general form seen above. A simple discretization, based on Euler’s method produces
\[\begin{align*} \mathbf{x}_{k} & =\mathcal{M}(\mathbf{x}_{k-1})+\mathbf{w}_{k-1},\\ {y}_{k} & = \mathcal{H}_{k}(\mathbf{x}_{k}) + {v}_{k}, \end{align*}\]
where
\[ \mathbf{x}_{k}=\left[\begin{array}{c} x_{1}\\ x_{2} \end{array}\right]_{k}, \quad \mathcal{M}(\mathbf{x}_{k-1})=\left[\begin{array}{c} x_1 + \Delta t x_{2}\\ x_2 - \Delta t \dfrac{g}{L}\sin x_{1} \end{array}\right]_{k-1}, \quad \mathcal{H}(\mathbf{x}_{k}) = [\sin x_{1}]_k . \]
The noise terms have distributions
\[ \mathbf{w}_{k-1}\sim\mathcal{N}(\mathbf{0},Q),\quad v_{k}\sim\mathcal{N}(0,R), \]
where the process covariance matrix is
\[ Q=\left[\begin{array}{cc} q_{11} & q_{12}\\ q_{21} & q_{22} \end{array}\right], \]
with components (see KF example for 2D motion tracking),
\[ q_{11}=q_{c}\frac{\Delta t^{3}}{3},\quad q_{12}=q_{21}=q_{c}\frac{\Delta t^{2}}{2},\quad q_{22}=q_{c}\Delta t, \]
and \(q_c\) is the continuous process noise spectral density.
12.1 Generate noisy observations
To generate the noisy observations, we have 2 options
- Use an accurate RK4 method.
- Use a simpler, first-order Euler method.
Whatever the approach chosen, it must be used both for the observation generation and for the state evolution within the Kalman filter.
We will use the simpler Euler approximation. For reference, here is the RK4 method.
def Pendulum(state, *args): #nonlinear pendulum
= args[0]
g = args[1]
L = state #Unpack the state vector
x1, x2 = np.zeros(2) #Derivatives
f 0] = x2
f[1] = -g/L * np.sin(x1)
f[return f
def RK4(rhs, state, dt, *args):
= rhs(state, *args)
k1 = rhs(state+k1*dt/2, *args)
k2 = rhs(state+k2*dt/2, *args)
k3 = rhs(state+k3*dt, *args)
k4
= state + (dt/6)*(k1 + 2*k2 + 2*k3 + k4)
new_state return new_state
#np.random.seed(55)
# Solve system and generate noisy observations
= np.linspace(0, Nt, Nt)
T #x0True = np.array([1.8, 0.0]) # True initial conditions
= np.array([1.5, 0.0]) # True initial conditions
x0True #time integration
= np.linalg.cholesky(Q) # noise std dev.
chol_Q = np.sqrt(R)
sqrt_R = np.zeros([Nx, Nt])
xxTrue 0] = x0True
xxTrue[:,= np.zeros((Nt, Ny))
yy 0] = np.sin(xxTrue[0, 0]) + sig_v * np.random.randn() # must initialize correctly!
yy[for k in range(Nt-1):
= chol_Q @ np.random.randn(2)
w_k +1] = RK4(Pendulum, xxTrue[:,k], dt, g, 1.0) + w_k
xxTrue[:,k+1,0] = np.sin(xxTrue[0, k+1]) + sig_v * np.random.randn()
yy[k
# plot results
= plt.subplots(nrows=1,ncols=1, figsize=(10,4))
fig, ax *dt, np.sin(xxTrue[0, :]), color='b', linewidth = 3, label="Noisy state")
ax.plot(T*dt, yy[:,0], marker="o", label="Measurements", color="red", alpha=0.66, s=10)
ax.scatter(T't')
ax.set_xlabel("$\sin(x_1(t))$", labelpad=5)
ax.set_ylabel(
ax.legend() plt.show()
# Use Euler method
from common_utilities import generate_pendulum, RandomState, rmse, plot_pendulum
= RandomState(1)
random_state = Nt
steps = x0True
x_0 #timeline, states, observations = generate_pendulum(x_0, g, Q, dt, R, steps, random_state)
#plot_pendulum(timeline, observations, states, "Trajectory")
= np.zeros((Nt, Ny))
y 0] = generate_pendulum(x_0, g, Q, dt, R, steps, random_state)
timeline, xTrue, y[:, "Trajectory") plot_pendulum(timeline, y, xTrue,
= plt.subplots(nrows=1,ncols=1, figsize=(10,4))
fig, ax *dt, np.sin(xTrue[:, 0]), color='b', linewidth = 3, label="Noisy state")
ax.plot(T*dt, y[:,0], marker="o", label="Measurements", color="red", alpha=0.66, s=10)
ax.scatter(T
ax.legend()'$\sin x_1(t)$')
plt.ylabel('$t$')
plt.xlabel( plt.show()
= plt.subplots(nrows=1,ncols=1, figsize=(6,6))
fig, ax #plt.plot(xTrue[0, :], xTrue[1, :], 'b')
0], xTrue[:, 1], 'b')
plt.plot(xTrue[:, 0, 0], xTrue[0, 1], marker="o", color="green", s=500)
plt.scatter(xTrue['$x_1(t)$')
plt.xlabel('$x_2(t)$')
plt.ylabel( plt.show()
12.2 Ensemble Kalman Filter
def ens_kalman_filter(m_0, P_0, Q, R, Ne, dt, Y):
= m_0.shape[-1]
Nx = Y.shape
Nt, Ny
= np.empty((Nt, Nx))
enkf_m = np.empty((Nt, Nx, Nx))
enkf_P = np.empty((Nx, Ne))
X = np.empty((Ny, Nx))
HXf
= np.tile(m_0, (Ne,1)).T + np.linalg.cholesky(P_0)@np.random.randn(Nx, Ne) # initial ensemble state
X[:,:] = P_0 # initial state covariance
P 0, :] = m_0
enkf_m[0, :, :] = P_0
enkf_P[
for i in range(Nt-1):
# ==== predict/forecast ====
= Ax(X[:,:],dt) + np.linalg.cholesky(Q)@np.random.randn(Nx, Ne) # predict state ensemble
Xf = np.mean(Xf, axis=1) # state ensemble mean
mX = Xf - mX[:, None] # state forecast anomaly
Xfp #Phat = Xfp @ Xfp.T / (Ne - 1) # predict covariance (not needed)
# ==== prepare =====
= Hx(Xf) # nonlinear observation
HXf = np.mean(HXf, axis=1) # observation ensemble mean
mY = HXf - mY[:, None] # observation anomaly
HXp = (HXp @ HXp.T)/(Ne - 1) + R # observation covariance
S = linalg.solve(S, HXp @ Xfp.T, assume_a="pos").T / (Ne - 1) # Kalman gain
K # === perturb y and compute innovation ====
= Y[i,:] + np.linalg.cholesky(R)@np.random.randn(Ny, Ne)
ypert = ypert - HXf
d # ==== correct/analyze ====
= Xf + K @ d # update state ensemble
X[:,:] = np.mean(X[:,:], axis=1)# state analysis ensemble mean
mX = X[:,:] - mX[:, None] # state analysis anomaly
Xap = Xap @ Xap.T / ( Ne - 1) # update covariance
P # ==== save ====
+1] = mX # save KF state estimate (mean)
enkf_m[i+1] = P # save KF error estimate (covariance)
enkf_P[ireturn enkf_m, enkf_P
# initialize state and covariance
= np.array([1.5, 0.]) # Initial state
x_0 = np.array([1.6, 0.]) # Initial state estimate (slightly off)
m_0 = np.array([[r, 0.],
P_0 0., r]]) # Initial estimate covariance
[
= ens_kalman_filter(m_0, P_0, Q, R, Ne, dt, y)
enkf_m, enkf_P
#rmse_ekf = rmse(ekf_m[:, :1], states[:, :1])
#print(f"EKF RMSE: {rmse_ekf}")
= plt.subplots(ncols=2, figsize=(18, 8))
fig, axes = xTrue # states
x1 = enkf_m # KF estimate
x2 = y # Measurements
y = T*dt
timeline 1].scatter(timeline, y, marker=".", label="Measurements", color="red", alpha=0.66)
axes[#axes[1].plot(timeline, np.sin(x1[0, :]), linestyle="dashdot", label="Trajectory", color="blue")
1].plot(timeline, np.sin(x1[:, 0]), linestyle="dashdot", label="Trajectory", color="blue")
axes[1].plot(timeline, np.sin(x2[:, 0]), linestyle="dashdot", label="EnKF estimate", color="green")
axes[#axes[0].plot(x1[0, :], x1[1, :], label="Trajectory", color="blue")
0].plot(x1[:, 0], x1[:, 1], label="Trajectory", color="blue")
axes[0].plot(x2[:, 0], x2[:, 1], label="EnKF estimate", color="green")
axes[0].legend()
axes[1].legend()
axes[0].set_xlabel('$x_1(t)$')
axes[1].set_xlabel('$t$')
axes[0].set_ylabel('$ x_2(t)$')
axes[1].set_ylabel('$\sin x_1(t)$')
axes[
plt.show()
= plt.subplots(nrows=1,ncols=1, figsize=(8,8))
fig, ax 0, 0], x1[0, 1], marker="o", color="green", s=500)
plt.scatter(x1[0], x1[:, 1], label="Trajectory", color="blue")
plt.plot(x1[:, 0], x2[:, 1], label="EnKF estimate", color="red")
plt.plot(x2[:, '$x_1(t)$')
plt.xlabel('$x_2(t)$')
plt.ylabel(
plt.legend() plt.show()