Skip to content

Kalman Filter

quantflow.ta.KalmanFilter pydantic-model

Bases: BaseModel

One-dimensional Kalman filter for time series data.

This implementation uses a simple 1D state-space model:

\[ \begin{align} x_t &= x_{t-1} + w_t, \quad w_t \sim \mathcal{N}(0, Q) \\ z_t &= x_t + v_t, \quad v_t \sim \mathcal{N}(0, R) \end{align} \]

The Kalman filter estimates the hidden state \(x_t\) given noisy measurements \(z_t\). The ratio \(Q/R\) determines the smoothing behavior.

Fields:

  • R (float)
  • Q (float)

R pydantic-field

R = 1.0

Measurement noise covariance

Q pydantic-field

Q = 0.01

Process noise covariance

error_covariance property

error_covariance

Current estimated error covariance.

kalman_gain property

kalman_gain

Most recent Kalman gain.

value

value()

Get the most recent smoothed value (state estimate), if available.

Source code in quantflow/ta/kalman.py
def value(self) -> float | None:
    """Get the most recent smoothed value (state estimate), if available."""
    return self._x

update

update(value)

Update the filter with a new value and return the smoothed result.

PARAMETER DESCRIPTION
value

New noisy measurement to update the filter

TYPE: float

Source code in quantflow/ta/kalman.py
def update(
    self,
    value: Annotated[float, Doc("New noisy measurement to update the filter")],
) -> float:
    """Update the filter with a new value and return the smoothed result."""
    # Initialize on first update
    if self._x is None:
        self._x = value
        self._P = self.R
        return value

    # Prediction step
    # x_pred = x_prev (Random walk model)
    # P_pred = P_prev + Q
    x_pred = self._x
    P_pred = self._P + self.Q

    # Update step
    # K = P_pred / (P_pred + R)
    # x_new = x_pred + K * (measurement - x_pred)
    # P_new = (1 - K) * P_pred
    K = P_pred / (P_pred + self.R)
    x_new = x_pred + K * (value - x_pred)
    P_new = (1 - K) * P_pred

    # Update state
    self._x = x_new
    self._P = P_new
    self._K = K

    return x_new