From 6b0eb0146bd67546f98f6dda545ca9f890971fb2 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 4 Jun 2026 14:25:52 +0100 Subject: [PATCH 1/4] Better filter --- docs/api/dists/index.md | 2 + docs/api/ta/kalman.md | 2 - docs/examples/rates_kalman.py | 78 +++--- docs/tutorials/index.md | 2 +- docs/tutorials/rates_kalman.md | 100 +++++--- mkdocs.yml | 2 +- quantflow/rates/cir.py | 221 +++++++++++++++-- quantflow/rates/vasicek.py | 20 +- quantflow/sp/cir.py | 38 ++- quantflow/ta/kalman.py | 387 +++++++++++------------------- quantflow_tests/test_cir.py | 10 + quantflow_tests/test_cir_curve.py | 83 +++++++ quantflow_tests/test_ta_kalman.py | 108 +++++---- 13 files changed, 651 insertions(+), 402 deletions(-) diff --git a/docs/api/dists/index.md b/docs/api/dists/index.md index 55b9d27..2a96a43 100644 --- a/docs/api/dists/index.md +++ b/docs/api/dists/index.md @@ -5,6 +5,8 @@ return types of the [StateSpaceModel](../ta/kalman.md#quantflow.ta.kalman.StateSpaceModel) distribution-level interface. +::: quantflow.dists.MeanAndCov + ::: quantflow.dists.Distribution ::: quantflow.dists.MvNormal diff --git a/docs/api/ta/kalman.md b/docs/api/ta/kalman.md index 31359ba..a07a37b 100644 --- a/docs/api/ta/kalman.md +++ b/docs/api/ta/kalman.md @@ -15,8 +15,6 @@ A state-space model separates a time series into two layers: See the [kalman filter](../../theory/kalman.md) theory page for more details on the algorithms and their applications. -::: quantflow.ta.kalman.MeanAndCov - ::: quantflow.ta.kalman.StateSpaceModel ::: quantflow.ta.kalman.LinearGaussianModel diff --git a/docs/examples/rates_kalman.py b/docs/examples/rates_kalman.py index 901b5fa..2be9be4 100644 --- a/docs/examples/rates_kalman.py +++ b/docs/examples/rates_kalman.py @@ -9,6 +9,7 @@ from docs.examples._utils import assets_path, cached_df from quantflow.data.fed import FederalReserve from quantflow.rates.calibration import tenor_to_years +from quantflow.rates.cir import CIRCurve from quantflow.rates.vasicek import VasicekCurve @@ -25,50 +26,53 @@ async def fetch() -> pd.DataFrame: df = fed_yield_curves() # weekly panel (uniform 7-day step) using the average yield over each week weekly = df.resample("W-WED").mean().dropna() +ttm = np.array([tenor_to_years(c) for c in weekly.columns]) -# calibrate the Vasicek short-rate model to the panel by Kalman-filter MLE -calibrator = VasicekCurve().calibrator() -curve = calibrator.calibrate_historical_rates_dataframe(weekly, frequency=2) -print(curve.model_dump_json(indent=2, exclude={"ref_date"})) +# Vasicek: linear-Gaussian, fitted with the exact Kalman filter +vasicek_cal = VasicekCurve().calibrator() +vasicek = vasicek_cal.calibrate_historical_rates_dataframe(weekly, frequency=2) +print("Vasicek (Kalman filter)") +print(vasicek.model_dump_json(indent=2, exclude={"ref_date"})) -# rebuild model-implied yields from the filtered short rate path: y = (B r - A) / tau -ttm = np.array([tenor_to_years(c) for c in weekly.columns]) -a, b = curve.affine_coefficients(ttm) -A, B = np.asarray(a, dtype=float), np.asarray(b, dtype=float) -short_rate = calibrator.filtered_short_rate # one value per observation date +# CIR: state-dependent variance, fitted with the unscented Kalman filter +cir_cal = CIRCurve().calibrator() +cir = cir_cal.calibrate_historical_rates_dataframe(weekly, frequency=2) +print("CIR (unscented Kalman filter)") +print(cir.model_dump_json(indent=2, exclude={"ref_date"})) + +# rebuild model-implied yields from each filtered short rate path: y = (B r - A) / tau +va_a, va_b = vasicek.affine_coefficients(ttm) +va_A, va_B = np.asarray(va_a, dtype=float), np.asarray(va_b, dtype=float) +va_short = vasicek_cal.filtered_short_rate +cir_a, cir_b = cir.affine_coefficients(ttm) +cir_A, cir_B = np.asarray(cir_a, dtype=float), np.asarray(cir_b, dtype=float) +cir_short = cir_cal.filtered_short_rate -# observed (par -> continuous) and model yields per tenor, over time +# observed (par -> continuous) and both model yields per tenor, over time tenors = ["1Y", "2Y", "5Y", "10Y"] +colours = dict(observed="#636efa", Vasicek="#ef553b", CIR="#00cc96") fig = make_subplots(rows=2, cols=2, subplot_titles=tenors) for k, tenor in enumerate(tenors): i = weekly.columns.get_loc(tenor) - observed = 2.0 * np.log1p(weekly[tenor].to_numpy() / 2.0) * 100 - model = (B[i] * short_rate - A[i]) / ttm[i] * 100 row, col = k // 2 + 1, k % 2 + 1 - fig.add_trace( - go.Scatter( - x=weekly.index, - y=observed, - name="observed", - legendgroup="observed", - showlegend=k == 0, - line=dict(color="#636efa"), - ), - row=row, - col=col, - ) - fig.add_trace( - go.Scatter( - x=weekly.index, - y=model, - name="model", - legendgroup="model", - showlegend=k == 0, - line=dict(color="#ef553b"), - ), - row=row, - col=col, - ) -fig.update_layout(title="Observed vs Vasicek model yields") + series = { + "observed": 2.0 * np.log1p(weekly[tenor].to_numpy() / 2.0) * 100, + "Vasicek": (va_B[i] * va_short - va_A[i]) / ttm[i] * 100, + "CIR": (cir_B[i] * cir_short - cir_A[i]) / ttm[i] * 100, + } + for name, values in series.items(): + fig.add_trace( + go.Scatter( + x=weekly.index, + y=values, + name=name, + legendgroup=name, + showlegend=k == 0, + line=dict(color=colours[name]), + ), + row=row, + col=col, + ) +fig.update_layout(title="Observed vs Vasicek and CIR model yields") fig.update_yaxes(title_text="yield (%)") fig.write_image(assets_path("rates_kalman.png"), width=1600, height=800) diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md index 5e403f5..cce77a8 100644 --- a/docs/tutorials/index.md +++ b/docs/tutorials/index.md @@ -9,4 +9,4 @@ Step-by-step guides for common quantflow workflows. | [Heston Volatility Model](heston_calibration.md) | Calibrate the Heston and Heston-jump-diffusion models to an implied volatility surface | | [SPX Volatility Surface](spx_vol_surface.md) | Build a 3D implied volatility surface for the S&P 500 from a Yahoo Finance option chain | | [BNS Volatility Model](bns_calibration.md) | Calibrate the Barndorff-Nielsen and Shephard stochastic-volatility model to an implied volatility surface | -| [Vasicek Calibration from Rates](rates_kalman.md) | Fit the Vasicek short-rate model to historical Treasury rates by maximum likelihood with a Kalman filter | +| [Yield Curve Calibration from Rates](rates_kalman.md) | Fit the Vasicek (Kalman filter) and CIR (unscented Kalman filter) short-rate models to historical Treasury rates by maximum likelihood | diff --git a/docs/tutorials/rates_kalman.md b/docs/tutorials/rates_kalman.md index c43f7a3..93f46c6 100644 --- a/docs/tutorials/rates_kalman.md +++ b/docs/tutorials/rates_kalman.md @@ -1,27 +1,28 @@ -# Vasicek Calibration from Rates - -This tutorial calibrates the [VasicekCurve][quantflow.rates.vasicek.VasicekCurve] -short-rate model to a panel of historical US Treasury yields by maximum -likelihood, using a [KalmanFilter][quantflow.ta.kalman.KalmanFilter] to evaluate -the likelihood. It shows the full workflow: pulling the data from the Federal -Reserve, reshaping it into a uniform panel, fitting the model, and comparing the -fitted curve with the observed yields. - -For the mechanics of the filter itself, see the +# Yield Curve Calibration from Rates + +This tutorial calibrates two short-rate models, the +[VasicekCurve][quantflow.rates.vasicek.VasicekCurve] and the +[CIRCurve][quantflow.rates.cir.CIRCurve], to a panel of historical US Treasury +yields by maximum likelihood. Both treat the short rate as a latent state and +the yields as noisy observations of it, but they need different filters: Vasicek +is linear-Gaussian and uses the exact +[KalmanFilter][quantflow.ta.kalman.KalmanFilter], while CIR has a state-dependent +variance and uses the +[UnscentedKalmanFilter][quantflow.ta.kalman.UnscentedKalmanFilter]. + +For the mechanics of the filters themselves, see the [Kalman Filter](../theory/kalman.md) theory page. ## The idea -The Vasicek short rate is an Ornstein-Uhlenbeck process. We never observe it -directly: what we observe is a cross section of yields at each date. Treating -the short rate as a latent state and the yields as noisy linear observations of -it turns calibration into a state-space estimation problem. +A short-rate model never observes its state directly: what we observe at each +date is a cross section of yields. Treating the short rate as a latent state and +the yields as noisy observations of it turns calibration into a state-space +estimation problem. -Over a uniform time step the dynamics reduce to a Gaussian AR(1) and each yield -is affine in the short rate. The -[calibrate_historical_rates][quantflow.rates.vasicek.VasicekCurveCalibration.calibrate_historical_rates] -method documents these equations. The Kalman filter computes the exact Gaussian -log-likelihood of the observed panel, and the calibrator maximises it over +At each date every yield is affine in the short rate, so the observation is +linear in both models. The filter computes the Gaussian log-likelihood of the +observed panel, and the calibrator maximises it over $(\kappa, \theta, \sigma, h)$, where $h$ is the observation noise standard deviation. @@ -36,15 +37,50 @@ The calibration assumes a uniform time step, while the raw data is sampled on business days. Resampling to weekly Wednesdays with the average yield over each week gives an evenly spaced panel. +## Vasicek: the exact Kalman filter + +The Vasicek short rate is an Ornstein-Uhlenbeck process. Over a uniform time step +its dynamics reduce to a Gaussian AR(1) with a constant innovation variance, and +each yield is affine in the short rate. The +[calibrate_historical_rates][quantflow.rates.vasicek.VasicekCurveCalibration.calibrate_historical_rates] +method documents these equations. + +Because the model is fully linear-Gaussian, the exact Kalman filter gives the +exact log-likelihood. One full Kalman pass over the panel is performed per +optimiser iteration. + +## CIR: why the unscented filter + +The CIR short rate is a square-root diffusion, +$dr_t = \kappa(\theta - r_t)\,dt + \sigma\sqrt{r_t}\,dW_t$. Its conditional mean +is still linear in the previous state, but its conditional variance depends on +the state: + +\begin{equation} + \text{Var}[r_t \mid r_{t-1}] = + r_{t-1}\,\frac{\sigma^2}{\kappa}\left(\phi - \phi^2\right) + + \theta\,\frac{\sigma^2}{2\kappa}\left(1 - \phi\right)^2, + \quad \phi = e^{-\kappa \Delta t}. +\end{equation} + +The exact Kalman filter assumes a constant process-noise covariance, so it +cannot represent this heteroskedasticity. The unscented Kalman filter only needs +the conditional mean and covariance of the transition, which it propagates +through sigma points, so the state-dependent variance drops straight in. The +[CIRStateSpaceModel][quantflow.rates.cir.CIRStateSpaceModel] supplies those +moments and the affine observation, and +[calibrate_historical_rates][quantflow.rates.cir.CIRCurveCalibration.calibrate_historical_rates] +runs the unscented filter inside the same maximum-likelihood loop. + ## Calibrating +For both models, [calibrate_historical_rates_dataframe][quantflow.rates.calibration.YieldCurveCalibration.calibrate_historical_rates_dataframe] parses the tenor columns into times to maturity, infers the time step from the index, converts the par yields to continuously compounded rates (here `frequency=2` for semiannual compounding), and runs the maximum-likelihood fit. -One full Kalman pass over the panel is performed per optimiser iteration. -The fitted parameters and the final filtered short rate are returned on the +The fitted parameters and the final filtered short rate are returned on each calibrated curve: ``` @@ -53,18 +89,20 @@ calibrated curve: ## Model versus observed through time -The fit is a time-series fit, so the right check is whether the model tracks the -history of each tenor. The calibrator exposes the -[filtered_short_rate][quantflow.rates.vasicek.VasicekCurveCalibration.filtered_short_rate] -path, from which each tenor's model-implied yield is reconstructed through the -affine yield relation and plotted against its observed history. +The fit is a time-series fit, so the right check is whether each model tracks the +history of every tenor. Both calibrators expose a `filtered_short_rate` path +([Vasicek][quantflow.rates.vasicek.VasicekCurveCalibration.filtered_short_rate], +[CIR][quantflow.rates.cir.CIRCurveCalibration.filtered_short_rate]), from which +each tenor's model-implied yield is reconstructed through the affine yield +relation and plotted against its observed history. -The single factor tracks the short and intermediate tenors (1Y, 2Y) closely, but -the model yields are smoother than the data at the long end (5Y, 10Y): one -mean-reverting factor cannot capture the independent variation of the long end, -the expected limitation of the one-factor Vasicek model. +Both single-factor models track the short and intermediate tenors (1Y, 2Y) +closely and are smoother than the data at the long end (5Y, 10Y): one +mean-reverting factor cannot capture the independent variation of the long end. +The two fits stay close in this rate regime, the difference being the CIR +volatility that scales with the level of rates. -[![Observed vs Vasicek model yields](../assets/examples/rates_kalman.png)](../assets/examples/rates_kalman.png){target="_blank"} +[![Observed vs Vasicek and CIR model yields](../assets/examples/rates_kalman.png)](../assets/examples/rates_kalman.png){target="_blank"} ## Code diff --git a/mkdocs.yml b/mkdocs.yml index f3d11de..b73d42b 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -124,8 +124,8 @@ nav: - Option Pricing: tutorials/option_pricing.md - Pricing Method Comparison: tutorials/pricing_method_comparison.md - SPX Volatility Surface: tutorials/spx_vol_surface.md - - Vasicek Calibration from Rates: tutorials/rates_kalman.md - Volatility Surface: tutorials/volatility_surface.md + - Yield Curve Calibration from Rates: tutorials/rates_kalman.md - Theory: - theory/index.md - Characteristic Function: theory/characteristic.md diff --git a/quantflow/rates/cir.py b/quantflow/rates/cir.py index f6a140d..a0b9248 100644 --- a/quantflow/rates/cir.py +++ b/quantflow/rates/cir.py @@ -5,11 +5,13 @@ import numpy as np from numpy.typing import ArrayLike -from pydantic import Field -from scipy.optimize import Bounds, least_squares +from pydantic import Field, PrivateAttr +from scipy.optimize import Bounds, least_squares, minimize from typing_extensions import Annotated, Doc +from quantflow.dists import MvNormal from quantflow.sp.cir import CIR +from quantflow.ta.kalman import StateSpaceModel, UnscentedKalmanFilter from quantflow.utils.numbers import ZERO, DecimalNumber from quantflow.utils.types import FloatArray, FloatArrayLike, maybe_float @@ -112,21 +114,53 @@ def instantaneous_forward_rate(self, ttm: FloatArrayLike) -> FloatArrayLike: def discount_factor(self, ttm: FloatArrayLike) -> FloatArrayLike: r"""Calculate the discount factor using the CIR closed-form solution. - The discount factor is: - \begin{equation} D(\tau) = e^{A(\tau) - B(\tau)\, r_0} \end{equation} - The coefficients are: + where $A(\tau)$ and $B(\tau)$ are the + [affine_coefficients][..affine_coefficients]. + """ + arr = np.asarray(ttm, dtype=float) + ttma = np.maximum(arr, 0.0) + kappa = float(self.kappa) + theta = float(self.theta) + sigma = float(self.sigma) + rate = float(self.rate) + sigma2 = sigma * sigma + gamma = np.sqrt(kappa * kappa + 2.0 * sigma2) + gamma_kappa = gamma + kappa + gamma_m_kappa = gamma - kappa + emgt = np.exp(-gamma * ttma) + # d/e^{γτ} = (γ+κ) + (γ-κ)e^{-γτ} + de = gamma_kappa + gamma_m_kappa * emgt + b = 2.0 * (1.0 - emgt) / de + # log(d) = γτ + log(de) + log_a = (2.0 * kappa * theta / sigma2) * ( + np.log(2.0 * gamma) - 0.5 * gamma_m_kappa * ttma - np.log(de) + ) + df = np.exp(log_a - b * rate) + return maybe_float(df) + + def affine_coefficients( + self, ttm: FloatArrayLike + ) -> tuple[FloatArrayLike, FloatArrayLike]: + r"""Return the affine coefficients $A(\tau)$ and $B(\tau)$ + of the log discount factor. + + \begin{equation} + \log D(\tau) = A(\tau) - B(\tau)\, r_0 + \end{equation} + + where \begin{equation} \begin{aligned} - B(\tau) &= \frac{2(1 - e^{-\gamma\tau})}{d_e(\tau)} \\ + B(\tau) &= \frac{2(1 - e^{-\gamma\tau})}{d_e(\tau)}, \\ A(\tau) &= \frac{2\kappa\theta}{\sigma^2} \ln\!\left( \frac{2\gamma\, e^{-(\gamma - \kappa)\tau/2}}{d_e(\tau)} - \right) + \right). \end{aligned} \end{equation} """ @@ -135,21 +169,16 @@ def discount_factor(self, ttm: FloatArrayLike) -> FloatArrayLike: kappa = float(self.kappa) theta = float(self.theta) sigma = float(self.sigma) - rate = float(self.rate) sigma2 = sigma * sigma gamma = np.sqrt(kappa * kappa + 2.0 * sigma2) - gamma_kappa = gamma + kappa gamma_m_kappa = gamma - kappa emgt = np.exp(-gamma * ttma) - # d/e^{γτ} = (γ+κ) + (γ-κ)e^{-γτ} - de = gamma_kappa + gamma_m_kappa * emgt + de = (gamma + kappa) + gamma_m_kappa * emgt b = 2.0 * (1.0 - emgt) / de - # log(d) = γτ + log(de) log_a = (2.0 * kappa * theta / sigma2) * ( np.log(2.0 * gamma) - 0.5 * gamma_m_kappa * ttma - np.log(de) ) - df = np.exp(log_a - b * rate) - return maybe_float(df) + return maybe_float(log_a), maybe_float(b) def jacobian(self, ttm: FloatArrayLike) -> FloatArray | None: r"""Analytical Jacobian of discount factors w.r.t. @@ -197,9 +226,89 @@ def jacobian(self, ttm: FloatArrayLike) -> FloatArray | None: return np.column_stack([d_rate, d_kappa, d_theta, d_sigma]) +class CIRStateSpaceModel(StateSpaceModel, arbitrary_types_allowed=True): + r"""State-space model for the Cox-Ingersoll-Ross short rate. + + The latent state is the short rate $r_t$, with dynamics taken from the + embedded [CIRCurve][..CIRCurve]. The transition has a linear conditional + mean but a state-dependent conditional variance, the signature of the + square-root diffusion. Both are the one-step + [analytical_mean][quantflow.sp.cir.CIR.analytical_mean] and + [analytical_variance][quantflow.sp.cir.CIR.analytical_variance] of the CIR + process started from $r_{t-1}$. + + Because the conditional variance depends on the state, the constant-$Q$ + linear [KalmanFilter][quantflow.ta.kalman.KalmanFilter] does not apply; the + model is filtered with the + [UnscentedKalmanFilter][quantflow.ta.kalman.UnscentedKalmanFilter], which + only needs the conditional mean and covariance. + + The observations are the affine zero rates + $y_i = (B_i r_t - A_i)/\tau_i$ with Gaussian measurement noise of standard + deviation $h$. + """ + + curve: CIRCurve = Field(description="CIR curve supplying the short-rate dynamics.") + ttm: FloatArray = Field( + description="Times to maturity of the observed rates, shape $(n_y,)$." + ) + dt: float = Field(gt=0, description=r"Time step $\Delta t$ in years.") + h: float = Field(gt=0, description="Observation noise standard deviation $h$.") + + _process: CIR = PrivateAttr(default_factory=CIR) + _affine: tuple[FloatArray, FloatArray] = PrivateAttr( + default_factory=lambda: (np.zeros(0), np.zeros(0)) + ) + + def model_post_init(self, context: object) -> None: + """Cache the CIR process and the affine coefficients + $A(\\tau_i)$, $B(\\tau_i)$.""" + self._process = self.curve.process() + a, b = self.curve.affine_coefficients(self.ttm) + self._affine = (np.asarray(a, dtype=float), np.asarray(b, dtype=float)) + + def get_px0(self) -> MvNormal: + r"""Stationary (equilibrium) distribution of the short rate, used as the + prior for the latent initial state.""" + p = self._process + return MvNormal( + mean=np.array([p.equilibrium_mean]), + cov=np.array([[p.equilibrium_variance]]), + ) + + def get_px(self, t: int, xp: FloatArray) -> MvNormal: + r"""Transition $p(r_t \mid r_{t-1})$ from the CIR conditional moments + over one time step.""" + self._process.rate = max(float(np.atleast_1d(xp)[0]), 0.0) + mean = float(self._process.analytical_mean(self.dt)) + var = float(self._process.analytical_variance(self.dt)) + return MvNormal(mean=np.array([mean]), cov=np.array([[var]])) + + def get_py(self, t: int, xp: FloatArray, x: FloatArray) -> MvNormal: + r"""Observation distribution of the affine zero rates given $r_t$.""" + a, b = self._affine + r = float(np.atleast_1d(x)[0]) + mean = (b * r - a) / self.ttm + cov = self.h * self.h * np.eye(self.ttm.size) + return MvNormal(mean=mean, cov=cov) + + class CIRCurveCalibration(YieldCurveCalibration[CIRCurve]): """Calibration wrapper for a CIR yield curve.""" + _filtered_short_rate: FloatArray | None = PrivateAttr(default=None) + + @property + def filtered_short_rate(self) -> FloatArray: + """Unscented-Kalman-filtered short rate at each observation date. + + Populated by [calibrate_historical_rates][.calibrate_historical_rates]; + accessing it before a historical fit raises an error. + """ + if self._filtered_short_rate is None: + raise AttributeError("run calibrate_historical_rates first") + return self._filtered_short_rate + def get_params(self) -> FloatArray: c = self.yield_curve kappa = float(c.kappa) @@ -266,3 +375,87 @@ def jac(params: np.ndarray) -> FloatArray: ) self.set_params(result.x) return self.yield_curve + + def calibrate_historical_rates( + self, + ttm: Annotated[FloatArray, Doc("Times to maturity in years, shape (n,).")], + rates: Annotated[ + FloatArray, + Doc("Continuously compounded rates, shape (T, n) (time by maturity)."), + ], + dt: Annotated[ + FloatArray, + Doc("Per-step time increments in years, shape (T-1,); assumed uniform."), + ], + ) -> CIRCurve: + r"""Fit CIR by maximum likelihood with an unscented Kalman filter. + + The short rate $r_t$ is the latent state of a + [CIRStateSpaceModel][..CIRStateSpaceModel]. Its exact transition has a + linear conditional mean but a state-dependent conditional variance, so + the panel is filtered with the + [UnscentedKalmanFilter][quantflow.ta.kalman.UnscentedKalmanFilter] + rather than the exact linear filter used for Vasicek. + + The negative log-likelihood is minimised over + $(\kappa, \theta, \rho, h)$, where $h$ is the observation noise standard + deviation and the Feller condition is enforced by reparametrising + $\sigma = \rho \sqrt{2\kappa\theta}$ with $\rho \in (0, 1)$. The curve's + rate is then set to the final filtered short rate. + """ + rates = np.asarray(rates, dtype=float) + ttm = np.asarray(ttm, dtype=float) + dt = np.asarray(dt, dtype=float) + if dt.size and not np.allclose(dt, dt[0], rtol=1e-2): + raise ValueError( + "calibrate_historical_rates assumes a uniform time step; " + "the observation dates are not equally spaced" + ) + step = float(dt[0]) if dt.size else 1.0 + + def unpack(x: np.ndarray) -> tuple[float, float, float, float]: + # x = (log kappa, log theta, logit rho, log h); enforce Feller via + # sigma = rho * sqrt(2 kappa theta) with rho in (0, 1) + kappa = float(np.exp(x[0])) + theta = float(np.exp(x[1])) + rho = 1.0 / (1.0 + np.exp(-x[2])) + sigma = float(rho * np.sqrt(2.0 * kappa * theta)) + return kappa, theta, sigma, float(np.exp(x[3])) + + def filtered( + kappa: float, theta: float, sigma: float, h: float + ) -> UnscentedKalmanFilter: + curve = CIRCurve( + rate=Decimal(str(round(theta, 10))), + kappa=Decimal(str(round(kappa, 10))), + theta=Decimal(str(round(theta, 10))), + sigma=Decimal(str(round(sigma, 10))), + ) + model = CIRStateSpaceModel(curve=curve, ttm=ttm, dt=step, h=h) + return model.unscented_filter(rates) + + theta0 = max(float(np.mean(rates)), 1e-4) + short = rates[:, int(np.argmin(ttm))] + short_mean = max(float(np.mean(short)), 1e-4) + sigma0 = max(float(np.std(np.diff(short)) / np.sqrt(step * short_mean)), 1e-4) + rho0 = min(max(sigma0 / np.sqrt(2.0 * 0.5 * theta0), 0.01), 0.99) + h0 = max(float(np.std(rates - rates.mean(axis=0))) / 10.0, 1e-5) + x0 = np.array( + [np.log(0.5), np.log(theta0), np.log(rho0 / (1.0 - rho0)), np.log(h0)] + ) + + def neg_loglik(x: np.ndarray) -> float: + return -filtered(*unpack(x)).filter() + + result = minimize(neg_loglik, x0, method="Nelder-Mead") + kappa, theta, sigma, h = unpack(result.x) + ukf = filtered(kappa, theta, sigma, h) + ukf.filter() + short_rate = np.array([float(s.mean.item()) for s in ukf.states]) + self._filtered_short_rate = short_rate + c = self.yield_curve + c.rate = Decimal(str(round(float(short_rate[-1]), 10))) + c.kappa = Decimal(str(round(kappa, 10))) + c.theta = Decimal(str(round(theta, 10))) + c.sigma = Decimal(str(round(sigma, 10))) + return c diff --git a/quantflow/rates/vasicek.py b/quantflow/rates/vasicek.py index 20f7f81..d5e008f 100644 --- a/quantflow/rates/vasicek.py +++ b/quantflow/rates/vasicek.py @@ -11,7 +11,7 @@ from quantflow.sp.ou import Vasicek from quantflow.sp.wiener import WienerProcess -from quantflow.ta.kalman import KalmanFilter, LinearGaussianModel, MeanAndCov +from quantflow.ta.kalman import KalmanFilter, LinearGaussianModel from quantflow.utils.numbers import ZERO, DecimalNumber from quantflow.utils.types import FloatArray, FloatArrayLike, maybe_float @@ -299,8 +299,11 @@ def calibrate_historical_rates( step = float(dt[0]) if dt.size else 1.0 def filtered( - kappa: float, theta: float, sigma: float, h: float - ) -> tuple[list[MeanAndCov], float]: + kappa: float, + theta: float, + sigma: float, + h: float, + ) -> KalmanFilter: self.set_params(np.array([0.0, kappa, theta, sigma])) a, b = self.yield_curve.affine_coefficients(ttm) A, B = np.asarray(a, dtype=float), np.asarray(b, dtype=float) @@ -315,7 +318,7 @@ def filtered( mu0=np.zeros(1), cov0=np.array([[sigma * sigma / (2.0 * kappa)]]), ) - return KalmanFilter(model=model, data=rates - offset).filter() + return model.kalman_filter(rates - offset) theta0 = float(np.mean(rates)) short = rates[:, int(np.argmin(ttm))] @@ -324,20 +327,21 @@ def filtered( x0 = np.array([np.log(0.5), theta0, np.log(sigma0), np.log(h0)]) def neg_loglik(x: np.ndarray) -> float: - _, ll = filtered( + return -filtered( float(np.exp(x[0])), float(x[1]), float(np.exp(x[2])), float(np.exp(x[3])), - ) - return -ll + ).filter() result = minimize(neg_loglik, x0, method="Nelder-Mead") kappa = float(np.exp(result.x[0])) theta = float(result.x[1]) sigma = float(np.exp(result.x[2])) h = float(np.exp(result.x[3])) - states, _ = filtered(kappa, theta, sigma, h) + kf = filtered(kappa, theta, sigma, h) + kf.filter() + states = kf.states # filtered short rate path: z_t + theta short_rate = theta + np.array([float(s.mean.item()) for s in states]) self._filtered_short_rate = short_rate diff --git a/quantflow/sp/cir.py b/quantflow/sp/cir.py index 698e6ff..42d05da 100755 --- a/quantflow/sp/cir.py +++ b/quantflow/sp/cir.py @@ -27,7 +27,7 @@ class CIR(IntensityProcess): \end{equation} where $w_t$ is a standard Wiener process. The process stays strictly positive - (Feller condition) when + when the [Feller condition](../../../glossary#feller-condition) holds: \begin{equation} 2\kappa\theta \geq \sigma^2 @@ -57,6 +57,26 @@ def is_positive(self) -> bool: the process stays strictly positive.""" return self.feller_condition >= 0.0 + @property + def equilibrium_mean(self) -> float: + r"""Mean of the stationary (equilibrium) distribution of the process. + + \begin{equation} + \lim_{t \to \infty} E[x_t] = \theta + \end{equation} + """ + return self.theta + + @property + def equilibrium_variance(self) -> float: + r"""Variance of the stationary (equilibrium) distribution of the process. + + \begin{equation} + \lim_{t \to \infty} \mathrm{Var}(x_t) = \frac{\sigma^2 \theta}{2\kappa} + \end{equation} + """ + return self.sigma2 * self.theta / (2.0 * self.kappa) + def sample( self, n: Annotated[int, Doc("Number of paths")], @@ -152,15 +172,17 @@ def integrated_log_laplace(self, t: Vector, u: Vector) -> Vector: r"""Log-Laplace transform of the time-integrated CIR process. \begin{equation} - \phi(t, u) = \log E\!\left[e^{-u \int_0^t x_s\, ds}\right] - = \frac{2\kappa\theta}{\sigma^2} - \log\!\left(\frac{2\gamma\, e^{(\gamma+\kappa)t/2}}{D}\right) - - \frac{2u(e^{\gamma t}-1)}{D}\, x_0 + \begin{aligned} + \phi(t, u) &= \log E\!\left[e^{-u \int_0^t x_s\, ds}\right] \\ + &= \frac{2\kappa\theta}{\sigma^2} + \log\!\left(\frac{2\gamma\, e^{(\gamma+\kappa)t/2}}{D}\right) + - \frac{2u(e^{\gamma t}-1)}{D}\, x_0 \\ + \gamma &= \sqrt{\kappa^2 + 2u\sigma^2} \\ + D &= 2\gamma + (\gamma+\kappa)(e^{\gamma t}-1) + \end{aligned} \end{equation} - where $\gamma = \sqrt{\kappa^2 + 2u\sigma^2}$, - $D = 2\gamma + (\gamma+\kappa)(e^{\gamma t}-1)$, and $x_0$ is - the initial rate. + where $x_0$ is the initial rate. """ kappa = self.kappa sigma2 = self.sigma2 diff --git a/quantflow/ta/kalman.py b/quantflow/ta/kalman.py index 5063638..07a32e1 100644 --- a/quantflow/ta/kalman.py +++ b/quantflow/ta/kalman.py @@ -16,21 +16,22 @@ class StateSpaceModel(BaseModel, ABC): - r"""Generic state-space model with additive Gaussian observation noise. + r"""Generic state-space model \begin{equation} \begin{aligned} - x_t &= f(x_{t-1}, \Delta t) + \varepsilon_t, - \quad \varepsilon_t \sim N(0, Q_t) \\ - y_t &= H_t\, x_t + d_t + \eta_t, - \quad \eta_t \sim N(0, R_t) + x_t &\sim p_x\left(x_t \mid x_{t-1}\right) \\ + y_t &\sim p_y\left(y_t \mid x_t\right) \end{aligned} \end{equation} + + where $p_x$ is the transition distribution of the latent state $x_t$ and + $p_y$ is the observation distribution of the observable $y_t$ given $x_t$. """ @abstractmethod def get_px0(self) -> Distribution: - """Distribution of the initial state $x_0$.""" + r"""Distribution $p_x(x_0)$ of the initial state $x_0$.""" @abstractmethod def get_px( @@ -38,7 +39,8 @@ def get_px( t: Annotated[int, Doc("Time index $t$.")], xp: Annotated[FloatArray, Doc("State at time $t-1$.")], ) -> Distribution: - """Distribution of $x_t$ given $x_{t-1} = x_p$.""" + r"""Distribution $p_x\left(x_t \mid x_{t-1}\right)$ + of $x_t$ given $x_{t-1} = x_p$.""" @abstractmethod def get_py( @@ -47,24 +49,7 @@ def get_py( xp: Annotated[FloatArray, Doc("State at time $t-1$.")], x: Annotated[FloatArray, Doc("State at time $t$.")], ) -> Distribution: - """Distribution of $y_t$ given $x_{t-1}=x_p$ and $x_t=x$.""" - - @abstractmethod - def get_proposal0( - self, - data: Annotated[FloatArray, Doc("Observation data up to time $t$.")], - ) -> Distribution: - """Get the proposal distribution for $y_0$ given observation data.""" - - @abstractmethod - def get_proposal( - self, - t: Annotated[int, Doc("Time index $t$.")], - xp: Annotated[FloatArray, Doc("State at time $t-1$.")], - data: Annotated[FloatArray, Doc("Observation data up to time $t$.")], - ) -> Distribution: - """Get the proposal distribution for $y_t$ given $x_{t-1}=x_p$ - and observation data.""" + r"""Distribution of $y_t$ given $x_{t-1}=x_p$ and $x_t=x$.""" # -- Simulation helpers ---------------------------------------------- @@ -102,6 +87,14 @@ def simulate( y = self.simulate_given_x(x) return x, y + def unscented_filter( + self, + y: Annotated[FloatArray, Doc("Observations $y_{1:T}$ of shape $(T, n_y)$.")], + ) -> UnscentedKalmanFilter: + """Build an [UnscentedKalmanFilter][..UnscentedKalmanFilter] for this + model and the given observations.""" + return UnscentedKalmanFilter(model=self, y=y) + # --------------------------------------------------------------------------- # Linear-Gaussian model @@ -176,6 +169,12 @@ def get_px( t: Annotated[int, Doc("Time index $t$.")], xp: Annotated[FloatArray, Doc("State at time $t-1$.")], ) -> MvNormal: + r"""Transition distribution of latent state $x_t$ given $x_{t-1}=x_p$. + + \begin{equation} + p(x_t \mid x_{t-1}) = \mathcal{N}(F x_{t-1}, Q) + \end{equation} + """ return MvNormal(mean=self.F @ xp, cov=self.Q) def get_py( @@ -186,55 +185,13 @@ def get_py( ) -> MvNormal: return MvNormal(mean=(self.H @ x).ravel(), cov=self.R) - def get_proposal0( + def kalman_filter( self, - data: Annotated[FloatArray, Doc("Observation data of shape $(T, n_y)$.")], - ) -> MvNormal: - r"""Optimal proposal for the initial state $x_0$ given $y_0$. - - Applies a single Kalman update to the prior $(\mu_0, \Sigma_0)$ using - the first observation, returning the exact filtering distribution - $p(x_0 \mid y_0)$. - """ - return self._get_proposal(self.mu0, self.cov0, data[0]) - - def get_proposal( - self, - t: Annotated[int, Doc("Time index $t$.")], - xp: Annotated[FloatArray, Doc("State at time $t-1$.")], - data: Annotated[FloatArray, Doc("Observation data of shape $(T, n_y)$.")], - ) -> MvNormal: - r"""Optimal proposal for $x_t$ given $x_{t-1}=x_p$ and $y_t$. - - Predicts one step forward with the transition dynamics, then applies a - Kalman update with the observation at time $t$ to return the exact - filtering distribution $p(x_t \mid x_{t-1}, y_t)$. - """ - pred_mean = (self.F @ np.atleast_1d(xp)).ravel() - return self._get_proposal(pred_mean, self.Q, data[t]) - - def _get_proposal( - self, - pred_mean: Annotated[FloatArray, Doc("Predicted state mean.")], - pred_cov: Annotated[FloatArray, Doc("Predicted state covariance.")], - y: Annotated[FloatArray, Doc("Observation $y_t$ of shape $(n_y,)$.")], - ) -> MvNormal: - r"""Kalman update of a Gaussian prediction $(m, P)$ with observation $y$. - - \begin{equation} - \begin{aligned} - S &= H P H^\top + R, \quad K = P H^\top S^{-1} \\ - m' &= m + K (y - H m), \quad P' = P - K H P - \end{aligned} - \end{equation} - """ - H = self.H[:, None] if self.H.ndim == 1 else self.H - S = H @ pred_cov @ H.T + self.R - residual = np.atleast_1d(y) - H @ pred_mean - gain = linalg.solve(S, H @ pred_cov, assume_a="pos").T - mean = pred_mean + gain @ residual - cov = pred_cov - gain @ H @ pred_cov - return MvNormal(mean=mean.ravel(), cov=cov) + y: Annotated[FloatArray, Doc("Observations $y_{1:T}$ of shape $(T, n_y)$.")], + ) -> KalmanFilter: + """Convenience method to create a [KalmanFilter][..KalmanFilter] for + this model and the given observations.""" + return KalmanFilter(model=self, y=y) # --------------------------------------------------------------------------- @@ -250,211 +207,143 @@ class KalmanFilter(BaseModel, arbitrary_types_allowed=True): log-likelihood of the observations. The state estimate of $x_t$ given observations $y_{1:s}$ is written - $\hat{x}_{t \mid s}$ with covariance $P_{t \mid s}$. The prediction - propagates the filtered state through the linear dynamics of the model: + $\hat{x}_{t \mid s}$ with covariance $P_{t \mid s}$. + + The **prediction** propagates the filtered state through the linear + dynamics of the model: \begin{equation} \begin{aligned} + \hat{x}_{0 \mid 0} &= \mu_0 \\ \hat{x}_{t \mid t-1} &= F\, \hat{x}_{t-1 \mid t-1}, \\ P_{t \mid t-1} &= F\, P_{t-1 \mid t-1}\, F^\top + Q. \end{aligned} \end{equation} - The update corrects the prediction with the observation $y_t$, where $S_t$ - is the innovation covariance and $K_t$ the Kalman gain: + The **update** corrects the prediction with the observation $y_t$, where $S_t$ + is the innovation covariance and $K_t$ the optimal Kalman gain: \begin{equation} \begin{aligned} + e_t &= y_t - H\, \hat{x}_{t \mid t-1}, \\ S_t &= H\, P_{t \mid t-1}\, H^\top + R, \\ K_t &= P_{t \mid t-1}\, H^\top S_t^{-1}, \\ - \hat{x}_{t \mid t} &= \hat{x}_{t \mid t-1} - + K_t\,(y_t - H\, \hat{x}_{t \mid t-1}), \\ + \hat{x}_{t \mid t} &= \hat{x}_{t \mid t-1} + K_t\, e_t, \\ P_{t \mid t} &= P_{t \mid t-1} - K_t\, H\, P_{t \mid t-1}. \end{aligned} \end{equation} - Each step adds its contribution to the Gaussian log-likelihood, with - innovation $e_t = y_t - H\, \hat{x}_{t \mid t-1}$: + Each step adds its contribution to the Gaussian log-likelihood: \begin{equation} \log L = -\frac{1}{2} \sum_t \left( n_y \log 2\pi + \log\det S_t + e_t^\top S_t^{-1} e_t \right). \end{equation} - - The update uses the Sherman-Morrison identity for $O(n_y)$ innovation - inversion when the observation matrix is a column vector and the - observation covariance is a scaled identity $h^2 I$, and falls back to the - general update otherwise. """ model: LinearGaussianModel = Field(description="Linear-Gaussian state-space model.") - data: FloatArray = Field(description="Observation data of shape $(T, n_y)$.") + y: FloatArray = Field(description="Observations $y_{1:T}$ of shape $(T, n_y)$.") + states: list[MeanAndCov] = Field( + default_factory=list, + description=( + "List of filtered state estimates. ``states[t]`` is the Gaussian " + r"approximation of $p(x_t \mid y_{1:t})$ with mean and covariance." + ), + ) def model_post_init(self, context: object) -> None: - """Ensure the observation data is a 2d array of shape $(T, n_y)$.""" - self.data = np.atleast_2d(self.data) + """Ensure the observations are a 2d array of shape $(T, n_y)$.""" + self.y = np.atleast_2d(self.y) # -- public API ----------------------------------------------------- - def filter( - self, - ) -> Annotated[ - tuple[list[MeanAndCov], float], - Doc( - "Tuple of ``(filtered_states, log_likelihood)``. " - "``filtered_states[t]`` is the Gaussian approximation of " - r"$p(x_t \mid y_{1:t})$." - ), - ]: + def filter(self) -> float: r"""Run the Kalman forward pass. - Returns the sequence of filtered state estimates and the total + Starts from the model prior, then for each observation applies + [predict][..predict] followed by [update][..update]. Populates + [states][..states] with the filtered estimates and returns the total log-likelihood of the observations. """ - model = self.model - F, Q = model.F, model.Q - observations = self.data - x: FloatArray = np.atleast_1d(np.asarray(model.mu0, dtype=float)) - P: FloatArray = np.atleast_2d(np.asarray(model.cov0, dtype=float)) - n_obs, n_y = observations.shape - n_x = x.shape[0] - if P.shape != (n_x, n_x): - raise ValueError( - f"covariance shape {P.shape} is inconsistent with state " - f"dimension {n_x}" - ) - states: list[MeanAndCov] = [] - log_lik: float = -0.5 * n_obs * n_y * np.log(2.0 * np.pi) - - for t in range(n_obs): - # predict + self.states = [] + log_lik = 0.0 + pred = self.model.get_px0().mean_and_cov() + for t in range(self.y.shape[0]): if t > 0: - x = F @ x - P = F @ P @ F.T + Q - - # update with the observation - x, P, ll_inc = self._update(x, P, observations[t], model.H, model.R) + pred = self.predict(self.states[-1]) + state, ll_inc = self.update(pred, self.y[t]) log_lik += ll_inc - states.append(MeanAndCov(mean=x.copy(), cov=P.copy())) - - return states, log_lik + self.states.append(state) + return log_lik - # -- internal update ------------------------------------------------ + # -- Kalman recursion ----------------------------------------------- - @staticmethod - def _update( - x: FloatArray, - P: FloatArray, - y: FloatArray, - H: FloatArray, - R: FloatArray, - d: FloatArray | float = 0.0, - ) -> tuple[FloatArray, FloatArray, float]: - r"""Single Kalman update step. + def predict( + self, + state: Annotated[ + MeanAndCov, Doc(r"Filtered state $(\hat{x}_{t-1}, P_{t-1})$.") + ], + ) -> MeanAndCov: + r"""One-step-ahead state prediction. - Returns ``(x_new, P_new, ll_inc)`` where ``ll_inc`` is the - contribution to the Gaussian log-likelihood: - $-0.5(\log\det S + e^\top S^{-1} e)$ - with $e = y - (H x + d)$ and $S = H P H^\top + R$. + \begin{equation} + \begin{aligned} + \hat{x}_{t \mid t-1} &= F\, \hat{x}_{t-1 \mid t-1}, \\ + P_{t \mid t-1} &= F\, P_{t-1 \mid t-1}\, F^\top + Q. + \end{aligned} + \end{equation} """ - # detect Sherman-Morrison fast-path - use_sm = KalmanFilter._use_sherman_morrison(H, R) - - # predicted observation mean - if H.ndim == 1: - y_pred = H * x.item() + d # scalar state * column vector - else: - y_pred = H @ x + d # shape (n_y,) - innov = y - y_pred # innovation - - if use_sm: - return KalmanFilter._update_sherman_morrison(x, P, innov, H, R) - else: - return KalmanFilter._update_general(x, P, innov, H, R) - - @staticmethod - def _use_sherman_morrison(H: FloatArray, R: FloatArray) -> bool: - """True when H is a column vector and R is a scaled identity.""" - if H.ndim != 1: - return False - n_y = H.shape[0] - if R.shape != (n_y, n_y): - return False - # check R = h^2 * I - if not np.allclose(np.diag(np.diag(R)), R): - return False - diag = np.diag(R) - if not np.allclose(diag, diag[0]): - return False - return True - - @staticmethod - def _update_sherman_morrison( - x: FloatArray, - P: FloatArray, - innov: FloatArray, - H: FloatArray, - R: FloatArray, - ) -> tuple[FloatArray, FloatArray, float]: - r"""Kalman update via the Sherman-Morrison identity. + model = self.model + mean = model.F @ state.mean + cov = model.F @ state.cov @ model.F.T + model.Q + return MeanAndCov(mean=mean, cov=cov) - When $R = h^2 I$ and $H$ is a column vector $c$, the innovation - covariance $S = h^2 I + P \, c c^\top$ is a rank-1 update to a - scaled identity, inverted in $O(n_y)$. - """ - c = H # column vector, shape (n_y,) - h2 = float(R[0, 0]) # h^2 - n_y = c.shape[0] - - p_scalar: float = float(P.item()) # P is (1, 1) or scalar - cc: float = float(c @ c) # c^T c - cv: float = float(c @ innov) # c^T innov - denom: float = h2 + p_scalar * cc # h^2 + P c^T c - - # log |S| - log_det = (n_y - 1) * np.log(h2) + np.log(denom) - # e^T S^{-1} e - quad = (float(innov @ innov) - p_scalar * cv * cv / denom) / h2 - ll_inc = -0.5 * (log_det + quad) - - # state update - x_new = x + p_scalar * cv / denom - P_new = p_scalar * h2 / denom - - # preserve array shapes - return np.atleast_1d(x_new), np.atleast_2d(P_new), ll_inc - - @staticmethod - def _update_general( - x: FloatArray, - P: FloatArray, - innov: FloatArray, - H: FloatArray, - R: FloatArray, - ) -> tuple[FloatArray, FloatArray, float]: - r"""Standard Kalman update with full linear algebra. + def update( + self, + pred: Annotated[ + MeanAndCov, + Doc(r"Predicted state $(\hat{x}_{t \mid t-1}, P_{t \mid t-1})$."), + ], + y: Annotated[FloatArray, Doc(r"Observation $y_t$ of shape $(n_y,)$.")], + ) -> tuple[MeanAndCov, float]: + r"""Correct the prediction with the observation $y_t$. + + Returns the filtered state and the contribution of $y_t$ to the + Gaussian log-likelihood. - $S = H P H^\top + R$, $K = P H^\top S^{-1}$. + \begin{equation} + \begin{aligned} + e_t &= y_t - H\, \hat{x}_{t \mid t-1}, \\ + S_t &= H\, P_{t \mid t-1}\, H^\top + R, \\ + K_t &= P_{t \mid t-1}\, H^\top S_t^{-1}, \\ + \hat{x}_{t \mid t} &= \hat{x}_{t \mid t-1} + K_t\, e_t, \\ + P_{t \mid t} &= P_{t \mid t-1} - K_t\, H\, P_{t \mid t-1}. + \end{aligned} + \end{equation} + + The log-likelihood contribution of $y_t$ is + + \begin{equation} + -\tfrac{1}{2}\left( + n_y \log 2\pi + \log\det S_t + e_t^\top S_t^{-1} e_t + \right). + \end{equation} """ - if H.ndim == 1: - H2 = H[:, None] # (n_y,) -> (n_y, 1) - else: - H2 = H - S = H2 @ P @ H2.T + R # innovation covariance - # log |S| + e^T S^{-1} e + model = self.model + H = model.H[:, None] if model.H.ndim == 1 else model.H + obs = np.atleast_1d(np.asarray(y, dtype=float)) + innov = obs - H @ pred.mean + S = H @ pred.cov @ H.T + model.R sign, log_det = np.linalg.slogdet(S) if sign <= 0: raise RuntimeError("innovation covariance is not positive definite") - S_inv_innov = linalg.solve(S, innov, assume_a="pos") - quad = float(innov @ S_inv_innov) - ll_inc = -0.5 * (log_det + quad) - - # Kalman gain and update - K = linalg.solve(S, H2 @ P.T, assume_a="pos").T # P @ H^T @ S^{-1} - x_new = x + K @ innov - P_new = P - K @ H2 @ P - return x_new, P_new, ll_inc + quad = float(innov @ linalg.solve(S, innov, assume_a="pos")) + log_lik = -0.5 * (obs.shape[0] * np.log(2.0 * np.pi) + log_det + quad) + gain = linalg.solve(S, H @ pred.cov, assume_a="pos").T # P H^T S^{-1} + mean = pred.mean + gain @ innov + cov = pred.cov - gain @ H @ pred.cov + return MeanAndCov(mean=mean, cov=cov), log_lik # --------------------------------------------------------------------------- @@ -494,7 +383,14 @@ class UnscentedKalmanFilter(BaseModel, arbitrary_types_allowed=True): """ model: StateSpaceModel = Field(description="State-space model.") - data: FloatArray = Field(description="Observation data of shape $(T, n_y)$.") + y: FloatArray = Field(description="Observations $y_{1:T}$ of shape $(T, n_y)$.") + states: list[MeanAndCov] = Field( + default_factory=list, + description=( + "List of filtered state estimates. ``states[t]`` is the Gaussian " + r"approximation of $p(x_t \mid y_{1:t})$ with mean and covariance." + ), + ) alpha: float = Field(default=1e-3, description="Sigma-point spread.") beta: float = Field( default=2.0, description="Prior knowledge factor (2 is optimal for Gaussians)." @@ -502,44 +398,35 @@ class UnscentedKalmanFilter(BaseModel, arbitrary_types_allowed=True): kappa: float = Field(default=0.0, description="Secondary scaling parameter.") def model_post_init(self, context: object) -> None: - """Ensure the observation data is a 2d array of shape $(T, n_y)$.""" - self.data = np.atleast_2d(self.data) + """Ensure the observations are a 2d array of shape $(T, n_y)$.""" + self.y = np.atleast_2d(self.y) # -- public API ----------------------------------------------------- - def filter( - self, - ) -> Annotated[ - tuple[list[MeanAndCov], float], - Doc( - "Tuple of ``(filtered_states, log_likelihood)``. " - "``filtered_states[t]`` is the Gaussian approximation of " - r"$p(x_t \mid y_{1:t})$." - ), - ]: + def filter(self) -> float: r"""Run the UKF forward pass. - Returns the sequence of filtered state estimates and the total - log-likelihood of the observations. + Populates [states][..states] with the filtered estimates and returns + the total log-likelihood of the observations. """ model = self.model - observations = self.data + observations = self.y prior = model.get_px0().mean_and_cov() x: FloatArray = np.atleast_1d(np.asarray(prior.mean, dtype=float)) P: FloatArray = np.atleast_2d(np.asarray(prior.cov, dtype=float)) n_x = x.shape[0] Wm, Wc, lmda = self._weights(n_x) - states: list[MeanAndCov] = [] + self.states.clear() log_lik = 0.0 for t in range(observations.shape[0]): if t > 0: x, P = self._predict(t, x, P, lmda, Wm, Wc) x, P, ll_inc = self._update(t, x, P, observations[t], lmda, Wm, Wc) log_lik += ll_inc - states.append(MeanAndCov(mean=x.copy(), cov=P.copy())) + self.states.append(MeanAndCov(mean=x.copy(), cov=P.copy())) - return states, log_lik + return log_lik # -- internals ------------------------------------------------------ diff --git a/quantflow_tests/test_cir.py b/quantflow_tests/test_cir.py index 82f2743..a0c6309 100644 --- a/quantflow_tests/test_cir.py +++ b/quantflow_tests/test_cir.py @@ -55,3 +55,13 @@ def test_cir_pdf_neg(cir_neg: CIR): # skip x=0 where the analytical pdf diverges for q<0 mask = pdf.x > 0 np.testing.assert_array_almost_equal(pdf.y[mask], m.pdf(pdf.x[mask]), 1e-1) + + +def test_cir_equilibrium(cir: CIR) -> None: + assert cir.equilibrium_mean == cir.theta + assert cir.equilibrium_variance == pytest.approx( + cir.sigma2 * cir.theta / (2.0 * cir.kappa) + ) + # the equilibrium moments are the t -> infinity limit of the conditional ones + assert cir.analytical_mean(1e4) == pytest.approx(cir.equilibrium_mean) + assert cir.analytical_variance(1e4) == pytest.approx(cir.equilibrium_variance) diff --git a/quantflow_tests/test_cir_curve.py b/quantflow_tests/test_cir_curve.py index c01599a..37a9d13 100644 --- a/quantflow_tests/test_cir_curve.py +++ b/quantflow_tests/test_cir_curve.py @@ -3,8 +3,10 @@ from decimal import Decimal import numpy as np +import pandas as pd import pytest +from quantflow.rates.calibration import tenor_to_years from quantflow.rates.cir import CIRCurve @@ -86,3 +88,84 @@ def test_cir_continuously_compounded_rate() -> None: expected_rates = -np.log(df) / ttm computed_rates = np.asarray(curve.continuously_compounded_rate(ttm)) np.testing.assert_allclose(computed_rates, expected_rates, rtol=1e-10) + + +def test_cir_affine_coefficients_match_discount_factor() -> None: + # log D(tau) = A(tau) - B(tau) * r0 must reproduce the closed-form factor + curve = CIRCurve( + rate=Decimal("0.037"), + kappa=Decimal("0.9"), + theta=Decimal("0.05"), + sigma=Decimal("0.08"), + ) + ttm = np.array([0.25, 1.0, 3.0, 7.0, 10.0]) + a, b = curve.affine_coefficients(ttm) + log_df = np.asarray(a) - np.asarray(b) * float(curve.rate) + np.testing.assert_allclose( + np.exp(log_df), np.asarray(curve.discount_factor(ttm)), rtol=1e-12 + ) + + +def _simulate_cir_panel( + curve: CIRCurve, + tenors: list[str], + n_obs: int, + dt: float, + noise: float, + seed: int, +) -> pd.DataFrame: + rng = np.random.default_rng(seed) + kappa = float(curve.kappa) + theta = float(curve.theta) + sigma = float(curve.sigma) + phi = np.exp(-kappa * dt) + s2 = sigma * sigma + r = float(curve.rate) + short_rates = np.empty(n_obs) + for t in range(n_obs): + short_rates[t] = r + mean = theta + (r - theta) * phi + var = ( + max(r, 0.0) * s2 / kappa * (phi - phi * phi) + + theta * s2 / (2.0 * kappa) * (1.0 - phi) ** 2 + ) + r = max(mean + np.sqrt(var) * rng.standard_normal(), 1e-6) + ttm = np.array([tenor_to_years(s) for s in tenors], dtype=float) + rows = [] + for r_t in short_rates: + sample = CIRCurve( + rate=Decimal(str(r_t)), + kappa=curve.kappa, + theta=curve.theta, + sigma=curve.sigma, + ) + df = np.asarray(sample.discount_factor(ttm), dtype=float) + y = -np.log(df) / ttm + rows.append(y + noise * rng.standard_normal(len(tenors))) + index = pd.date_range("2020-01-01", periods=n_obs, freq=pd.Timedelta(days=dt * 365)) + return pd.DataFrame(rows, index=index, columns=tenors) + + +def test_cir_calibrate_historical_rates_recovers_parameters() -> None: + true_curve = CIRCurve( + rate=Decimal("0.04"), + kappa=Decimal("0.6"), + theta=Decimal("0.04"), + sigma=Decimal("0.05"), + ) + tenors = ["3m", "6m", "1y", "2y", "5y", "10y"] + panel = _simulate_cir_panel( + true_curve, tenors, n_obs=600, dt=1 / 52, noise=1e-5, seed=1 + ) + calibrator = CIRCurve().calibrator() + fitted = calibrator.calibrate_historical_rates_dataframe(panel) + assert float(fitted.kappa) == pytest.approx(0.6, rel=0.4) + assert float(fitted.theta) == pytest.approx(0.04, abs=0.01) + assert float(fitted.sigma) == pytest.approx(0.05, rel=0.4) + # the filtered short rate tracks the (latent) simulated short rate + assert len(calibrator.filtered_short_rate) == len(panel) + + +def test_cir_filtered_short_rate_requires_fit() -> None: + with pytest.raises(AttributeError): + CIRCurve().calibrator().filtered_short_rate diff --git a/quantflow_tests/test_ta_kalman.py b/quantflow_tests/test_ta_kalman.py index bfb4e6c..e502808 100644 --- a/quantflow_tests/test_ta_kalman.py +++ b/quantflow_tests/test_ta_kalman.py @@ -5,7 +5,6 @@ from quantflow.dists import MvNormal from quantflow.ta.kalman import ( - KalmanFilter, LinearGaussianModel, MeanAndCov, StateSpaceModel, @@ -47,12 +46,6 @@ def get_px(self, t: int, xp: np.ndarray) -> MvNormal: def get_py(self, t: int, xp: np.ndarray, x: np.ndarray) -> MvNormal: return MvNormal(mean=np.asarray(x, dtype=float), cov=np.eye(1) * 0.1) - def get_proposal0(self, data: np.ndarray) -> MvNormal: - return self.get_px0() - - def get_proposal(self, t: int, xp: np.ndarray, data: np.ndarray) -> MvNormal: - return self.get_px(t, xp) - class TestMeanAndCov: def test_unpacking(self) -> None: @@ -94,35 +87,47 @@ def test_get_py(self) -> None: assert law.mean[0] == pytest.approx(3.0) assert law.cov[0, 0] == pytest.approx(0.5) - def test_proposal0_is_exact_update(self) -> None: - # prior (0, 1), y=1.2, R=0.5 -> S=1.5, K=2/3, mean=0.8, cov=1/3 - mean, cov = linear_1d().get_proposal0(np.array([[1.2]])).mean_and_cov() - assert mean[0] == pytest.approx(0.8) - assert cov[0, 0] == pytest.approx(1.0 / 3.0) - class TestKalmanFilter: def test_filter_shapes_1d(self) -> None: - data = np.random.default_rng(0).normal(size=(15, 1)) - states, ll = KalmanFilter(model=linear_1d(), data=data).filter() - assert len(states) == 15 + y = np.random.default_rng(0).normal(size=(15, 1)) + kf = linear_1d().kalman_filter(y) + ll = kf.filter() + assert len(kf.states) == 15 assert np.isfinite(ll) - for s in states: + for s in kf.states: assert s.mean.shape == (1,) assert s.cov.shape == (1, 1) def test_filter_shapes_2d(self) -> None: - data = np.random.default_rng(1).normal(size=(20, 2)) - states, ll = KalmanFilter(model=linear_2d(), data=data).filter() - assert len(states) == 20 + y = np.random.default_rng(1).normal(size=(20, 2)) + kf = linear_2d().kalman_filter(y) + ll = kf.filter() + assert len(kf.states) == 20 assert ll < 0 - for s in states: + for s in kf.states: assert s.mean.shape == (2,) assert s.cov.shape == (2, 2) - def test_data_promoted_to_2d(self) -> None: - kf = KalmanFilter(model=linear_1d(), data=np.zeros(10)) - assert kf.data.shape == (1, 10) + def test_observations_promoted_to_2d(self) -> None: + kf = linear_1d().kalman_filter(np.zeros(10)) + assert kf.y.shape == (1, 10) + + def test_predict_step(self) -> None: + # x=2, P=1 -> mean = 0.95*2, cov = 0.95^2 + 0.01 + kf = linear_1d().kalman_filter(np.zeros((1, 1))) + pred = kf.predict(MeanAndCov(mean=np.array([2.0]), cov=np.array([[1.0]]))) + assert pred.mean[0] == pytest.approx(0.95 * 2.0) + assert pred.cov[0, 0] == pytest.approx(0.95**2 + 0.01) + + def test_update_is_exact(self) -> None: + # prior (0, 1), y=1.2, R=0.5 -> S=1.5, K=2/3, mean=0.8, cov=1/3 + model = linear_1d() + kf = model.kalman_filter(np.array([[1.2]])) + state, ll = kf.update(model.get_px0().mean_and_cov(), np.array([1.2])) + assert state.mean[0] == pytest.approx(0.8) + assert state.cov[0, 0] == pytest.approx(1.0 / 3.0) + assert np.isfinite(ll) def test_variance_reduces_toward_observation(self) -> None: m = LinearGaussianModel( @@ -133,15 +138,15 @@ def test_variance_reduces_toward_observation(self) -> None: mu0=np.array([0.0]), cov0=np.eye(1) * 100.0, ) - states, _ = KalmanFilter(model=m, data=np.ones((5, 1)) * 5.0).filter() - assert float(states[-1].cov.item()) < 1.0 - assert float(states[-1].mean.item()) == pytest.approx(5.0, abs=0.1) - - def test_sherman_morrison_matches_general(self) -> None: - # H as a 1d column vector with scaled-identity R takes the O(n_y) - # Sherman-Morrison path; the equivalent 2d H takes the general path. - data = np.random.default_rng(3).normal(size=(12, 3)) - sm = LinearGaussianModel( + kf = m.kalman_filter(np.ones((5, 1)) * 5.0) + kf.filter() + assert float(kf.states[-1].cov.item()) < 1.0 + assert float(kf.states[-1].mean.item()) == pytest.approx(5.0, abs=0.1) + + def test_observation_matrix_shapes_match(self) -> None: + # a 1d column-vector H and the equivalent 2d (n_y, 1) H must agree. + y = np.random.default_rng(3).normal(size=(12, 3)) + vec = LinearGaussianModel( F=np.array([[0.95]]), Q=np.array([[0.01]]), H=np.array([1.0, 2.0, 0.5]), @@ -149,11 +154,13 @@ def test_sherman_morrison_matches_general(self) -> None: mu0=np.array([0.0]), cov0=np.array([[1.0]]), ) - general = sm.model_copy(update={"H": np.array([[1.0], [2.0], [0.5]])}) - sm_states, sm_ll = KalmanFilter(model=sm, data=data).filter() - gen_states, gen_ll = KalmanFilter(model=general, data=data).filter() - assert sm_ll == pytest.approx(gen_ll) - for a, b in zip(sm_states, gen_states): + mat = vec.model_copy(update={"H": np.array([[1.0], [2.0], [0.5]])}) + vec_kf = vec.kalman_filter(y) + mat_kf = mat.kalman_filter(y) + vec_ll = vec_kf.filter() + mat_ll = mat_kf.filter() + assert vec_ll == pytest.approx(mat_ll) + for a, b in zip(vec_kf.states, mat_kf.states): assert np.allclose(a.mean, b.mean) assert np.allclose(a.cov, b.cov) @@ -161,11 +168,13 @@ def test_sherman_morrison_matches_general(self) -> None: class TestUnscentedKalmanFilter: def test_reduces_to_kalman_filter(self) -> None: # On a linear-Gaussian model the UKF is exact: it matches the KF. - data = np.random.default_rng(5).normal(size=(20, 2)) - ks, kll = KalmanFilter(model=linear_2d(), data=data).filter() - us, ull = UnscentedKalmanFilter(model=linear_2d(), data=data).filter() + y = np.random.default_rng(5).normal(size=(20, 2)) + kf = linear_2d().kalman_filter(y) + ukf = UnscentedKalmanFilter(model=linear_2d(), y=y) + kll = kf.filter() + ull = ukf.filter() assert kll == pytest.approx(ull) - for k, u in zip(ks, us): + for k, u in zip(kf.states, ukf.states): assert np.allclose(k.mean, u.mean) assert np.allclose(k.cov, u.cov) @@ -176,20 +185,19 @@ def test_runs_on_nonlinear_model(self) -> None: if i > 0: x_true = float(np.sin(x_true)) + 0.1 * rng.normal() obs.append([x_true + 0.3 * rng.normal()]) - states, ll = UnscentedKalmanFilter( - model=SineModel(), data=np.array(obs) - ).filter() - assert len(states) == 15 + ukf = UnscentedKalmanFilter(model=SineModel(), y=np.array(obs)) + ll = ukf.filter() + assert len(ukf.states) == 15 assert np.isfinite(ll) - for s in states: + for s in ukf.states: assert s.mean.shape == (1,) assert s.cov.shape == (1, 1) def test_default_parameters(self) -> None: - ukf = UnscentedKalmanFilter(model=SineModel(), data=np.zeros((8, 1))) + ukf = UnscentedKalmanFilter(model=SineModel(), y=np.zeros((8, 1))) assert ukf.alpha == pytest.approx(1e-3) assert ukf.beta == pytest.approx(2.0) assert ukf.kappa == pytest.approx(0.0) - states, ll = ukf.filter() - assert len(states) == 8 + ll = ukf.filter() + assert len(ukf.states) == 8 assert np.isfinite(ll) From 91f3d6d9e353c739d62821449cd1437fa151a65b Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 4 Jun 2026 19:59:09 +0100 Subject: [PATCH 2/4] Better tutorial --- docs/examples/rates_kalman.py | 17 ++++++++++------- quantflow/rates/yield_curve.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/docs/examples/rates_kalman.py b/docs/examples/rates_kalman.py index 2be9be4..5be16b5 100644 --- a/docs/examples/rates_kalman.py +++ b/docs/examples/rates_kalman.py @@ -40,13 +40,16 @@ async def fetch() -> pd.DataFrame: print("CIR (unscented Kalman filter)") print(cir.model_dump_json(indent=2, exclude={"ref_date"})) -# rebuild model-implied yields from each filtered short rate path: y = (B r - A) / tau -va_a, va_b = vasicek.affine_coefficients(ttm) -va_A, va_B = np.asarray(va_a, dtype=float), np.asarray(va_b, dtype=float) +# model-implied semi-annual rates at each date from the filtered short rate paths va_short = vasicek_cal.filtered_short_rate -cir_a, cir_b = cir.affine_coefficients(ttm) -cir_A, cir_B = np.asarray(cir_a, dtype=float), np.asarray(cir_b, dtype=float) cir_short = cir_cal.filtered_short_rate +va_model = np.zeros((len(va_short), len(ttm))) +cir_model = np.zeros((len(cir_short), len(ttm))) +for t in range(len(va_short)): + vasicek.rate = float(va_short[t]) + va_model[t] = np.asarray(vasicek.rates(ttm), dtype=float) + cir.rate = float(cir_short[t]) + cir_model[t] = np.asarray(cir.rates(ttm), dtype=float) # observed (par -> continuous) and both model yields per tenor, over time tenors = ["1Y", "2Y", "5Y", "10Y"] @@ -57,8 +60,8 @@ async def fetch() -> pd.DataFrame: row, col = k // 2 + 1, k % 2 + 1 series = { "observed": 2.0 * np.log1p(weekly[tenor].to_numpy() / 2.0) * 100, - "Vasicek": (va_B[i] * va_short - va_A[i]) / ttm[i] * 100, - "CIR": (cir_B[i] * cir_short - cir_A[i]) / ttm[i] * 100, + "Vasicek": va_model[:, i] * 100, + "CIR": cir_model[:, i] * 100, } for name, values in series.items(): fig.add_trace( diff --git a/quantflow/rates/yield_curve.py b/quantflow/rates/yield_curve.py index d8093ef..6c4b0fb 100644 --- a/quantflow/rates/yield_curve.py +++ b/quantflow/rates/yield_curve.py @@ -109,6 +109,34 @@ def continuously_compounded_rate( ) return maybe_float(result) + def rate( + self, + ttm: Annotated[ArrayLike, Doc("Time to maturity in years")], + frequency: Annotated[ + int, + Doc( + "Compounding periods per year (e.g. 2 for semi-annual). " + "Pass 0 for continuously compounded." + ), + ] = 2, + ) -> FloatArrayLike: + r"""Calculate zero rates compounded at the given frequency. + + The continuously compounded rate $r_c(\tau)$ is converted to a + rate compounded $m$ times per year via: + + \begin{equation} + r_m(\tau) = m\,(e^{r_c(\tau)/m} - 1) + \end{equation} + + When ``frequency=0`` the result is continuously compounded (same as + [continuously_compounded_rate][..continuously_compounded_rate]). + """ + rc = self.continuously_compounded_rate(ttm) + if frequency <= 0: + return rc + return frequency * np.expm1(rc / frequency) + def plot( self, ttm_max: Annotated[float, Doc("Maximum time to maturity in years")] = 10.0, From 425e54f8f57f0b77dbd42a06dc84d63355783323 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 4 Jun 2026 21:24:54 +0100 Subject: [PATCH 3/4] Fix lint --- docs/examples/rates_kalman.py | 5 +++-- quantflow/rates/yield_curve.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/examples/rates_kalman.py b/docs/examples/rates_kalman.py index 5be16b5..682b9a0 100644 --- a/docs/examples/rates_kalman.py +++ b/docs/examples/rates_kalman.py @@ -1,5 +1,6 @@ import asyncio from datetime import timedelta +from decimal import Decimal import numpy as np import pandas as pd @@ -46,9 +47,9 @@ async def fetch() -> pd.DataFrame: va_model = np.zeros((len(va_short), len(ttm))) cir_model = np.zeros((len(cir_short), len(ttm))) for t in range(len(va_short)): - vasicek.rate = float(va_short[t]) + vasicek.rate = Decimal(str(float(va_short[t]))) va_model[t] = np.asarray(vasicek.rates(ttm), dtype=float) - cir.rate = float(cir_short[t]) + cir.rate = Decimal(str(float(cir_short[t]))) cir_model[t] = np.asarray(cir.rates(ttm), dtype=float) # observed (par -> continuous) and both model yields per tenor, over time diff --git a/quantflow/rates/yield_curve.py b/quantflow/rates/yield_curve.py index 6c4b0fb..6ee2633 100644 --- a/quantflow/rates/yield_curve.py +++ b/quantflow/rates/yield_curve.py @@ -109,7 +109,7 @@ def continuously_compounded_rate( ) return maybe_float(result) - def rate( + def rates( self, ttm: Annotated[ArrayLike, Doc("Time to maturity in years")], frequency: Annotated[ From 00c5b1568d41561f82fc128fe10c6fc976fb5cc5 Mon Sep 17 00:00:00 2001 From: Luca Date: Thu, 4 Jun 2026 22:02:36 +0100 Subject: [PATCH 4/4] Fix lint --- docs/api/rates/index.md | 4 +- docs/theory/kalman.md | 8 +- quantflow/ta/kalman.py | 68 ++++++ quantflow_tests/test_ta_kalman.py | 367 +++++++++++++++++------------- 4 files changed, 288 insertions(+), 159 deletions(-) diff --git a/docs/api/rates/index.md b/docs/api/rates/index.md index 47d16b7..d4b55a9 100644 --- a/docs/api/rates/index.md +++ b/docs/api/rates/index.md @@ -6,7 +6,7 @@ The central concept is the [discount factor](../../glossary.md#discount-factor) **[Rate](interest_rate.md)** represents a spot or forward interest rate with a chosen compounding frequency (continuous by default) and day count convention. It supports continuous and periodic compounding and can be bootstrapped directly from a spot/forward pair. -**[YieldCurve](yield_curve.md)** is the abstract base for term-structure models. It defines the interface via `discount_factor` and `instantaneous_forward_rate`, with the two quantities linked by +**[YieldCurve](yield_curve.md)** is the abstract base for term-structure models. It defines the interface via [discount_factor][quantflow.rates.yield_curve.YieldCurve.discount_factor] and [instantaneous_forward_rate][quantflow.rates.yield_curve.YieldCurve.instantaneous_forward_rate], with the two quantities linked by \begin{equation} f(\tau) = -\frac{\partial \ln D_\tau}{\partial \tau} @@ -18,4 +18,4 @@ The central concept is the [discount factor](../../glossary.md#discount-factor) **[VasicekCurve](vasicek.md)** is a Gaussian mean-reverting short-rate model with analytical formulas for discount factors and instantaneous forward rates. -**[Options Discounting](options.md)** provides `YieldCurveCalibration`, the base class for fitting a yield curve to discount factors, and `OptionsDiscountingCalibration`, which bootstraps asset and quote curves from put-call parity observations. +**[Calibration](calibration.md)** provides [YieldCurveCalibration][quantflow.rates.calibration.YieldCurveCalibration], the base class for fitting a yield curve to discount factors, and [OptionsDiscountingCalibration][quantflow.rates.calibration.OptionsDiscountingCalibration], which bootstraps asset and quote curves from put-call parity observations. diff --git a/docs/theory/kalman.md b/docs/theory/kalman.md index e02ba03..8e9b01e 100644 --- a/docs/theory/kalman.md +++ b/docs/theory/kalman.md @@ -239,9 +239,11 @@ without user intervention. When the transition $f(x, \Delta t)$ is non-linear the Kalman predict step is no longer exact. The unscented Kalman filter (UKF), introduced by [Julier & Uhlmann (1997)](../bibliography.md#julier_uhlmann), replaces it -with a **sigma-point** propagation. The observation update is unchanged: it -reuses the Kalman update step described above, so the UKF inherits the -Sherman-Morrison optimisation when applicable. +with a **sigma-point** propagation. The observation update follows the same +structure, but builds the innovation covariance $S_t$ and the cross covariance +$C_t$ from the propagated sigma points and solves the gain $K_t = C_t S_t^{-1}$ +with a dense Cholesky factorisation. The Sherman-Morrison fast path is specific +to the exact linear filter and is not used here. ### Sigma-Point Predict diff --git a/quantflow/ta/kalman.py b/quantflow/ta/kalman.py index 07a32e1..450e914 100644 --- a/quantflow/ta/kalman.py +++ b/quantflow/ta/kalman.py @@ -199,6 +199,28 @@ def kalman_filter( # --------------------------------------------------------------------------- +def isotropic_noise( + R: Annotated[FloatArray, Doc("Observation noise covariance of shape (n_y, n_y).")], +) -> float | None: + r"""Detect isotropic observation noise of the form $R = h^2 I$. + + Returns the scalar variance $h^2$ when every diagonal entry equals $h^2$ + and all off-diagonal entries are zero, otherwise returns ``None``. + + This is the precondition for the Sherman-Morrison fast path in the Kalman + update: when the noise is isotropic and the state is one-dimensional, the + innovation covariance is a rank-1 update to a scaled identity and can be + inverted in $O(n_y)$ instead of $O(n_y^3)$. + """ + n = R.shape[0] + h2 = float(R[0, 0]) + if h2 <= 0.0: + return None + if n > 1 and not np.allclose(R, h2 * np.eye(n)): + return None + return h2 + + class KalmanFilter(BaseModel, arbitrary_types_allowed=True): r"""Kalman filter for a [LinearGaussianModel][..LinearGaussianModel]. @@ -240,6 +262,23 @@ class KalmanFilter(BaseModel, arbitrary_types_allowed=True): n_y \log 2\pi + \log\det S_t + e_t^\top S_t^{-1} e_t \right). \end{equation} + + When the state is one-dimensional and the observation noise is isotropic + ($R = h^2 I$), the innovation covariance $S_t = h^2 I + P_{t \mid t-1}\, c c^\top$ + is a rank-1 update to a scaled identity, where $c = H$ is a column vector. + + The [Sherman-Morrison identity](../../glossary.md#sherman-morrison-identity) + inverts it in $O(n_y)$ instead of $O(n_y^3)$: + + \begin{equation} + S_t^{-1} = \frac{1}{h^2}\left( + I - \frac{P_{t \mid t-1}\, c c^\top}{h^2 + P_{t \mid t-1}\, c^\top c} + \right). + \end{equation} + + The update detects this case automatically (see + [isotropic_noise][..isotropic_noise]) and applies the fast path; otherwise + it falls back to a dense Cholesky solve. """ model: LinearGaussianModel = Field(description="Linear-Gaussian state-space model.") @@ -334,6 +373,9 @@ def update( H = model.H[:, None] if model.H.ndim == 1 else model.H obs = np.atleast_1d(np.asarray(y, dtype=float)) innov = obs - H @ pred.mean + h2 = isotropic_noise(model.R) + if h2 is not None and model.n_x == 1: + return self._update_isotropic(pred, H[:, 0], innov, h2) S = H @ pred.cov @ H.T + model.R sign, log_det = np.linalg.slogdet(S) if sign <= 0: @@ -345,6 +387,32 @@ def update( cov = pred.cov - gain @ H @ pred.cov return MeanAndCov(mean=mean, cov=cov), log_lik + def _update_isotropic( + self, + pred: MeanAndCov, + c: FloatArray, + innov: FloatArray, + h2: float, + ) -> tuple[MeanAndCov, float]: + """Sherman-Morrison fast path of [update][..update] for a 1d state with + isotropic noise. + + Equivalent to the dense path but inverts the rank-1 innovation + covariance in closed form. See the class docstring and the theory docs + for the derivation. + """ + n_y = innov.shape[0] + p = float(pred.cov[0, 0]) + d = h2 + p * float(c @ c) # h^2 + P c^T c + ce = float(c @ innov) # c^T e_t + log_det = (n_y - 1) * np.log(h2) + np.log(d) + quad = (float(innov @ innov) - p * ce * ce / d) / h2 + log_lik = -0.5 * (n_y * np.log(2.0 * np.pi) + log_det + quad) + gain = (p / d) * c # K_t = (P / d) c^T, shape (n_y,) + mean = pred.mean + gain @ innov + cov = np.array([[p * h2 / d]]) + return MeanAndCov(mean=mean, cov=cov), log_lik + # --------------------------------------------------------------------------- # Unscented Kalman filter diff --git a/quantflow_tests/test_ta_kalman.py b/quantflow_tests/test_ta_kalman.py index e502808..272177a 100644 --- a/quantflow_tests/test_ta_kalman.py +++ b/quantflow_tests/test_ta_kalman.py @@ -9,6 +9,7 @@ MeanAndCov, StateSpaceModel, UnscentedKalmanFilter, + isotropic_noise, ) @@ -47,157 +48,215 @@ def get_py(self, t: int, xp: np.ndarray, x: np.ndarray) -> MvNormal: return MvNormal(mean=np.asarray(x, dtype=float), cov=np.eye(1) * 0.1) -class TestMeanAndCov: - def test_unpacking(self) -> None: - mc = MeanAndCov(mean=np.array([1.0]), cov=np.array([[2.0]])) - m, c = mc - assert m[0] == pytest.approx(1.0) - assert c[0, 0] == pytest.approx(2.0) - - -class TestLinearGaussianModel: - def test_shape_coercion(self) -> None: - m = LinearGaussianModel( - F=np.array(0.95), - Q=np.array(0.01), - H=np.array(1.0), - R=np.array(0.5), - mu0=np.array(0.0), - cov0=np.array(1.0), - ) - assert m.F.shape == (1, 1) - assert m.Q.shape == (1, 1) - assert m.R.shape == (1, 1) - assert m.cov0.shape == (1, 1) - assert m.mu0.shape == (1,) - assert m.n_x == 1 - - def test_get_px0(self) -> None: - mean, cov = linear_1d().get_px0().mean_and_cov() - assert mean[0] == pytest.approx(0.0) - assert cov[0, 0] == pytest.approx(1.0) - - def test_get_px(self) -> None: - law = linear_1d().get_px(1, np.array([2.0])) - assert law.mean[0] == pytest.approx(0.95 * 2.0) - assert law.cov[0, 0] == pytest.approx(0.01) - - def test_get_py(self) -> None: - law = linear_1d().get_py(1, np.array([0.0]), np.array([3.0])) - assert law.mean[0] == pytest.approx(3.0) - assert law.cov[0, 0] == pytest.approx(0.5) - - -class TestKalmanFilter: - def test_filter_shapes_1d(self) -> None: - y = np.random.default_rng(0).normal(size=(15, 1)) - kf = linear_1d().kalman_filter(y) - ll = kf.filter() - assert len(kf.states) == 15 - assert np.isfinite(ll) - for s in kf.states: - assert s.mean.shape == (1,) - assert s.cov.shape == (1, 1) - - def test_filter_shapes_2d(self) -> None: - y = np.random.default_rng(1).normal(size=(20, 2)) - kf = linear_2d().kalman_filter(y) - ll = kf.filter() - assert len(kf.states) == 20 - assert ll < 0 - for s in kf.states: - assert s.mean.shape == (2,) - assert s.cov.shape == (2, 2) - - def test_observations_promoted_to_2d(self) -> None: - kf = linear_1d().kalman_filter(np.zeros(10)) - assert kf.y.shape == (1, 10) - - def test_predict_step(self) -> None: - # x=2, P=1 -> mean = 0.95*2, cov = 0.95^2 + 0.01 - kf = linear_1d().kalman_filter(np.zeros((1, 1))) - pred = kf.predict(MeanAndCov(mean=np.array([2.0]), cov=np.array([[1.0]]))) - assert pred.mean[0] == pytest.approx(0.95 * 2.0) - assert pred.cov[0, 0] == pytest.approx(0.95**2 + 0.01) - - def test_update_is_exact(self) -> None: - # prior (0, 1), y=1.2, R=0.5 -> S=1.5, K=2/3, mean=0.8, cov=1/3 - model = linear_1d() - kf = model.kalman_filter(np.array([[1.2]])) - state, ll = kf.update(model.get_px0().mean_and_cov(), np.array([1.2])) - assert state.mean[0] == pytest.approx(0.8) - assert state.cov[0, 0] == pytest.approx(1.0 / 3.0) - assert np.isfinite(ll) - - def test_variance_reduces_toward_observation(self) -> None: - m = LinearGaussianModel( - F=np.eye(1), - Q=np.eye(1) * 1e-8, - H=np.array([1.0]), - R=np.eye(1) * 0.01, - mu0=np.array([0.0]), - cov0=np.eye(1) * 100.0, - ) - kf = m.kalman_filter(np.ones((5, 1)) * 5.0) - kf.filter() - assert float(kf.states[-1].cov.item()) < 1.0 - assert float(kf.states[-1].mean.item()) == pytest.approx(5.0, abs=0.1) - - def test_observation_matrix_shapes_match(self) -> None: - # a 1d column-vector H and the equivalent 2d (n_y, 1) H must agree. - y = np.random.default_rng(3).normal(size=(12, 3)) - vec = LinearGaussianModel( - F=np.array([[0.95]]), - Q=np.array([[0.01]]), - H=np.array([1.0, 2.0, 0.5]), - R=np.eye(3) * 0.3, - mu0=np.array([0.0]), - cov0=np.array([[1.0]]), - ) - mat = vec.model_copy(update={"H": np.array([[1.0], [2.0], [0.5]])}) - vec_kf = vec.kalman_filter(y) - mat_kf = mat.kalman_filter(y) - vec_ll = vec_kf.filter() - mat_ll = mat_kf.filter() - assert vec_ll == pytest.approx(mat_ll) - for a, b in zip(vec_kf.states, mat_kf.states): - assert np.allclose(a.mean, b.mean) - assert np.allclose(a.cov, b.cov) - - -class TestUnscentedKalmanFilter: - def test_reduces_to_kalman_filter(self) -> None: - # On a linear-Gaussian model the UKF is exact: it matches the KF. - y = np.random.default_rng(5).normal(size=(20, 2)) - kf = linear_2d().kalman_filter(y) - ukf = UnscentedKalmanFilter(model=linear_2d(), y=y) - kll = kf.filter() - ull = ukf.filter() - assert kll == pytest.approx(ull) - for k, u in zip(kf.states, ukf.states): - assert np.allclose(k.mean, u.mean) - assert np.allclose(k.cov, u.cov) - - def test_runs_on_nonlinear_model(self) -> None: - rng = np.random.default_rng(7) - x_true, obs = 0.5, [] - for i in range(15): - if i > 0: - x_true = float(np.sin(x_true)) + 0.1 * rng.normal() - obs.append([x_true + 0.3 * rng.normal()]) - ukf = UnscentedKalmanFilter(model=SineModel(), y=np.array(obs)) - ll = ukf.filter() - assert len(ukf.states) == 15 - assert np.isfinite(ll) - for s in ukf.states: - assert s.mean.shape == (1,) - assert s.cov.shape == (1, 1) - - def test_default_parameters(self) -> None: - ukf = UnscentedKalmanFilter(model=SineModel(), y=np.zeros((8, 1))) - assert ukf.alpha == pytest.approx(1e-3) - assert ukf.beta == pytest.approx(2.0) - assert ukf.kappa == pytest.approx(0.0) - ll = ukf.filter() - assert len(ukf.states) == 8 - assert np.isfinite(ll) +def test_isotropic_noise_detects_scaled_identity() -> None: + assert isotropic_noise(np.eye(4) * 0.25) == pytest.approx(0.25) + + +def test_isotropic_noise_single_observation() -> None: + assert isotropic_noise(np.array([[0.7]])) == pytest.approx(0.7) + + +def test_isotropic_noise_rejects_unequal_diagonal() -> None: + assert isotropic_noise(np.diag([0.1, 0.2, 0.1])) is None + + +def test_isotropic_noise_rejects_off_diagonal() -> None: + R = np.eye(3) * 0.1 + R[0, 1] = R[1, 0] = 0.05 + assert isotropic_noise(R) is None + + +def test_isotropic_noise_rejects_non_positive() -> None: + assert isotropic_noise(np.zeros((2, 2))) is None + + +def test_mean_and_cov_unpacking() -> None: + mc = MeanAndCov(mean=np.array([1.0]), cov=np.array([[2.0]])) + m, c = mc + assert m[0] == pytest.approx(1.0) + assert c[0, 0] == pytest.approx(2.0) + + +def test_shape_coercion() -> None: + m = LinearGaussianModel( + F=np.array(0.95), + Q=np.array(0.01), + H=np.array(1.0), + R=np.array(0.5), + mu0=np.array(0.0), + cov0=np.array(1.0), + ) + assert m.F.shape == (1, 1) + assert m.Q.shape == (1, 1) + assert m.R.shape == (1, 1) + assert m.cov0.shape == (1, 1) + assert m.mu0.shape == (1,) + assert m.n_x == 1 + + +def test_get_px0() -> None: + mean, cov = linear_1d().get_px0().mean_and_cov() + assert mean[0] == pytest.approx(0.0) + assert cov[0, 0] == pytest.approx(1.0) + + +def test_get_px() -> None: + law = linear_1d().get_px(1, np.array([2.0])) + assert law.mean[0] == pytest.approx(0.95 * 2.0) + assert law.cov[0, 0] == pytest.approx(0.01) + + +def test_get_py() -> None: + law = linear_1d().get_py(1, np.array([0.0]), np.array([3.0])) + assert law.mean[0] == pytest.approx(3.0) + assert law.cov[0, 0] == pytest.approx(0.5) + + +def test_filter_shapes_1d() -> None: + y = np.random.default_rng(0).normal(size=(15, 1)) + kf = linear_1d().kalman_filter(y) + ll = kf.filter() + assert len(kf.states) == 15 + assert np.isfinite(ll) + for s in kf.states: + assert s.mean.shape == (1,) + assert s.cov.shape == (1, 1) + + +def test_filter_shapes_2d() -> None: + y = np.random.default_rng(1).normal(size=(20, 2)) + kf = linear_2d().kalman_filter(y) + ll = kf.filter() + assert len(kf.states) == 20 + assert ll < 0 + for s in kf.states: + assert s.mean.shape == (2,) + assert s.cov.shape == (2, 2) + + +def test_observations_promoted_to_2d() -> None: + kf = linear_1d().kalman_filter(np.zeros(10)) + assert kf.y.shape == (1, 10) + + +def test_predict_step() -> None: + # x=2, P=1 -> mean = 0.95*2, cov = 0.95^2 + 0.01 + kf = linear_1d().kalman_filter(np.zeros((1, 1))) + pred = kf.predict(MeanAndCov(mean=np.array([2.0]), cov=np.array([[1.0]]))) + assert pred.mean[0] == pytest.approx(0.95 * 2.0) + assert pred.cov[0, 0] == pytest.approx(0.95**2 + 0.01) + + +def test_update_is_exact() -> None: + # prior (0, 1), y=1.2, R=0.5 -> S=1.5, K=2/3, mean=0.8, cov=1/3 + model = linear_1d() + kf = model.kalman_filter(np.array([[1.2]])) + state, ll = kf.update(model.get_px0().mean_and_cov(), np.array([1.2])) + assert state.mean[0] == pytest.approx(0.8) + assert state.cov[0, 0] == pytest.approx(1.0 / 3.0) + assert np.isfinite(ll) + + +def test_variance_reduces_toward_observation() -> None: + m = LinearGaussianModel( + F=np.eye(1), + Q=np.eye(1) * 1e-8, + H=np.array([1.0]), + R=np.eye(1) * 0.01, + mu0=np.array([0.0]), + cov0=np.eye(1) * 100.0, + ) + kf = m.kalman_filter(np.ones((5, 1)) * 5.0) + kf.filter() + assert float(kf.states[-1].cov.item()) < 1.0 + assert float(kf.states[-1].mean.item()) == pytest.approx(5.0, abs=0.1) + + +def test_isotropic_fast_path_matches_dense( + monkeypatch: pytest.MonkeyPatch, +) -> None: + # With a 1d state and isotropic R the update takes the Sherman-Morrison + # fast path. Forcing isotropic_noise to return None falls back to the dense + # Cholesky solve; the two paths must agree on the same model. + y = np.random.default_rng(11).normal(scale=0.01, size=(40, 6)) + model = LinearGaussianModel( + F=np.array([[0.97]]), + Q=np.array([[1e-4]]), + H=np.linspace(0.5, 5.0, 6), + R=np.eye(6) * 0.001**2, + mu0=np.zeros(1), + cov0=np.array([[1e-3]]), + ) + assert isotropic_noise(model.R) is not None + fast = model.kalman_filter(y) + ll_fast = fast.filter() + + monkeypatch.setattr("quantflow.ta.kalman.isotropic_noise", lambda R: None) + dense = model.kalman_filter(y) + ll_dense = dense.filter() + + assert ll_fast == pytest.approx(ll_dense) + for f, d in zip(fast.states, dense.states): + assert np.allclose(f.mean, d.mean) + assert np.allclose(f.cov, d.cov) + + +def test_observation_matrix_shapes_match() -> None: + # a 1d column-vector H and the equivalent 2d (n_y, 1) H must agree. + y = np.random.default_rng(3).normal(size=(12, 3)) + vec = LinearGaussianModel( + F=np.array([[0.95]]), + Q=np.array([[0.01]]), + H=np.array([1.0, 2.0, 0.5]), + R=np.eye(3) * 0.3, + mu0=np.array([0.0]), + cov0=np.array([[1.0]]), + ) + mat = vec.model_copy(update={"H": np.array([[1.0], [2.0], [0.5]])}) + vec_kf = vec.kalman_filter(y) + mat_kf = mat.kalman_filter(y) + vec_ll = vec_kf.filter() + mat_ll = mat_kf.filter() + assert vec_ll == pytest.approx(mat_ll) + for a, b in zip(vec_kf.states, mat_kf.states): + assert np.allclose(a.mean, b.mean) + assert np.allclose(a.cov, b.cov) + + +def test_ukf_reduces_to_kalman_filter() -> None: + # On a linear-Gaussian model the UKF is exact: it matches the KF. + y = np.random.default_rng(5).normal(size=(20, 2)) + kf = linear_2d().kalman_filter(y) + ukf = UnscentedKalmanFilter(model=linear_2d(), y=y) + kll = kf.filter() + ull = ukf.filter() + assert kll == pytest.approx(ull) + for k, u in zip(kf.states, ukf.states): + assert np.allclose(k.mean, u.mean) + assert np.allclose(k.cov, u.cov) + + +def test_ukf_runs_on_nonlinear_model() -> None: + rng = np.random.default_rng(7) + x_true, obs = 0.5, [] + for i in range(15): + if i > 0: + x_true = float(np.sin(x_true)) + 0.1 * rng.normal() + obs.append([x_true + 0.3 * rng.normal()]) + ukf = UnscentedKalmanFilter(model=SineModel(), y=np.array(obs)) + ll = ukf.filter() + assert len(ukf.states) == 15 + assert np.isfinite(ll) + for s in ukf.states: + assert s.mean.shape == (1,) + assert s.cov.shape == (1, 1) + + +def test_ukf_default_parameters() -> None: + ukf = UnscentedKalmanFilter(model=SineModel(), y=np.zeros((8, 1))) + assert ukf.alpha == pytest.approx(1e-3) + assert ukf.beta == pytest.approx(2.0) + assert ukf.kappa == pytest.approx(0.0) + ll = ukf.filter() + assert len(ukf.states) == 8 + assert np.isfinite(ll)