Skip to content

Kalman Filter & Unscented Kalman Filter

This module provides generic Kalman and unscented Kalman filter implementations built on an abstract state-space model.

It is influenced by the implementation of the State-Space model & Kalman filter in the Python particles library.

A state-space model separates a time series into two layers:

  • A latent state \(x_t\) that carries the unobserved dynamics.
  • An observation \(y_t\) that is a (noisy) function of the current state.

See the kalman filter theory page for more details on the algorithms and their applications.

quantflow.ta.kalman.StateSpaceModel pydantic-model

Bases: BaseModel, ABC

Generic state-space model

\[\begin{equation} \begin{aligned} x_t &\sim p_x\left(x_t \mid x_{t-1}\right) \\ y_t &\sim p_y\left(y_t \mid x_t\right) \end{aligned} \end{equation}\]

where \(p_x\) is the transition distribution of the latent state \(x_t\) and \(p_y\) is the observation distribution of the observable \(y_t\) given \(x_t\).

get_px0 abstractmethod

get_px0()

Distribution \(p_x(x_0)\) of the initial state \(x_0\).

Source code in quantflow/ta/kalman.py
@abstractmethod
def get_px0(self) -> MvDistribution:
    r"""Distribution $p_x(x_0)$ of the initial state $x_0$."""

get_px abstractmethod

get_px(t, xp)

Distribution \(p_x\left(x_t \mid x_{t-1}\right)\) of \(x_t\) given \(x_{t-1} = x_p\).

PARAMETER DESCRIPTION
t

Time index \(t\).

TYPE: int

xp

State at time \(t-1\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
@abstractmethod
def get_px(
    self,
    t: Annotated[int, Doc("Time index $t$.")],
    xp: Annotated[FloatArray, Doc("State at time $t-1$.")],
) -> MvDistribution:
    r"""Distribution $p_x\left(x_t \mid x_{t-1}\right)$
    of $x_t$ given $x_{t-1} = x_p$."""

get_py abstractmethod

get_py(t, xp, x)

Distribution of \(y_t\) given \(x_{t-1}=x_p\) and \(x_t=x\).

PARAMETER DESCRIPTION
t

Time index \(t\).

TYPE: int

xp

State at time \(t-1\).

TYPE: FloatArray

x

State at time \(t\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
@abstractmethod
def get_py(
    self,
    t: Annotated[int, Doc("Time index $t$.")],
    xp: Annotated[FloatArray, Doc("State at time $t-1$.")],
    x: Annotated[FloatArray, Doc("State at time $t$.")],
) -> MvDistribution:
    r"""Distribution of $y_t$ given $x_{t-1}=x_p$ and $x_t=x$."""

simulate_given_x

simulate_given_x(x)

Simulate observations given a state trajectory.

Returns \([y_0, \ldots, y_{T-1}]\) where \(y_t \sim\) get_py\((t, x_{t-1}, x_t)\).

PARAMETER DESCRIPTION
x

State trajectory \(x_0, \ldots, x_{T-1}\).

TYPE: list[FloatArray]

Source code in quantflow/ta/kalman.py
def simulate_given_x(
    self,
    x: Annotated[
        list[FloatArray], Doc(r"State trajectory $x_0, \ldots, x_{T-1}$.")
    ],
) -> list:
    r"""Simulate observations given a state trajectory.

    Returns $[y_0, \ldots, y_{T-1}]$ where
    $y_t \sim$ [get_py][..get_py]$(t, x_{t-1}, x_t)$.
    """
    lag_x: list = [None] + x[:-1]
    return [
        self.get_py(t, xp, xt).sample(size=1)
        for t, (xp, xt) in enumerate(zip(lag_x, x))
    ]

simulate

simulate(T)

Simulate state and observation trajectories of length \(T\).

Returns a tuple of two lists: the state trajectory \([x_0, \ldots, x_{T-1}]\) and the observation trajectory \([y_0, \ldots, y_{T-1}]\).

PARAMETER DESCRIPTION
T

Number of time steps.

TYPE: int

Source code in quantflow/ta/kalman.py
def simulate(
    self,
    T: Annotated[int, Doc("Number of time steps.")],
) -> tuple[list, list]:
    r"""Simulate state and observation trajectories of length $T$.

    Returns a tuple of two lists: the state trajectory
    $[x_0, \ldots, x_{T-1}]$ and the observation trajectory
    $[y_0, \ldots, y_{T-1}]$.
    """
    x: list = []
    for t in range(T):
        law = self.get_px0() if t == 0 else self.get_px(t, x[-1])
        x.append(law.sample(size=1))
    y = self.simulate_given_x(x)
    return x, y

unscented_filter

unscented_filter(y)

Build an [UnscentedKalmanFilter][quantflow.ta.kalman.StateSpaceModel.UnscentedKalmanFilter] for this model and the given observations.

PARAMETER DESCRIPTION
y

Observations \(y_{1:T}\) of shape \((T, n_y)\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
def unscented_filter(
    self,
    y: Annotated[FloatArray, Doc("Observations $y_{1:T}$ of shape $(T, n_y)$.")],
) -> UnscentedKalmanFilter:
    """Build an [UnscentedKalmanFilter][..UnscentedKalmanFilter] for this
    model and the given observations."""
    return UnscentedKalmanFilter(model=self, y=y)

quantflow.ta.kalman.LinearGaussianModel pydantic-model

Bases: StateSpaceModel

State-space model with linear-Gaussian dynamics.

\[\begin{equation} \begin{aligned} x_t &= F\, x_{t-1} + \varepsilon_t, \quad \varepsilon_t \sim N(0, Q) \\ y_t &= H\, x_t + \eta_t, \quad \eta_t \sim N(0, R) \end{aligned} \end{equation}\]

Fields:

Q pydantic-field

Q

Process noise covariance \(Q\) of shape \((n_x, n_x)\).

R pydantic-field

R

Observation noise covariance \(R\) of shape \((n_y, n_y)\).

F pydantic-field

F

State transition matrix \(F\) of shape \((n_x, n_x)\).

H pydantic-field

H

Observation matrix \(H\) of shape \((n_y, n_x)\), or \((n_y,)\) when \(n_x = 1\).

mu0 pydantic-field

mu0

Prior mean \(\mu_0\) of \(x\) of shape \((n_x,)\).

cov0 pydantic-field

cov0

Prior covariance \(\Sigma_0\) of \(x\) of shape \((n_x, n_x)\).

n_x property

n_x

State dimension.

get_px0

get_px0()

Initial state distribution \(p(x_0) = \mathcal{N}(\mu_0, \Sigma_0)\).

Source code in quantflow/ta/kalman.py
def get_px0(self) -> MvNormal:
    r"""Initial state distribution $p(x_0) = \mathcal{N}(\mu_0, \Sigma_0)$."""
    return MvNormal(mean=self.mu0, cov=self.cov0)

get_px

get_px(t, xp)

Transition distribution of latent state \(x_t\) given \(x_{t-1}=x_p\).

\[\begin{equation} p(x_t \mid x_{t-1}) = \mathcal{N}(F x_{t-1}, Q) \end{equation}\]
PARAMETER DESCRIPTION
t

Time index \(t\).

TYPE: int

xp

State at time \(t-1\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
def get_px(
    self,
    t: Annotated[int, Doc("Time index $t$.")],
    xp: Annotated[FloatArray, Doc("State at time $t-1$.")],
) -> MvNormal:
    r"""Transition distribution of latent state $x_t$ given $x_{t-1}=x_p$.

    \begin{equation}
        p(x_t \mid x_{t-1}) = \mathcal{N}(F x_{t-1}, Q)
    \end{equation}
    """
    return MvNormal(mean=self.F @ xp, cov=self.Q)

get_py

get_py(t, xp, x)
PARAMETER DESCRIPTION
t

Time index \(t\).

TYPE: int

xp

State at time \(t-1\).

TYPE: FloatArray

x

State at time \(t\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
def get_py(
    self,
    t: Annotated[int, Doc("Time index $t$.")],
    xp: Annotated[FloatArray, Doc("State at time $t-1$.")],
    x: Annotated[FloatArray, Doc("State at time $t$.")],
) -> MvNormal:
    return MvNormal(mean=(self.H @ x).ravel(), cov=self.R)

kalman_filter

kalman_filter(y)

Convenience method to create a [KalmanFilter][quantflow.ta.kalman.LinearGaussianModel.KalmanFilter] for this model and the given observations.

PARAMETER DESCRIPTION
y

Observations \(y_{1:T}\) of shape \((T, n_y)\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
def kalman_filter(
    self,
    y: Annotated[FloatArray, Doc("Observations $y_{1:T}$ of shape $(T, n_y)$.")],
) -> KalmanFilter:
    """Convenience method to create a [KalmanFilter][..KalmanFilter] for
    this model and the given observations."""
    return KalmanFilter(model=self, y=y)

simulate_given_x

simulate_given_x(x)

Simulate observations given a state trajectory.

Returns \([y_0, \ldots, y_{T-1}]\) where \(y_t \sim\) get_py\((t, x_{t-1}, x_t)\).

PARAMETER DESCRIPTION
x

State trajectory \(x_0, \ldots, x_{T-1}\).

TYPE: list[FloatArray]

Source code in quantflow/ta/kalman.py
def simulate_given_x(
    self,
    x: Annotated[
        list[FloatArray], Doc(r"State trajectory $x_0, \ldots, x_{T-1}$.")
    ],
) -> list:
    r"""Simulate observations given a state trajectory.

    Returns $[y_0, \ldots, y_{T-1}]$ where
    $y_t \sim$ [get_py][..get_py]$(t, x_{t-1}, x_t)$.
    """
    lag_x: list = [None] + x[:-1]
    return [
        self.get_py(t, xp, xt).sample(size=1)
        for t, (xp, xt) in enumerate(zip(lag_x, x))
    ]

simulate

simulate(T)

Simulate state and observation trajectories of length \(T\).

Returns a tuple of two lists: the state trajectory \([x_0, \ldots, x_{T-1}]\) and the observation trajectory \([y_0, \ldots, y_{T-1}]\).

PARAMETER DESCRIPTION
T

Number of time steps.

TYPE: int

Source code in quantflow/ta/kalman.py
def simulate(
    self,
    T: Annotated[int, Doc("Number of time steps.")],
) -> tuple[list, list]:
    r"""Simulate state and observation trajectories of length $T$.

    Returns a tuple of two lists: the state trajectory
    $[x_0, \ldots, x_{T-1}]$ and the observation trajectory
    $[y_0, \ldots, y_{T-1}]$.
    """
    x: list = []
    for t in range(T):
        law = self.get_px0() if t == 0 else self.get_px(t, x[-1])
        x.append(law.sample(size=1))
    y = self.simulate_given_x(x)
    return x, y

unscented_filter

unscented_filter(y)

Build an [UnscentedKalmanFilter][quantflow.ta.kalman.LinearGaussianModel.UnscentedKalmanFilter] for this model and the given observations.

PARAMETER DESCRIPTION
y

Observations \(y_{1:T}\) of shape \((T, n_y)\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
def unscented_filter(
    self,
    y: Annotated[FloatArray, Doc("Observations $y_{1:T}$ of shape $(T, n_y)$.")],
) -> UnscentedKalmanFilter:
    """Build an [UnscentedKalmanFilter][..UnscentedKalmanFilter] for this
    model and the given observations."""
    return UnscentedKalmanFilter(model=self, y=y)

quantflow.ta.kalman.KalmanFilter pydantic-model

Bases: BaseModel

Kalman filter for a LinearGaussianModel.

Starting from the prior \((\mu_0, \Sigma_0)\), the forward pass alternates a prediction step with an observation update, accumulating the log-likelihood of the observations.

The state estimate of \(x_t\) given observations \(y_{1:s}\) is written \(\hat{x}_{t \mid s}\) with covariance \(P_{t \mid s}\).

The prediction propagates the filtered state through the linear dynamics of the model:

\[\begin{equation} \begin{aligned} \hat{x}_{0 \mid 0} &= \mu_0 \\ \hat{x}_{t \mid t-1} &= F\, \hat{x}_{t-1 \mid t-1}, \\ P_{t \mid t-1} &= F\, P_{t-1 \mid t-1}\, F^\top + Q. \end{aligned} \end{equation}\]

The update corrects the prediction with the observation \(y_t\), where \(S_t\) is the innovation covariance and \(K_t\) the optimal Kalman gain:

\[\begin{equation} \begin{aligned} e_t &= y_t - H\, \hat{x}_{t \mid t-1}, \\ S_t &= H\, P_{t \mid t-1}\, H^\top + R, \\ K_t &= P_{t \mid t-1}\, H^\top S_t^{-1}, \\ \hat{x}_{t \mid t} &= \hat{x}_{t \mid t-1} + K_t\, e_t, \\ P_{t \mid t} &= P_{t \mid t-1} - K_t\, H\, P_{t \mid t-1}. \end{aligned} \end{equation}\]

Each step adds its contribution to the Gaussian log-likelihood:

\[\begin{equation} \log L = -\frac{1}{2} \sum_t \left( n_y \log 2\pi + \log\det S_t + e_t^\top S_t^{-1} e_t \right). \end{equation}\]

When the state is one-dimensional and the observation noise is isotropic (\(R = h^2 I\)), the innovation covariance \(S_t = h^2 I + P_{t \mid t-1}\, c c^\top\) is a rank-1 update to a scaled identity, where \(c = H\) is a column vector.

The Sherman-Morrison identity inverts it in \(O(n_y)\) instead of \(O(n_y^3)\):

\[\begin{equation} S_t^{-1} = \frac{1}{h^2}\left( I - \frac{P_{t \mid t-1}\, c c^\top}{h^2 + P_{t \mid t-1}\, c^\top c} \right). \end{equation}\]

The update detects this case automatically (see [isotropic_noise][quantflow.ta.kalman.isotropic_noise]) and applies the fast path; otherwise it falls back to a dense Cholesky solve.

Fields:

model pydantic-field

model

Linear-Gaussian state-space model.

y pydantic-field

y

Observations \(y_{1:T}\) of shape \((T, n_y)\).

states pydantic-field

states

List of filtered state estimates. states[t] is the Gaussian approximation of \(p(x_t \mid y_{1:t})\) with mean and covariance.

filter

filter()

Run the Kalman forward pass.

Starts from the model prior, then for each observation applies predict followed by update. Populates states with the filtered estimates and returns the total log-likelihood of the observations.

Source code in quantflow/ta/kalman.py
def filter(self) -> float:
    r"""Run the Kalman forward pass.

    Starts from the model prior, then for each observation applies
    [predict][..predict] followed by [update][..update]. Populates
    [states][..states] with the filtered estimates and returns the total
    log-likelihood of the observations.
    """
    self.states = []
    log_lik = 0.0
    pred = self.model.get_px0().mean_and_cov()
    for t in range(self.y.shape[0]):
        if t > 0:
            pred = self.predict(self.states[-1])
        state, ll_inc = self.update(pred, self.y[t])
        log_lik += ll_inc
        self.states.append(state)
    return log_lik

predict

predict(state)

One-step-ahead state prediction.

\[\begin{equation} \begin{aligned} \hat{x}_{t \mid t-1} &= F\, \hat{x}_{t-1 \mid t-1}, \\ P_{t \mid t-1} &= F\, P_{t-1 \mid t-1}\, F^\top + Q. \end{aligned} \end{equation}\]
PARAMETER DESCRIPTION
state

Filtered state \((\hat{x}_{t-1}, P_{t-1})\).

TYPE: MeanAndCov

Source code in quantflow/ta/kalman.py
def predict(
    self,
    state: Annotated[
        MeanAndCov, Doc(r"Filtered state $(\hat{x}_{t-1}, P_{t-1})$.")
    ],
) -> MeanAndCov:
    r"""One-step-ahead state prediction.

    \begin{equation}
        \begin{aligned}
            \hat{x}_{t \mid t-1} &= F\, \hat{x}_{t-1 \mid t-1}, \\
            P_{t \mid t-1} &= F\, P_{t-1 \mid t-1}\, F^\top + Q.
        \end{aligned}
    \end{equation}
    """
    model = self.model
    mean = model.F @ state.mean
    cov = model.F @ state.cov @ model.F.T + model.Q
    return MeanAndCov(mean=mean, cov=cov)

update

update(pred, y)

Correct the prediction with the observation \(y_t\).

Returns the filtered state and the contribution of \(y_t\) to the Gaussian log-likelihood.

\[\begin{equation} \begin{aligned} e_t &= y_t - H\, \hat{x}_{t \mid t-1}, \\ S_t &= H\, P_{t \mid t-1}\, H^\top + R, \\ K_t &= P_{t \mid t-1}\, H^\top S_t^{-1}, \\ \hat{x}_{t \mid t} &= \hat{x}_{t \mid t-1} + K_t\, e_t, \\ P_{t \mid t} &= P_{t \mid t-1} - K_t\, H\, P_{t \mid t-1}. \end{aligned} \end{equation}\]

The log-likelihood contribution of \(y_t\) is

\[\begin{equation} -\tfrac{1}{2}\left( n_y \log 2\pi + \log\det S_t + e_t^\top S_t^{-1} e_t \right). \end{equation}\]
PARAMETER DESCRIPTION
pred

Predicted state \((\hat{x}_{t \mid t-1}, P_{t \mid t-1})\).

TYPE: MeanAndCov

y

Observation \(y_t\) of shape \((n_y,)\).

TYPE: FloatArray

Source code in quantflow/ta/kalman.py
def update(
    self,
    pred: Annotated[
        MeanAndCov,
        Doc(r"Predicted state $(\hat{x}_{t \mid t-1}, P_{t \mid t-1})$."),
    ],
    y: Annotated[FloatArray, Doc(r"Observation $y_t$ of shape $(n_y,)$.")],
) -> tuple[MeanAndCov, float]:
    r"""Correct the prediction with the observation $y_t$.

    Returns the filtered state and the contribution of $y_t$ to the
    Gaussian log-likelihood.

    \begin{equation}
        \begin{aligned}
            e_t &= y_t - H\, \hat{x}_{t \mid t-1}, \\
            S_t &= H\, P_{t \mid t-1}\, H^\top + R, \\
            K_t &= P_{t \mid t-1}\, H^\top S_t^{-1}, \\
            \hat{x}_{t \mid t} &= \hat{x}_{t \mid t-1} + K_t\, e_t, \\
            P_{t \mid t} &= P_{t \mid t-1} - K_t\, H\, P_{t \mid t-1}.
        \end{aligned}
    \end{equation}

    The log-likelihood contribution of $y_t$ is

    \begin{equation}
        -\tfrac{1}{2}\left(
            n_y \log 2\pi + \log\det S_t + e_t^\top S_t^{-1} e_t
        \right).
    \end{equation}
    """
    model = self.model
    H = model.H[:, None] if model.H.ndim == 1 else model.H
    obs = np.atleast_1d(np.asarray(y, dtype=float))
    innov = obs - H @ pred.mean
    h2 = isotropic_noise(model.R)
    if h2 is not None and model.n_x == 1:
        return self._update_isotropic(pred, H[:, 0], innov, h2)
    S = H @ pred.cov @ H.T + model.R
    sign, log_det = np.linalg.slogdet(S)
    if sign <= 0:
        raise RuntimeError("innovation covariance is not positive definite")
    quad = float(innov @ linalg.solve(S, innov, assume_a="pos"))
    log_lik = -0.5 * (obs.shape[0] * np.log(2.0 * np.pi) + log_det + quad)
    gain = linalg.solve(S, H @ pred.cov, assume_a="pos").T  # P H^T S^{-1}
    mean = pred.mean + gain @ innov
    cov = pred.cov - gain @ H @ pred.cov
    return MeanAndCov(mean=mean, cov=cov), log_lik

quantflow.ta.kalman.UnscentedKalmanFilter pydantic-model

Bases: BaseModel

Unscented Kalman filter (UKF) for a StateSpaceModel.

The UKF handles non-linear, non-Gaussian dynamics by propagating \(2 n_x + 1\) sigma points through the model transition (get_px) and observation (get_py) distributions, matching the first two moments of each. Only the conditional mean and covariance of those distributions are used, so any model that exposes them is supported.

The predicted moments combine the spread of the propagated means with the average conditional covariance (the law of total variance):

\[\begin{equation} \begin{aligned} \hat{x}_{t \mid t-1} &= \sum_i W^m_i\, \mu(\chi_i), \\ P_{t \mid t-1} &= \sum_i W^c_i \bigl(\mu(\chi_i) - \hat{x}_{t \mid t-1}\bigr) \bigl(\mu(\chi_i) - \hat{x}_{t \mid t-1}\bigr)^\top + \sum_i W^m_i\, \Sigma(\chi_i), \end{aligned} \end{equation}\]

where \(\chi_i\) are the sigma points and \(\mu, \Sigma\) are the mean and covariance of get_px. The observation update forms the innovation covariance \(S_t\) and cross covariance \(C_t\) the same way through get_py, with gain \(K_t = C_t S_t^{-1}\). For a LinearGaussianModel the filter reduces to the exact KalmanFilter.

Fields:

model pydantic-field

model

State-space model.

y pydantic-field

y

Observations \(y_{1:T}\) of shape \((T, n_y)\).

states pydantic-field

states

List of filtered state estimates. states[t] is the Gaussian approximation of \(p(x_t \mid y_{1:t})\) with mean and covariance.

alpha pydantic-field

alpha = 0.001

Sigma-point spread.

beta pydantic-field

beta = 2.0

Prior knowledge factor (2 is optimal for Gaussians).

kappa pydantic-field

kappa = 0.0

Secondary scaling parameter.

filter

filter()

Run the UKF forward pass.

Populates states with the filtered estimates and returns the total log-likelihood of the observations.

Source code in quantflow/ta/kalman.py
def filter(self) -> float:
    r"""Run the UKF forward pass.

    Populates [states][..states] with the filtered estimates and returns
    the total log-likelihood of the observations.
    """
    model = self.model
    observations = self.y
    prior = model.get_px0().mean_and_cov()
    x: FloatArray = np.atleast_1d(np.asarray(prior.mean, dtype=float))
    P: FloatArray = np.atleast_2d(np.asarray(prior.cov, dtype=float))
    n_x = x.shape[0]
    Wm, Wc, lmda = self._weights(n_x)

    self.states.clear()
    log_lik = 0.0
    for t in range(observations.shape[0]):
        if t > 0:
            x, P = self._predict(t, x, P, lmda, Wm, Wc)
        x, P, ll_inc = self._update(t, x, P, observations[t], lmda, Wm, Wc)
        log_lik += ll_inc
        self.states.append(MeanAndCov(mean=x.copy(), cov=P.copy()))

    return log_lik