import numpy as np
from typing import Callable, Any

# This function can, in principle, be parallelized... see original "KI.ipynb"
def ensemble(s_param: Any, θ_ens: np.ndarray, forward: Callable) -> np.ndarray:
    N_ens, N_θ = θ_ens.shape
    N_y = s_param.N_y
    g_ens = np.zeros((N_ens, N_y))
    for i in range(N_ens):
        θ = θ_ens[i, :]
        g_ens[i, :] = forward(s_param, θ)
    return g_ens
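As the comment notes, the ensemble members are independent, so the loop can be evaluated concurrently. A minimal sketch (my addition, not from the original notebook; ensemble_parallel is a hypothetical helper and assumes forward and s_param are picklable):

from concurrent.futures import ProcessPoolExecutor
from functools import partial

def ensemble_parallel(s_param: Any, θ_ens: np.ndarray, forward: Callable,
                      max_workers: int = 4) -> np.ndarray:
    # Each member's forward evaluation is independent, so map them across processes.
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        rows = list(ex.map(partial(forward, s_param), θ_ens))
    return np.array(rows)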
21 Example 3: Kalman Inversion with an Ensemble Transform Filter
We solve a nonlinear, two-parameter inverse problem for a 1-D elliptic equation using mean-field ensemble Kalman inversion based on a square-root filter.
NOTE: This version uses the mean-field approach.
21.1 2-Parameter Elliptic Equation: Probabilistic Approach
Consider the one-dimensional elliptic boundary-value problem
\[ -\frac{d}{dx}\Big(\exp(\theta_{(1)}) \frac{d}{dx}p(x)\Big) = 1, \qquad x\in[0,1] \]
with boundary conditions \[p(0) = 0, \qquad p(1) = \theta_{(2)}.\]
The exact solution for this problem is given by
\[ p(x) = \theta_{(2)} x + \exp(-\theta_{(1)})\Big(-\frac{x^2}{2} + \frac{x}{2}\Big). \]
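A quick sanity check (my addition, not from the original notebook) confirms that this closed form satisfies the equation and both boundary conditions:

import numpy as np

θ1, θ2 = 1.0, 2.0   # arbitrary test parameters
h = 1e-3
x = np.linspace(h, 1 - h, 101)

def p(x):
    return θ2 * x + np.exp(-θ1) * (-x**2 / 2 + x / 2)

# -exp(θ1) * p''(x) should equal 1; central differences are exact for quadratics
lhs = -np.exp(θ1) * (p(x + h) - 2 * p(x) + p(x - h)) / h**2
assert np.allclose(lhs, 1.0, atol=1e-6)
assert np.isclose(p(0.0), 0.0) and np.isclose(p(1.0), θ2)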
21.2 Inverse Problem
The inverse problem: given the observations \(y = (p(x_1),\,p(x_2))^T\) at \(x_1=0.25\) and \(x_2=0.75,\) solve for \(\theta = (\theta_{(1)},\, \theta_{(2)})^T.\)
The Bayesian inverse problem, “given observations \(y,\) find the parameter \(\theta\)”, is formulated as
\[ y = \mathcal{G}(\theta) + \eta \qquad \textrm{and} \qquad \mathcal{G}(\theta) = \begin{bmatrix} p(x_1, \theta)\\ p(x_2, \theta) \end{bmatrix}, \] where \(\mathcal{G}(\theta)\) is the forward model operator, and the prior is \(\mathcal{N}([0, 100]^T, I)\). We consider two scenarios:
- Well-posed case: the observation is \(y=[27.5, 79.7]^T\) with observation error \(\eta\sim\mathcal{N}(0, 0.1^2 I)\) (see the closed-form check after this list).
- Ill-posed case: the observation is \(y=[27.5]\) with observation error \(\eta\sim\mathcal{N}(0, 0.1^2 I)\); a single observation for two parameters, hence called "under-determined" in the code below.
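Since \(\mathcal{G}\) is linear in \((\exp(-\theta_{(1)}),\, \theta_{(2)})\), the well-posed case can be inverted in closed form, which shows where the observation values come from (a sketch I added; the recovered values are approximate and not stated in the original):

import numpy as np

y = np.array([27.5, 79.7])
x1, x2 = 0.25, 0.75
# p(x) = θ2*x + exp(-θ1)*(x/2 - x²/2): a 2x2 linear system in (exp(-θ1), θ2)
A = np.array([[x1 / 2 - x1**2 / 2, x1],
              [x2 / 2 - x2**2 / 2, x2]])
e_neg_θ1, θ2 = np.linalg.solve(A, y)
θ1 = -np.log(e_neg_θ1)
print(θ1, θ2)   # ≈ -2.70, 104.4 (consistent with y up to the observation noise)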
21.3 Kalman Inversion by the Mean Field Method
In the code below (see update_ensemble), the following mean-field formulation is used.
21.3.1 Prediction
\[ \hat{\theta}^j_{n+1} = {m}_{n} + \sqrt{\frac{1}{1-\Delta \tau}} \left( \theta_n^j - m_n \right), \]
where \(m_n\) is the mean of the analyzed ensemble \(\theta_n\) (or of the initial ensemble, if \(n=0\)), \(\Delta \tau = 0.5,\) and \(j=1, \ldots, J\) indexes the ensemble members. With \(\Delta \tau = 0.5,\) this step keeps the mean fixed and inflates the ensemble covariance by a factor \(1/(1-\Delta \tau) = 2.\)
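A minimal numerical check of this property (my addition):

import numpy as np

rng = np.random.default_rng(1)
Δτ = 0.5
θ = rng.normal(size=(500, 2))          # a synthetic analyzed ensemble
m = θ.mean(axis=0)
θ_hat = m + np.sqrt(1 / (1 - Δτ)) * (θ - m)
assert np.allclose(θ_hat.mean(axis=0), m)              # mean is preserved
assert np.allclose(np.cov(θ_hat.T), 2 * np.cov(θ.T))   # covariance is doubled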
21.3.2 Analysis
- compute the parameter and forward-model anomalies of the predicted ensemble
- update the ensemble mean and transform the anomalies with the square-root filter (made precise below)
- loop back to the prediction step
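Concretely, in the notation of update_ensemble below (rows of the anomaly matrices are ensemble members):
\[ \hat{Z} = \frac{1}{\sqrt{J-1}}\big[\hat{\theta}^j_{n+1} - \hat{m}_{n+1}\big]_{j=1}^{J}, \qquad \hat{Y} = \frac{1}{\sqrt{J-1}}\big[\mathcal{G}(\hat{\theta}^j_{n+1}) - \bar{g}\big]_{j=1}^{J}, \]
where \(\hat{m}_{n+1}\) is the predicted mean and \(\bar{g}\) the mean forward evaluation. With the eigendecomposition \(\hat{Y}\,\Sigma_\eta^{-1}\hat{Y}^T = P\,\Gamma\,P^T\) (computed via SVD in the code), the analyzed mean and members are
\[ m_{n+1} = \hat{m}_{n+1} + \hat{Z}^T P\,(\Gamma + I)^{-1} P^T \hat{Y}\,\Sigma_\eta^{-1}\,(y - \bar{g}), \]
\[ \theta^j_{n+1} = m_{n+1} + \sum_{k=1}^{J} T_{kj}\,\big(\hat{\theta}^k_{n+1} - \hat{m}_{n+1}\big), \qquad T = P\,(\Gamma + I)^{-1/2} P^T. \]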
21.4 Ensemble Kalman Inversion
Define the EKI class (EKIObj in the code). It can be applied to three types of ensemble filters:
- EKI
- EAKI
- ETKI
This one implements the ensemble transform filter of square-root type, the ETKI, and uses the mean-field approach.
import numpy as np
from scipy import linalg
from scipy.stats import norm

class EKIObj:
    def __init__(self, theta_names, N_ens, theta0_mean, theta_theta0_cov_sqrt,
                 y, Sigma_eta, delta_t):
        self.theta_names = theta_names
        self.theta = [theta0_mean]
        self.y_pred = []
        self.y = y
        self.Sigma_eta = Sigma_eta
        self.N_ens = N_ens
        self.N_theta = len(theta0_mean)
        self.N_y = len(y)
        self.delta_t = delta_t
        self.iter = 0
        print("Start ETKI on the mean-field stochastic dynamical system for Bayesian inference")
        # Generate initial ensemble
        self.theta = [self.MvNormal_sqrt(N_ens, theta0_mean, theta_theta0_cov_sqrt)]

    def MvNormal_sqrt(self, N_ens, theta_mean, theta_theta_cov_sqrt):
        N_theta, N_r = theta_theta_cov_sqrt.shape
        theta = np.zeros((N_ens, N_theta))
        for i in range(N_ens):
            theta[i, :] = theta_mean + theta_theta_cov_sqrt @ norm.rvs(size=N_r)
        return theta

    def update_ensemble(self, ens_func):
        self.iter += 1
        theta = self.theta[-1]
        # artificial evolution-noise covariance (not used by the square-root update below)
        Sigma_nu = (1 / self.delta_t) * self.Sigma_eta

        # Prediction step
        #################
        theta_p = np.zeros_like(theta)
        theta_mean = np.mean(theta, axis=0)
        theta_p_mean = theta_mean
        for j in range(self.N_ens):
            theta_p[j] = theta_p_mean + np.sqrt(1 / (1 - self.delta_t)) * (theta[j] - theta_mean)

        theta_p_mean = np.mean(theta_p, axis=0)

        # Analysis step
        ###############
        g = ens_func(theta_p)
        g_mean = np.mean(g, axis=0)

        # define anomalies:
        Z_p_t = theta_p - theta_p_mean
        Z_p_t /= np.sqrt(self.N_ens - 1)

        Y_p_t = g - g_mean
        Y_p_t /= np.sqrt(self.N_ens - 1)

        X = Y_p_t @ np.linalg.inv(self.Sigma_eta) @ Y_p_t.T

        U, S, Vt = np.linalg.svd(X)
        P, Gamma = U, S

        theta_mean = theta_p_mean + Z_p_t.T @ (P @ (np.linalg.inv(np.diag(Gamma) + np.eye(len(Gamma))) @ (P.T @ (Y_p_t @ (np.linalg.inv(self.Sigma_eta) @ (self.y - g_mean))))))

        # filter_type == "ETKI":
        T = P @ np.diag(1 / np.sqrt(Gamma + 1)) @ P.T
        theta = np.array([theta_p[j] - theta_p_mean for j in range(self.N_ens)])
        theta = T.T @ theta
        theta += theta_mean

        self.theta.append(theta)
        self.y_pred.append(g_mean)
def EKI_Run(s_param, forward,
            theta0_mean, theta_theta0_cov_sqrt,
            N_ens,
            y, Sigma_eta,
            Delta_t,
            N_iter):
    theta_names = s_param.θ_names  ## s_param.theta_names

    ekiobj = EKIObj(theta_names,
                    N_ens,
                    theta0_mean, theta_theta0_cov_sqrt,
                    y, Sigma_eta,
                    Delta_t)

    def ens_func(theta_ens):
        return ensemble(s_param, theta_ens, forward)

    for i in range(N_iter):
        ## update_ensemble(ekiobj, ens_func)
        ekiobj.update_ensemble(ens_func)
    return ekiobj
import numpy as np
from typing import List

class Setup_Param:
    def __init__(self, θ_names: List[str], N_θ: int, N_y: int):
        self.θ_names = θ_names
        self.N_θ = N_θ
        self.N_y = N_y

def create_setup_param(N_θ: int, N_y: int) -> Setup_Param:
    return Setup_Param(["θ"], N_θ, N_y)
def forward(s_param: Setup_Param, θ: List[float]) -> List[float]:
    x1, x2 = 0.25, 0.75
    θ1, θ2 = θ
    def p(x):
        return θ2 * x + np.exp(-θ1) * (-x**2/2 + x/2)
    return [p(x1), p(x2)]

def forward_aug(s_param: Setup_Param, θ: List[float]) -> List[float]:
    x1, x2 = 0.25, 0.75
    θ1, θ2 = θ
    def p(x):
        return θ2 * x + np.exp(-θ1) * (-x**2/2 + x/2)
    return [p(x1), p(x2), θ1, θ2]

def forward_illposed(s_param: Setup_Param, θ: List[float]) -> List[float]:
    x1 = 0.25
    θ1, θ2 = θ
    def p(x):
        return θ2 * x + np.exp(-θ1) * (-x**2/2 + x/2)
    return [p(x1)]

def forward_illposed_aug(s_param: Setup_Param, θ: List[float]) -> List[float]:
    x1 = 0.25
    θ1, θ2 = θ
    def p(x):
        return θ2 * x + np.exp(-θ1) * (-x**2/2 + x/2)
    return [p(x1), θ1, θ2]
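A quick consistency check (my addition, using the approximate parameter values recovered in the closed-form inversion above; forward never reads s_param, so None can be passed):

print(forward(None, [-2.7037, 104.4]))   # ≈ [27.5, 79.7], the well-posed observation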
def construct_cov(x: np.ndarray) -> np.ndarray:
    x_mean = np.mean(x, axis=0)
    N_ens, N_x = x.shape

    x_cov = np.zeros((N_x, N_x))

    for i in range(N_ens):
        x_cov += np.outer(x[i, :] - x_mean, x[i, :] - x_mean)

    return x_cov / (N_ens - 1)
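As a sanity check (my addition), construct_cov agrees with NumPy's built-in sample-covariance estimator:

rng = np.random.default_rng(0)
x = rng.normal(size=(100, 2))
assert np.allclose(construct_cov(x), np.cov(x.T))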
# import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
import random

def Elliptic_Posterior_Plot(problem_type="under-determined", μ0=np.array([0.0, 100.0]),
                            Σ0=np.array([[1.0**2, 0.0], [0.0, 1.0**2]]), Nt=30, N_ens=100, file_name=""):

    print("start Elliptic_Posterior_Plot")
    np.random.seed(128)
    N_θ = 2
    FT = float

    # observation and observation error covariance
    if problem_type == "under-determined":
        y = np.array([27.5])
        Σ_η = np.array([[0.1**2]])
        forward_func = forward_illposed
        forward_func_aug = forward_illposed_aug
    else:
        y = np.array([27.5, 79.7])
        Σ_η = np.diag([0.1**2] * 2)
        forward_func = forward
        forward_func_aug = forward_aug

    N_y = len(y)
    ## s_param = Setup_Param(N_θ, N_y)
    s_param = Setup_Param("θ", N_θ, N_y)

    # compute posterior distribution by MCMC
    def logρ(θ):
        return log_bayesian_posterior(s_param, θ, forward_func, y, Σ_η, μ0, Σ0)

    step_length = 1.0
    ## N_iter_MCMC, n_burn_in = 5000000, 1000000
    N_iter_MCMC, n_burn_in = 1000000, 200000

    print("start RWMCMC")
    us = RWMCMC_Run(logρ, μ0, step_length, N_iter_MCMC)
    θ_post = np.mean(us[n_burn_in:], axis=0)
    Σ_post = np.cov(us[n_burn_in:].T)

    N_iter = Nt
    s_param_aug = Setup_Param("θ", N_θ, N_y + N_θ)
    θ0_mean = μ0
    θθ0_cov = Σ0
    θθ0_cov_sqrt = np.sqrt(Σ0)
    y_aug = np.concatenate([y, μ0])
    Σ_η_aug = np.block([[Σ_η, np.zeros((N_y, N_θ))],
                        [np.zeros((N_θ, N_y)), Σ0]])

    # mean-field model:
    Δt = 0.5

    print("start ETKI")
    etki_obj = EKI_Run(s_param_aug, forward_func_aug, θ0_mean, θθ0_cov_sqrt,
                       N_ens, y_aug, Σ_η_aug, Δt, N_iter)

    etki_errors = np.zeros((N_iter + 1, 2))

    for i in range(N_iter + 1):
        etki_errors[i, 0] = np.linalg.norm(np.mean(etki_obj.theta[i], axis=0) - θ_post) / np.linalg.norm(θ_post)
        etki_errors[i, 1] = np.linalg.norm(construct_cov(etki_obj.theta[i]) - Σ_post) / np.linalg.norm(Σ_post)

    i = Nt + 1
    ites = np.arange(Nt + 1)

    fig, ax = plt.subplots(nrows=2, ncols=1, sharex=False, sharey="row", figsize=(14, 9))

    ax[0].semilogy(ites, etki_errors[:, 0], "-d", color="C3", fillstyle="none", label=f"ETKI (J={N_ens})")
    ax[0].set_xlabel("Iterations")
    ax[0].set_ylabel("Rel. mean error")
    ax[0].grid(True)
    ax[0].legend(bbox_to_anchor=(1.0, 1.0))

    ax[1].semilogy(ites, etki_errors[:, 1], "-d", color="C3", fillstyle="none", label=f"ETKI (J={N_ens})")
    ax[1].set_xlabel("Iterations")
    ax[1].set_ylabel("Rel. covariance error")
    ax[1].grid(True)
    ax[1].legend(bbox_to_anchor=(1.0, 1.0))

    plt.tight_layout()
    # plt.savefig(f"Elliptic-{problem_type}-error_ETKF.pdf")

    # plot results at the last iteration
    ncols = 1
    fig, ax = plt.subplots(ncols=ncols, nrows=1, sharex=True, sharey=True, figsize=(17, 10))

    for icol in range(ncols):
        # plot MCMC results
        everymarker = 10
        ax.scatter(us[n_burn_in::everymarker, 0], us[n_burn_in::everymarker, 1], s=1)

        ites = N_iter  ## + 1
        # scatter ETKI
        ax.scatter(etki_obj.theta[ites][:, 0], etki_obj.theta[ites][:, 1], color="r")
        ax.set_title(f"ETKI (J={N_ens})")
        ax.set_xlabel(r'$\theta_{(1)}$')
        ax.set_ylabel(r'$\theta_{(2)}$')

    plt.tight_layout()
    plt.show()
21.5 Random Walk MCMC
Use the emcee approach…
# import numpy as np
from scipy import stats

def log_bayesian_posterior(s_param, θ, forward, y, Σ_η, μ0, Σ0):
    Gu = forward(s_param, θ)
    # log-likelihood plus log-prior (up to an additive constant)
    Φ = -0.5 * np.dot(np.dot((y - Gu).T, np.linalg.inv(Σ_η)), (y - Gu)) - \
        0.5 * np.dot(np.dot((θ - μ0).T, np.linalg.inv(Σ0)), (θ - μ0))
    return Φ

def log_likelihood(s_param, θ, forward, y, Σ_η):
    Gu = forward(s_param, θ)
    Φ = -0.5 * np.dot(np.dot((y - Gu).T, np.linalg.inv(Σ_η)), (y - Gu))
    return Φ

def RWMCMC_Run(log_bayesian_posterior, θ0, step_length, n_ite, seed=11):
    np.random.seed(seed)
    N_θ = len(θ0)
    θs = np.zeros((n_ite, N_θ))
    fs = np.zeros(n_ite)

    θs[0, :] = θ0
    fs[0] = log_bayesian_posterior(θ0)

    for i in range(1, n_ite):
        θ_p = θs[i - 1, :]
        θ = θ_p + step_length * np.random.normal(0, 1, N_θ)

        fs[i] = log_bayesian_posterior(θ)
        α = min(1.0, np.exp(fs[i] - fs[i - 1]))
        if α > np.random.uniform(0, 1):
            θs[i, :] = θ
        else:
            θs[i, :] = θ_p
            fs[i] = fs[i - 1]

    return θs
def PCN_Run(log_likelihood, θ0, θθ0_cov, β, n_ite, seed=11):
    np.random.seed(seed)
    N_θ = len(θ0)
    θs = np.zeros((n_ite, N_θ))
    fs = np.zeros(n_ite)

    θs[0, :] = θ0
    fs[0] = log_likelihood(θ0)

    for i in range(1, n_ite):
        θ_p = θs[i - 1, :]
        θ = np.sqrt(1 - β**2) * θ_p + β * np.random.multivariate_normal(np.zeros(N_θ), θθ0_cov)

        fs[i] = log_likelihood(θ)
        α = min(1.0, np.exp(fs[i] - fs[i - 1]))
        if α > np.random.uniform(0, 1):
            θs[i, :] = θ
        else:
            θs[i, :] = θ_p
            fs[i] = fs[i - 1]

    return θs
def emcee_Propose(θ_s, θ_c, a=2.0):
    Ns, N_θ = θ_s.shape

    # stretch factors z ∈ [1/a, a] with density ∝ 1/√z (Goodman–Weare stretch move)
    zz = ((a - 1.0) * np.random.uniform(0, 1, Ns) + 1)**2.0 / a
    factors = (N_θ - 1.0) * np.log(zz)

    rint = np.random.randint(0, Ns, Ns)
    return θ_c[rint, :] - (θ_c[rint, :] - θ_s) * zz[:, np.newaxis], factors

def emcee_Run(log_bayesian_posterior, θ0, n_ite, random_split=True, a=2.0, seed=11):
    np.random.seed(seed)

    N_ens, N_θ = θ0.shape
    assert N_ens >= 2 * N_θ
    assert N_ens % 2 == 0

    θs = np.zeros((n_ite, N_ens, N_θ))
    fs = np.zeros((n_ite, N_ens))

    θs[0, :, :] = θ0
    for k in range(N_ens):
        fs[0, k] = log_bayesian_posterior(θ0[k, :])

    nsplit = 2
    N_s = N_ens // 2

    all_inds = np.arange(N_ens)
    inds = all_inds % nsplit
    log_probs = np.zeros(N_s)

    for i_t in range(1, n_ite):
        if random_split:
            np.random.shuffle(inds)
        for split in [0, 1]:
            s_inds = (inds == split)
            c_inds = (inds != split)

            s, c = θs[i_t - 1, s_inds, :], θs[i_t - 1, c_inds, :]

            q, factors = emcee_Propose(s, c, a=a)
            for i in range(N_s):
                log_probs[i] = log_bayesian_posterior(q[i, :])

            for i in range(N_s):
                j = all_inds[s_inds][i]

                α = min(1.0, np.exp(factors[i] + log_probs[i] - fs[i_t - 1, j]))
                if α > np.random.uniform(0, 1):
                    θs[i_t, j, :] = q[i, :]
                    fs[i_t, j] = log_probs[i]
                else:
                    θs[i_t, j, :] = θs[i_t - 1, j, :]
                    fs[i_t, j] = fs[i_t - 1, j]

    return θs
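A usage sketch for the affine-invariant sampler on the well-posed elliptic posterior (my addition; the walker and iteration counts are illustrative, not from the original):

s_param = Setup_Param(["θ"], 2, 2)
y = np.array([27.5, 79.7])
Σ_η = np.diag([0.1**2] * 2)
μ0, Σ0 = np.array([0.0, 100.0]), np.eye(2)

def logρ(θ):
    return log_bayesian_posterior(s_param, θ, forward, y, Σ_η, μ0, Σ0)

θ0 = μ0 + np.random.normal(size=(10, 2))   # 10 walkers: even and ≥ 2·N_θ
samples = emcee_Run(logρ, θ0, 5000)        # shape (5000, 10, 2)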
import numpy as np
import matplotlib.pyplot as plt
# Set up matplotlib parameters
plt.rcParams.update({"font.size": 15,
"axes.labelsize": 15,
"xtick.labelsize": 15,
"ytick.labelsize": 15,
"legend.fontsize": 15,
})
def gaussian_1d(theta_mean, theta_theta_cov, nx, theta_min=None, theta_max=None):
    # 1d Gaussian plot
    if theta_min is None:
        theta_min = theta_mean - 5 * np.sqrt(theta_theta_cov)
    if theta_max is None:
        theta_max = theta_mean + 5 * np.sqrt(theta_theta_cov)

    theta = np.linspace(theta_min, theta_max, nx)
    rho_theta = np.zeros_like(theta)

    for ix in range(nx):
        delta_theta = theta[ix] - theta_mean
        rho_theta[ix] = np.exp(-0.5 * (delta_theta / theta_theta_cov * delta_theta)) / (np.sqrt(2 * np.pi * theta_theta_cov))

    return theta, rho_theta

def gaussian_2d(theta_mean, theta_theta_cov, nx, ny, x_min=None, x_max=None, y_min=None, y_max=None):
    # 2d Gaussian plot
    if x_min is None:
        x_min = theta_mean[0] - 5 * np.sqrt(theta_theta_cov[0, 0])
    if x_max is None:
        x_max = theta_mean[0] + 5 * np.sqrt(theta_theta_cov[0, 0])
    if y_min is None:
        y_min = theta_mean[1] - 5 * np.sqrt(theta_theta_cov[1, 1])
    if y_max is None:
        y_max = theta_mean[1] + 5 * np.sqrt(theta_theta_cov[1, 1])

    xx = np.linspace(x_min, x_max, nx)
    yy = np.linspace(y_min, y_max, ny)

    X, Y = np.meshgrid(xx, yy)
    Z = np.zeros((nx, ny))

    det_theta_theta_cov = np.linalg.det(theta_theta_cov)
    for ix in range(nx):
        for iy in range(ny):
            delta_xy = np.array([xx[ix] - theta_mean[0], yy[iy] - theta_mean[1]])
            Z[ix, iy] = np.exp(-0.5 * (delta_xy.T @ np.linalg.inv(theta_theta_cov) @ delta_xy)) / (2 * np.pi * np.sqrt(det_theta_theta_cov))

    return X, Y, Z
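A brief usage sketch (my addition). Note that Z is indexed Z[ix, iy], while meshgrid arrays have shape (ny, nx), so Z needs a transpose for matplotlib's contour:

X, Y, Z = gaussian_2d(np.zeros(2), np.eye(2), nx=60, ny=60)
plt.contour(X, Y, Z.T)
plt.show()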
21.6 Simulations
We are ready to run simulations…
= "under-determined"
problem_type 0 = np.array([0.0, 100.0])
μ0 = np.array([[1.0**2, 0.0], [0.0, 1.0**2]])
Σ= 30, 50
Nt, N_ens 0, Σ0, Nt, N_ens) Elliptic_Posterior_Plot(problem_type, μ
start Elliptic_Posterior_Plot
start RWMCMC
start ETKI
Start ETKI on the mean-field stochastic dynamical system for Bayesian inference
import numpy as np

problem_type = "well-determined"
μ0 = np.array([0.0, 100.0])
Σ0 = np.array([[1.0**2, 0.0], [0.0, 1.0**2]])
Nt, N_ens = 30, 50
Elliptic_Posterior_Plot(problem_type, μ0, Σ0, Nt, N_ens)
start Elliptic_Posterior_Plot
start RWMCMC
start ETKI
Start ETKI on the mean-field stochastic dynamical system for Bayesian inference