diff --git a/changelog.d/longwise-local-geography.md b/changelog.d/longwise-local-geography.md new file mode 100644 index 000000000..2a986da6d --- /dev/null +++ b/changelog.d/longwise-local-geography.md @@ -0,0 +1 @@ +- Add an OA-first long-format local geography weights artifact (`local_geography_weights.csv.gz`) so UK constituency and local-authority consumers can migrate away from dense area-by-household weight matrices. diff --git a/docs/oa_calibration_pipeline.md b/docs/oa_calibration_pipeline.md index 3bc3560da..8339931f0 100644 --- a/docs/oa_calibration_pipeline.md +++ b/docs/oa_calibration_pipeline.md @@ -132,15 +132,19 @@ Generate per-area H5 files from sparse L0-calibrated weights. **Deliverables:** - `policyengine_uk_data/calibration/publish_local_h5s.py` — extracts per-area H5 subsets from the sparse weight vector; each H5 contains only active households (non-zero weight) with their calibrated weights, plus the linked person and benunit rows +- `policyengine_uk_data/calibration/long_geography.py` — exports matrix-free local geography weights as an OA-first long table, with constituency and LA rows derived from assigned OA geography - `datasets/create_datasets.py` — publish step wired in after calibration, before downrating - `tests/test_publish_local_h5s.py` — 13 tests covering area-household mapping, H5 structure, pruned-household exclusion, weight correctness, person/benunit FK integrity, full publish cycle, summary statistics, and validation **Key design:** - `_get_area_household_indices()`: maps each area code to its household row indices via OA geography columns from clone-and-assign +- `write_long_geography_weights()`: writes `storage/local_geography_weights.csv.gz`, a long sidecar with `area_type`, `area_code`, household identifiers, source-year/source-household provenance, and weights; the production build writes assigned-geography rows, while explicit 2D H5 conversion is available only for small compatibility checks because expanding dense area-by-household matrices is too large for routine builds +- `geography_support_report()`: summarizes low-support areas using unique source households and effective sample size, so clone count and future pooled-FRS builds can be evaluated without mistaking cloned rows for independent evidence - `publish_area_h5()`: writes a single H5 per area — filters to active (non-zero weight) households, extracts linked persons and benunits via FK joins, stores as HDF5 groups with metadata attributes - `publish_local_h5s()`: orchestrates the full publish cycle — loads L0 weight vector, iterates over all areas, writes H5 files to `storage/local_h5s/{area_type}/`, produces `_summary.csv` with per-area statistics - `validate_local_h5s()`: post-publish validation checking file existence, HDF5 structure, and cross-area household ID uniqueness - Supports both constituency (650) and LA (360) area types - Zero-weight households (L0-pruned) are excluded from area H5 files — only active records are published +- The legacy `parliamentary_constituency_weights.h5` and `local_authority_weights.h5` artifacts are still produced during migration; new consumers should prefer the OA-first `local_geography_weights.csv.gz` sidecar. **US reference:** PR #465 (modal) diff --git a/policyengine_uk_data/calibration/clone_and_assign.py b/policyengine_uk_data/calibration/clone_and_assign.py index b6d122c24..397567c1f 100644 --- a/policyengine_uk_data/calibration/clone_and_assign.py +++ b/policyengine_uk_data/calibration/clone_and_assign.py @@ -20,7 +20,6 @@ from policyengine_uk_data.calibration.oa_assignment import ( assign_random_geography, - GeographyAssignment, ) logger = logging.getLogger(__name__) @@ -98,8 +97,6 @@ def clone_and_assign( benunit = dataset.benunit n_households = len(hh) - n_persons = len(person) - n_benunits = len(benunit) logger.info( "Cloning %d households x %d = %d total records", @@ -192,6 +189,9 @@ def clone_and_assign( # Clone household table hh_clone = hh.copy() + hh_clone["source_household_id"] = hh_id_col + if "source_year" not in hh_clone.columns: + hh_clone["source_year"] = dataset.time_period hh_clone["household_id"] = new_hh_ids hh_clone["household_weight"] = hh["household_weight"].values / n_clones diff --git a/policyengine_uk_data/calibration/long_geography.py b/policyengine_uk_data/calibration/long_geography.py new file mode 100644 index 000000000..2fdf26cc5 --- /dev/null +++ b/policyengine_uk_data/calibration/long_geography.py @@ -0,0 +1,694 @@ +"""Long-format local geography weights for cloned UK datasets. + +The legacy local-area exports store weights as dense or sparse-shaped H5 +arrays keyed by area row and household column. Routine builds write assigned +OA-derived rows; small diagnostics can also expand a legacy dense matrix into +one row per active area-household cell. +""" + +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from pathlib import Path + +import h5py +import numpy as np +import pandas as pd + +from policyengine_uk_data.storage import STORAGE_FOLDER +from policyengine_uk_data.utils.calibrate import default_weight_dataset_key + +AREA_TYPES = ("oa", "constituency", "la") +AREA_CODE_FILES = { + "oa": "oa_crosswalk.csv.gz", + "constituency": "constituencies_2024.csv", + "la": "local_authorities_2021.csv", +} +GEO_COLUMNS = { + "oa": "oa_code", + "constituency": "constituency_code_oa", + "la": "la_code_oa", +} +LONG_GEOGRAPHY_WEIGHTS_FILE = "local_geography_weights.csv.gz" +LONG_GEOGRAPHY_COLUMNS = [ + "area_type", + "area_code", + "area_index", + "household_index", + "household_id", + "source_year", + "source_household_id", + "source_household_key", + "clone_index", + "weight", + "weight_source", +] +AREA_SUPPORT_COLUMNS = [ + "area_type", + "area_code", + "area_index", + "n_rows", + "n_source_households", + "total_weight", + "effective_sample_size", + "weight_source", +] +AREA_SUPPORT_SUMMARY_COLUMNS = [ + "area_type", + "n_areas", + "n_nonempty_areas", + "share_nonempty_areas", + "total_rows", + "median_rows", + "p10_rows", + "median_source_households", + "p10_source_households", + "median_effective_sample_size", + "p10_effective_sample_size", + "low_support_areas", + "total_weight", +] + + +def geo_column(area_type: str) -> str: + """Return the cloned-household geography column for a local area type.""" + try: + return GEO_COLUMNS[area_type] + except KeyError as exc: + raise ValueError(f"Unknown area_type: {area_type}") from exc + + +def _as_area_type_tuple(area_types: str | Sequence[str]) -> tuple[str, ...]: + if isinstance(area_types, str): + area_types = (area_types,) + resolved = tuple(area_types) + for area_type in resolved: + geo_column(area_type) + return resolved + + +def _normalize_area_code(value) -> str: + if isinstance(value, bytes): + value = value.decode() + if pd.isna(value): + return "" + return str(value).strip() + + +def _normalize_area_codes(values) -> pd.Series: + return pd.Series([_normalize_area_code(value) for value in values]) + + +def load_area_codes( + area_type: str, + storage_folder: Path | str | None = None, +) -> pd.DataFrame: + """Load canonical area codes for a supported local geography.""" + area_code_file = AREA_CODE_FILES.get(area_type) + if area_code_file is None: + raise ValueError(f"Unknown area_type: {area_type}") + + if storage_folder is None: + storage_folder = STORAGE_FOLDER + + path = Path(storage_folder) / area_code_file + area_codes = pd.read_csv(path) + if area_type == "oa" and "oa_code" in area_codes.columns: + area_codes = area_codes.rename(columns={"oa_code": "code"}) + if "code" not in area_codes.columns: + raise ValueError(f"{path} must contain a 'code' or 'oa_code' column") + area_codes = area_codes.copy() + area_codes["code"] = _normalize_area_codes(area_codes["code"]) + area_codes = area_codes.drop_duplicates(subset=["code"]) + return area_codes + + +def _household_ids(household: pd.DataFrame) -> np.ndarray: + if "household_id" in household.columns: + return household["household_id"].to_numpy() + return np.arange(len(household), dtype=np.int64) + + +def _source_household_ids(household: pd.DataFrame) -> np.ndarray: + if "source_household_id" in household.columns: + return household["source_household_id"].to_numpy() + return _household_ids(household) + + +def _source_years(household: pd.DataFrame) -> np.ndarray: + if "source_year" in household.columns: + return household["source_year"].to_numpy() + return np.array([""] * len(household), dtype=object) + + +def _source_household_keys( + source_years: np.ndarray, + source_household_ids: np.ndarray, +) -> np.ndarray: + return np.array( + [ + f"{year}:{household_id}" if str(year) else str(household_id) + for year, household_id in zip(source_years, source_household_ids) + ], + dtype=object, + ) + + +def _clone_indices(household: pd.DataFrame) -> np.ndarray: + if "clone_index" in household.columns: + return household["clone_index"].to_numpy(dtype=np.int64) + return np.zeros(len(household), dtype=np.int64) + + +def _default_weights(household: pd.DataFrame) -> np.ndarray: + if "household_weight" in household.columns: + return household["household_weight"].to_numpy(dtype=np.float64) + return np.ones(len(household), dtype=np.float64) + + +def _validate_weight_array( + weights: np.ndarray, + *, + n_households: int, + n_areas: int, + area_type: str, +) -> np.ndarray: + weights = np.asarray(weights, dtype=np.float64) + if weights.ndim == 1: + if len(weights) != n_households: + raise ValueError( + f"1D {area_type} weights have {len(weights)} records, " + f"expected {n_households}." + ) + elif weights.ndim == 2: + if weights.shape != (n_areas, n_households): + raise ValueError( + f"2D {area_type} weights have shape {weights.shape}, expected " + f"({n_areas}, {n_households})." + ) + else: + raise ValueError( + f"{area_type} weights must be 1D or 2D; got shape {weights.shape}." + ) + return weights + + +def build_long_geography_frame( + dataset, + area_types: str | Sequence[str] = AREA_TYPES, + area_codes: Mapping[str, pd.DataFrame | pd.Series | Sequence[str]] + | pd.DataFrame + | pd.Series + | Sequence[str] + | None = None, + weights: np.ndarray | Sequence[float] | None = None, + weight_source: str | None = None, + min_weight: float = 0.0, + drop_zero_weights: bool = False, +) -> pd.DataFrame: + """Build a long local-geography assignment table. + + Args: + dataset: Cloned dataset with household geography columns. + area_types: ``"oa"``, ``"constituency"``, ``"la"``, or any subset. + area_codes: Canonical area codes defining row order. For multiple area + types, pass a mapping from area type to codes. If omitted, codes + are loaded from storage. + weights: Optional calibrated weights. A 1D vector is interpreted as + one household weight per record and is joined to each household's + assigned area. A 2D array is interpreted as the legacy + ``(n_areas, n_households)`` area-by-household layout and emits + one row per matrix cell; this is only valid when building one area + type at a time. + weight_source: Label describing where ``weight`` came from. + min_weight: Threshold used when dropping zero/small weights. + drop_zero_weights: If true, keep only rows with ``weight > min_weight``. + + Returns: + DataFrame with columns ``area_type``, ``area_code``, ``area_index``, + ``household_index``, source identity columns, ``weight`` and + ``weight_source``. + """ + resolved_area_types = _as_area_type_tuple(area_types) + household = dataset.household + n_households = len(household) + household_indices = np.arange(n_households, dtype=np.int64) + household_ids = _household_ids(household) + source_years = _source_years(household) + source_household_ids = _source_household_ids(household) + source_household_keys = _source_household_keys( + source_years, + source_household_ids, + ) + clone_indices = _clone_indices(household) + + weight_array = None + if weights is not None: + weight_array = np.asarray(weights, dtype=np.float64) + if weight_array.ndim == 2 and len(resolved_area_types) != 1: + raise ValueError("2D weights can only be converted for one area_type.") + + frames = [] + for area_type in resolved_area_types: + column = geo_column(area_type) + if column not in household.columns: + raise ValueError(f"dataset.household must contain {column!r}.") + + if area_codes is None: + area_codes_df = load_area_codes(area_type) + elif isinstance(area_codes, Mapping): + area_codes_df = area_codes[area_type] + else: + if len(resolved_area_types) != 1: + raise ValueError( + "area_codes must be a mapping when building multiple area_types." + ) + area_codes_df = area_codes + + if isinstance(area_codes_df, pd.DataFrame): + canonical_codes = _normalize_area_codes(area_codes_df["code"]) + else: + canonical_codes = _normalize_area_codes(area_codes_df) + + code_to_index = {code: i for i, code in enumerate(canonical_codes)} + normalized_codes = _normalize_area_codes(household[column]) + area_indices = normalized_codes.map(code_to_index).to_numpy() + assigned = ~pd.isna(area_indices) + + if weight_array is None: + row_weights = _default_weights(household) + resolved_weight_source = weight_source or "household_weight" + else: + weights_for_area = _validate_weight_array( + weight_array, + n_households=n_households, + n_areas=len(canonical_codes), + area_type=area_type, + ) + resolved_weight_source = weight_source or "calibrated_weight" + if weights_for_area.ndim == 2: + if drop_zero_weights: + row_area_indices, row_household_indices = np.where( + weights_for_area > min_weight + ) + else: + row_area_indices, row_household_indices = np.indices( + weights_for_area.shape + ) + row_area_indices = row_area_indices.ravel() + row_household_indices = row_household_indices.ravel() + + canonical_array = canonical_codes.to_numpy(dtype=object) + frame = pd.DataFrame( + { + "area_type": area_type, + "area_code": canonical_array[row_area_indices], + "area_index": row_area_indices.astype(np.int64), + "household_index": row_household_indices.astype(np.int64), + "household_id": household_ids[row_household_indices], + "source_year": source_years[row_household_indices], + "source_household_id": source_household_ids[ + row_household_indices + ], + "source_household_key": source_household_keys[ + row_household_indices + ], + "clone_index": clone_indices[row_household_indices], + "weight": weights_for_area[ + row_area_indices, + row_household_indices, + ], + "weight_source": resolved_weight_source, + } + ) + frames.append(frame[LONG_GEOGRAPHY_COLUMNS]) + continue + row_weights = weights_for_area + + frame = pd.DataFrame( + { + "area_type": area_type, + "area_code": normalized_codes, + "area_index": area_indices, + "household_index": household_indices, + "household_id": household_ids, + "source_year": source_years, + "source_household_id": source_household_ids, + "source_household_key": source_household_keys, + "clone_index": clone_indices, + "weight": row_weights, + "weight_source": resolved_weight_source, + } + ) + frame = frame.loc[assigned].copy() + frame["area_index"] = frame["area_index"].astype(np.int64) + if drop_zero_weights: + frame = frame.loc[frame["weight"] > min_weight].copy() + frames.append(frame[LONG_GEOGRAPHY_COLUMNS]) + + if not frames: + return pd.DataFrame(columns=LONG_GEOGRAPHY_COLUMNS) + return pd.concat(frames, ignore_index=True) + + +def area_support_from_long_geography( + frame: pd.DataFrame, + area_codes: Mapping[str, pd.DataFrame | pd.Series | Sequence[str]] | None = None, +) -> pd.DataFrame: + """Summarize row and weight support by geography area.""" + if frame.empty: + if area_codes is None: + return pd.DataFrame(columns=AREA_SUPPORT_COLUMNS) + expected_frames = [] + for area_type, codes in area_codes.items(): + if isinstance(codes, pd.DataFrame): + code_values = ( + codes["code"] if "code" in codes.columns else codes["oa_code"] + ) + else: + code_values = codes + normalized = _normalize_area_codes(code_values) + expected_frames.append( + pd.DataFrame( + { + "area_type": area_type, + "area_code": normalized, + "area_index": np.arange(len(normalized), dtype=np.int64), + "n_rows": 0, + "n_source_households": 0, + "total_weight": 0.0, + "effective_sample_size": 0.0, + "weight_source": "", + } + ) + ) + return pd.concat(expected_frames, ignore_index=True)[AREA_SUPPORT_COLUMNS] + + working = frame.copy() + working["weight_squared"] = working["weight"].astype(float) ** 2 + grouped = ( + working.groupby(["area_type", "area_code"], sort=False) + .agg( + area_index=("area_index", "first"), + n_rows=("household_id", "size"), + n_source_households=("source_household_key", "nunique"), + total_weight=("weight", "sum"), + weight_squared=("weight_squared", "sum"), + weight_source=( + "weight_source", + lambda values: ",".join(sorted({str(value) for value in values})), + ), + ) + .reset_index() + ) + grouped["effective_sample_size"] = np.where( + grouped["weight_squared"] > 0, + grouped["total_weight"] ** 2 / grouped["weight_squared"], + 0.0, + ) + grouped = grouped.drop(columns=["weight_squared"]) + + if area_codes is None: + return grouped[AREA_SUPPORT_COLUMNS] + + expected_frames = [] + for area_type, codes in area_codes.items(): + if isinstance(codes, pd.DataFrame): + code_values = codes["code"] if "code" in codes.columns else codes["oa_code"] + else: + code_values = codes + normalized = _normalize_area_codes(code_values) + expected_frames.append( + pd.DataFrame( + { + "area_type": area_type, + "area_code": normalized, + "area_index": np.arange(len(normalized), dtype=np.int64), + } + ) + ) + expected = pd.concat(expected_frames, ignore_index=True) + support = expected.merge( + grouped, + on=["area_type", "area_code"], + how="left", + suffixes=("", "_observed"), + ) + support["area_index"] = support["area_index_observed"].fillna(support["area_index"]) + support = support.drop(columns=["area_index_observed"]) + for column in ( + "n_rows", + "n_source_households", + "total_weight", + "effective_sample_size", + ): + support[column] = support[column].fillna(0) + support["weight_source"] = support["weight_source"].fillna("") + support["area_index"] = support["area_index"].astype(np.int64) + support["n_rows"] = support["n_rows"].astype(np.int64) + support["n_source_households"] = support["n_source_households"].astype(np.int64) + return support[AREA_SUPPORT_COLUMNS] + + +def summarize_area_support( + area_support: pd.DataFrame, + min_source_households: int = 10, + min_effective_sample_size: float = 10.0, +) -> pd.DataFrame: + """Collapse per-area support into clone-count credibility diagnostics.""" + rows = [] + for area_type, group in area_support.groupby("area_type", sort=False): + nonempty = group[group["n_rows"] > 0] + low_support = group[ + (group["n_source_households"] < min_source_households) + | (group["effective_sample_size"] < min_effective_sample_size) + ] + rows.append( + { + "area_type": area_type, + "n_areas": int(len(group)), + "n_nonempty_areas": int(len(nonempty)), + "share_nonempty_areas": ( + float(len(nonempty) / len(group)) if len(group) else 0.0 + ), + "total_rows": int(group["n_rows"].sum()), + "median_rows": float(group["n_rows"].median()) if len(group) else 0.0, + "p10_rows": float(group["n_rows"].quantile(0.1)) if len(group) else 0.0, + "median_source_households": float(group["n_source_households"].median()) + if len(group) + else 0.0, + "p10_source_households": float( + group["n_source_households"].quantile(0.1) + ) + if len(group) + else 0.0, + "median_effective_sample_size": float( + group["effective_sample_size"].median() + ) + if len(group) + else 0.0, + "p10_effective_sample_size": float( + group["effective_sample_size"].quantile(0.1) + ) + if len(group) + else 0.0, + "low_support_areas": int(len(low_support)), + "total_weight": float(group["total_weight"].sum()), + } + ) + return pd.DataFrame(rows, columns=AREA_SUPPORT_SUMMARY_COLUMNS) + + +def geography_support_report( + dataset, + area_types: str | Sequence[str] = AREA_TYPES, + area_codes: Mapping[str, pd.DataFrame | pd.Series | Sequence[str]] + | pd.DataFrame + | pd.Series + | Sequence[str] + | None = None, + weights: np.ndarray | Sequence[float] | None = None, + min_source_households: int = 10, + min_effective_sample_size: float = 10.0, +) -> tuple[pd.DataFrame, pd.DataFrame]: + """Return per-area support and summary diagnostics for a dataset.""" + resolved_area_types = _as_area_type_tuple(area_types) + if area_codes is None: + expected_codes = { + area_type: load_area_codes(area_type) for area_type in resolved_area_types + } + frame_area_codes = expected_codes + elif isinstance(area_codes, Mapping): + expected_codes = area_codes + frame_area_codes = area_codes + else: + expected_codes = None + frame_area_codes = area_codes + + frame = build_long_geography_frame( + dataset=dataset, + area_types=resolved_area_types, + area_codes=frame_area_codes, + weights=weights, + drop_zero_weights=True, + ) + area_support = area_support_from_long_geography( + frame, + area_codes=expected_codes, + ) + summary = summarize_area_support( + area_support, + min_source_households=min_source_households, + min_effective_sample_size=min_effective_sample_size, + ) + return area_support, summary + + +def clone_count_support_sweep( + dataset, + clone_counts: Sequence[int] = (1, 2, 5, 10), + seed: int = 42, + crosswalk_path: str | None = None, + area_types: Sequence[str] = ("constituency", "la"), + area_codes: Mapping[str, pd.DataFrame | pd.Series | Sequence[str]] | None = None, + min_source_households: int = 10, + min_effective_sample_size: float = 10.0, +) -> pd.DataFrame: + """Evaluate local geography support as clone counts scale up.""" + from policyengine_uk_data.calibration.clone_and_assign import clone_and_assign + + summaries = [] + for n_clones in clone_counts: + cloned = clone_and_assign( + dataset, + n_clones=n_clones, + seed=seed, + crosswalk_path=crosswalk_path, + ) + _, summary = geography_support_report( + cloned, + area_types=area_types, + area_codes=area_codes, + min_source_households=min_source_households, + min_effective_sample_size=min_effective_sample_size, + ) + summary.insert(0, "n_clones", n_clones) + summaries.append(summary) + + if not summaries: + return pd.DataFrame(columns=["n_clones", *AREA_SUPPORT_SUMMARY_COLUMNS]) + return pd.concat(summaries, ignore_index=True) + + +def build_area_household_indices( + dataset, + area_type: str, + area_codes: Sequence[str], +) -> dict[str, np.ndarray]: + """Map each area code to assigned household row indices.""" + assignments = build_long_geography_frame( + dataset=dataset, + area_types=area_type, + area_codes=area_codes, + ) + groups = assignments.groupby("area_code", sort=False)["household_index"] + grouped_indices = {code: group.to_numpy(dtype=np.int64) for code, group in groups} + return { + _normalize_area_code(code): grouped_indices.get( + _normalize_area_code(code), + np.array([], dtype=np.int64), + ) + for code in area_codes + } + + +def _resolve_storage_path(path: str | Path) -> Path: + path = Path(path) + if path.is_absolute(): + return path + return STORAGE_FOLDER / path + + +def _load_weight_array( + weight_file: str | Path, + dataset_key: str | None = None, + *, + area_type: str = "area", + max_dense_cells: int | None = 10_000_000, +) -> np.ndarray: + if dataset_key is None: + dataset_key = default_weight_dataset_key() + + weight_path = _resolve_storage_path(weight_file) + with h5py.File(weight_path, "r") as f: + if dataset_key not in f: + available = ", ".join(sorted(f.keys())) + raise KeyError( + f"Dataset key {dataset_key!r} not found in {weight_path}; " + f"available keys: {available}" + ) + dataset = f[dataset_key] + if ( + max_dense_cells is not None + and len(dataset.shape) == 2 + and int(np.prod(dataset.shape)) > max_dense_cells + ): + raise ValueError( + f"Refusing to expand {int(np.prod(dataset.shape)):,} dense " + f"{area_type} area-household cells into a long CSV. Use assigned " + "geography rows, a 1D L0 weight vector, or pass a higher " + "max_dense_cells for an explicit diagnostic conversion." + ) + return dataset[:] + + +def write_long_geography_weights( + dataset, + weight_files: Mapping[str, str | Path] | None = None, + dataset_key: str | None = None, + output_path: str | Path = LONG_GEOGRAPHY_WEIGHTS_FILE, + area_types: Sequence[str] = AREA_TYPES, + min_weight: float = 0.0, + max_dense_cells: int | None = 10_000_000, +) -> pd.DataFrame: + """Write calibrated local geography weights as a long CSV artifact.""" + weight_files = weight_files or {} + frames = [] + for area_type in area_types: + weight_file = weight_files.get(area_type) + if weight_file is None: + weights = None + weight_source = "household_weight" + else: + weights = _load_weight_array( + weight_file, + dataset_key, + area_type=area_type, + max_dense_cells=max_dense_cells, + ) + weight_source = Path(weight_file).name + area_codes = load_area_codes(area_type) + frames.append( + build_long_geography_frame( + dataset=dataset, + area_types=area_type, + area_codes=area_codes, + weights=weights, + weight_source=weight_source, + min_weight=min_weight, + drop_zero_weights=True, + ) + ) + + long_weights = pd.concat(frames, ignore_index=True) + output_path = _resolve_storage_path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + long_weights.to_csv(output_path, index=False, compression="infer") + return long_weights + + +def load_long_geography_weights( + path: str | Path = LONG_GEOGRAPHY_WEIGHTS_FILE, +) -> pd.DataFrame: + """Load the long local geography weights artifact.""" + return pd.read_csv(_resolve_storage_path(path), dtype={"area_code": str}) diff --git a/policyengine_uk_data/calibration/publish_local_h5s.py b/policyengine_uk_data/calibration/publish_local_h5s.py index 309d10c18..2d00a7810 100644 --- a/policyengine_uk_data/calibration/publish_local_h5s.py +++ b/policyengine_uk_data/calibration/publish_local_h5s.py @@ -21,6 +21,9 @@ import pandas as pd from policyengine_uk_data.storage import STORAGE_FOLDER +from policyengine_uk_data.calibration.long_geography import ( + build_area_household_indices, +) from policyengine_uk_data.utils.calibrate import default_weight_dataset_key logger = logging.getLogger(__name__) @@ -43,19 +46,7 @@ def _get_area_household_indices( Returns: Dict mapping area code to array of household row indices. """ - geo_col = "constituency_code_oa" if area_type == "constituency" else "la_code_oa" - hh_codes = dataset.household[geo_col].values - - area_to_indices: dict[str, list[int]] = {code: [] for code in area_codes} - for j, code in enumerate(hh_codes): - code_str = str(code) - if code_str in area_to_indices: - area_to_indices[code_str].append(j) - - return { - code: np.array(indices, dtype=np.int64) - for code, indices in area_to_indices.items() - } + return build_area_household_indices(dataset, area_type, area_codes) def _extract_entity_subset( diff --git a/policyengine_uk_data/datasets/create_datasets.py b/policyengine_uk_data/datasets/create_datasets.py index bebdc5d74..3be346f37 100644 --- a/policyengine_uk_data/datasets/create_datasets.py +++ b/policyengine_uk_data/datasets/create_datasets.py @@ -103,6 +103,7 @@ def main(): "Clone and assign OA geography", "Calibrate constituency weights", "Calibrate local authority weights", + "Export long local geography weights", "Calibrate public service aggregates", "Calibrate fuel litres", "Save final dataset", @@ -279,6 +280,18 @@ def main(): ) update_dataset("Calibrate local authority weights", "completed") + update_dataset("Export long local geography weights", "processing") + from policyengine_uk_data.calibration.long_geography import ( + LONG_GEOGRAPHY_WEIGHTS_FILE, + write_long_geography_weights, + ) + + write_long_geography_weights( + dataset=frs_calibrated_constituencies, + output_path=STORAGE_FOLDER / LONG_GEOGRAPHY_WEIGHTS_FILE, + ) + update_dataset("Export long local geography weights", "completed") + frs_calibrated = frs_calibrated_constituencies if materialize_base_year: update_dataset(materialize_step, "processing") @@ -349,6 +362,7 @@ def main(): "enhanced_dataset": frs_release.enhanced_dataset_file, "tiny_base_dataset": frs_release.tiny_base_dataset_file, "tiny_enhanced_dataset": frs_release.tiny_enhanced_dataset_file, + "long_geography_weights": "local_geography_weights.csv.gz", "imputations_applied": "consumption, wealth, VAT, services, income, capital_gains, salary_sacrifice, student_loan_plan", "calibration": "national, LA and constituency targets", }, diff --git a/policyengine_uk_data/storage/download_completed_datasets.py b/policyengine_uk_data/storage/download_completed_datasets.py index 972bd1c5e..da5cd0075 100644 --- a/policyengine_uk_data/storage/download_completed_datasets.py +++ b/policyengine_uk_data/storage/download_completed_datasets.py @@ -1,4 +1,7 @@ from policyengine_uk_data.datasets.frs_release import CURRENT_FRS_RELEASE +from policyengine_uk_data.calibration.long_geography import ( + LONG_GEOGRAPHY_WEIGHTS_FILE, +) from policyengine_uk_data.utils.hf_destinations import PRIVATE_REPO from policyengine_uk_data.utils.huggingface import download from pathlib import Path @@ -10,6 +13,7 @@ CURRENT_FRS_RELEASE.base_dataset_file, "parliamentary_constituency_weights.h5", "local_authority_weights.h5", + LONG_GEOGRAPHY_WEIGHTS_FILE, ] FILES = [FOLDER / file for file in FILES] diff --git a/policyengine_uk_data/storage/upload_completed_datasets.py b/policyengine_uk_data/storage/upload_completed_datasets.py index 70520b63b..b5f5d79c0 100644 --- a/policyengine_uk_data/storage/upload_completed_datasets.py +++ b/policyengine_uk_data/storage/upload_completed_datasets.py @@ -1,4 +1,7 @@ from policyengine_uk_data.datasets.frs_release import CURRENT_FRS_RELEASE +from policyengine_uk_data.calibration.long_geography import ( + LONG_GEOGRAPHY_WEIGHTS_FILE, +) from policyengine_uk_data.storage import STORAGE_FOLDER from policyengine_uk_data.utils.data_upload import upload_data_files from policyengine_uk_data.utils.hf_destinations import PRIVATE_REPO @@ -13,6 +16,7 @@ def upload_datasets(): STORAGE_FOLDER / frs_release.tiny_enhanced_dataset_file, STORAGE_FOLDER / "parliamentary_constituency_weights.h5", STORAGE_FOLDER / "local_authority_weights.h5", + STORAGE_FOLDER / LONG_GEOGRAPHY_WEIGHTS_FILE, ] for file_path in dataset_files: diff --git a/policyengine_uk_data/tests/test_long_geography.py b/policyengine_uk_data/tests/test_long_geography.py new file mode 100644 index 000000000..2114e2545 --- /dev/null +++ b/policyengine_uk_data/tests/test_long_geography.py @@ -0,0 +1,339 @@ +from pathlib import Path + +import h5py +import numpy as np +import pandas as pd +import pytest + +from policyengine_uk_data.calibration.long_geography import ( + LONG_GEOGRAPHY_COLUMNS, + LONG_GEOGRAPHY_WEIGHTS_FILE, + area_support_from_long_geography, + build_area_household_indices, + build_long_geography_frame, + geography_support_report, + write_long_geography_weights, +) + + +class MockDataset: + def __init__(self): + self.household = pd.DataFrame( + { + "household_id": [101, 102, 103, 104], + "source_year": [2023, 2023, 2023, 2023], + "source_household_id": [1, 2, 3, 1], + "clone_index": [0, 0, 0, 1], + "household_weight": [1.0, 2.0, 3.0, 1.0], + "oa_code": [b"E00000001", "E00000002", "", "E00000003"], + "constituency_code_oa": [ + b"E14001001", + "E14001002", + "", + "E14001001", + ], + "la_code_oa": ["E09000001", "E09000001", "X99999999", None], + } + ) + + +def _write_h5(path: Path, key: str, data: np.ndarray) -> None: + with h5py.File(path, "w") as f: + f.create_dataset(key, data=data) + + +def test_build_long_geography_frame_is_oa_first_and_keeps_schema(): + dataset = MockDataset() + + frame = build_long_geography_frame( + dataset, + area_types="oa", + area_codes=["E00000001", "E00000002", "E00000003"], + ) + + assert frame.columns.tolist() == LONG_GEOGRAPHY_COLUMNS + assert frame["area_code"].tolist() == [ + "E00000001", + "E00000002", + "E00000003", + ] + assert frame["household_index"].tolist() == [0, 1, 3] + assert frame["household_id"].tolist() == [101, 102, 104] + assert frame["source_year"].tolist() == [2023, 2023, 2023] + assert frame["source_household_id"].tolist() == [1, 2, 1] + assert frame["source_household_key"].tolist() == [ + "2023:1", + "2023:2", + "2023:1", + ] + assert frame["clone_index"].tolist() == [0, 0, 1] + assert frame["weight"].tolist() == [1.0, 2.0, 1.0] + assert frame["weight_source"].unique().tolist() == ["household_weight"] + + +def test_build_long_geography_frame_applies_1d_weights_to_each_area_type(): + dataset = MockDataset() + weights = np.array([10.0, 0.0, 30.0, 40.0]) + + frame = build_long_geography_frame( + dataset, + area_types=("constituency", "la"), + area_codes={ + "constituency": ["E14001001", "E14001002"], + "la": ["E09000001"], + }, + weights=weights, + weight_source="test_weights", + drop_zero_weights=True, + ) + + assert set(frame["area_type"]) == {"constituency", "la"} + assert frame["household_index"].tolist() == [0, 3, 0] + assert frame["weight"].tolist() == [10.0, 40.0, 10.0] + assert frame["weight_source"].unique().tolist() == ["test_weights"] + + +def test_build_long_geography_frame_converts_legacy_2d_weights_to_rows(): + dataset = MockDataset() + weights = np.array( + [ + [11.0, 12.0, 13.0, 14.0], + [21.0, 22.0, 23.0, 24.0], + ] + ) + + frame = build_long_geography_frame( + dataset, + area_types="constituency", + area_codes=["E14001001", "E14001002"], + weights=weights, + ) + + assert frame["area_index"].tolist() == [0, 0, 0, 0, 1, 1, 1, 1] + assert frame["area_code"].tolist() == [ + "E14001001", + "E14001001", + "E14001001", + "E14001001", + "E14001002", + "E14001002", + "E14001002", + "E14001002", + ] + assert frame["household_index"].tolist() == [0, 1, 2, 3, 0, 1, 2, 3] + assert frame["weight"].tolist() == [ + 11.0, + 12.0, + 13.0, + 14.0, + 21.0, + 22.0, + 23.0, + 24.0, + ] + + +def test_2d_weights_can_only_build_one_area_type(): + dataset = MockDataset() + weights = np.ones((2, 4)) + + with pytest.raises(ValueError, match="one area_type"): + build_long_geography_frame( + dataset, + area_types=("constituency", "la"), + area_codes={ + "constituency": ["E14001001", "E14001002"], + "la": ["E09000001"], + }, + weights=weights, + ) + + +def test_write_long_geography_weights_refuses_large_dense_conversion( + tmp_path, + monkeypatch, +): + dataset = MockDataset() + (tmp_path / "constituencies_2024.csv").write_text( + "code,name\nE14001001,A\nE14001002,B\n" + ) + _write_h5( + tmp_path / "constituency_weights.h5", + "2025", + np.ones((2, 4)), + ) + + import policyengine_uk_data.calibration.long_geography as mod + + monkeypatch.setattr(mod, "STORAGE_FOLDER", tmp_path) + + with pytest.raises(ValueError, match="Refusing to expand"): + write_long_geography_weights( + dataset=dataset, + weight_files={"constituency": "constituency_weights.h5"}, + dataset_key="2025", + output_path=tmp_path / LONG_GEOGRAPHY_WEIGHTS_FILE, + area_types=("constituency",), + max_dense_cells=4, + ) + + +def test_build_area_household_indices_returns_empty_arrays_for_empty_areas(): + dataset = MockDataset() + + indices = build_area_household_indices( + dataset, + area_type="constituency", + area_codes=["E14001001", "E14001002", "E14001003"], + ) + + assert indices["E14001001"].tolist() == [0, 3] + assert indices["E14001002"].tolist() == [1] + assert indices["E14001003"].tolist() == [] + + +def test_build_long_geography_frame_requires_geography_column(): + dataset = MockDataset() + dataset.household = dataset.household.drop(columns=["la_code_oa"]) + + with pytest.raises(ValueError, match="la_code_oa"): + build_long_geography_frame( + dataset, + area_types="la", + area_codes=["E09000001"], + ) + + +def test_area_support_tracks_unique_source_households_and_ess(): + dataset = MockDataset() + frame = build_long_geography_frame( + dataset, + area_types="constituency", + area_codes=["E14001001", "E14001002", "E14001003"], + ) + + support = area_support_from_long_geography( + frame, + area_codes={"constituency": ["E14001001", "E14001002", "E14001003"]}, + ) + + area = support[support["area_code"] == "E14001001"].iloc[0] + assert area["n_rows"] == 2 + assert area["n_source_households"] == 1 + assert area["effective_sample_size"] == pytest.approx(2.0) + + empty_area = support[support["area_code"] == "E14001003"].iloc[0] + assert empty_area["n_rows"] == 0 + assert empty_area["effective_sample_size"] == 0 + + +def test_area_support_distinguishes_same_household_id_from_different_years(): + dataset = MockDataset() + dataset.household.loc[3, "source_year"] = 2024 + frame = build_long_geography_frame( + dataset, + area_types="constituency", + area_codes=["E14001001"], + ) + + support = area_support_from_long_geography(frame) + + assert support["n_source_households"].iloc[0] == 2 + + +def test_geography_support_report_summarizes_low_support_areas(): + dataset = MockDataset() + + _, summary = geography_support_report( + dataset, + area_types=("constituency", "la"), + area_codes={ + "constituency": ["E14001001", "E14001002", "E14001003"], + "la": ["E09000001"], + }, + min_source_households=2, + min_effective_sample_size=2, + ) + + constituency = summary[summary["area_type"] == "constituency"].iloc[0] + assert constituency["n_areas"] == 3 + assert constituency["n_nonempty_areas"] == 2 + assert constituency["low_support_areas"] == 3 + + +def test_write_long_geography_weights_combines_oa_and_derived_area_types( + tmp_path, + monkeypatch, +): + dataset = MockDataset() + pd.DataFrame({"oa_code": ["E00000001", "E00000002", "E00000003"]}).to_csv( + tmp_path / "oa_crosswalk.csv.gz", + index=False, + compression="gzip", + ) + (tmp_path / "constituencies_2024.csv").write_text( + "code,name\nE14001001,A\nE14001002,B\n" + ) + (tmp_path / "local_authorities_2021.csv").write_text("code,name\nE09000001,C\n") + _write_h5( + tmp_path / "constituency_weights.h5", + "2025", + np.array( + [ + [11.0, 12.0, 13.0, 14.0], + [21.0, 22.0, 23.0, 24.0], + ] + ), + ) + _write_h5( + tmp_path / "la_weights.h5", + "2025", + np.array([100.0, 0.0, 300.0, 400.0]), + ) + + import policyengine_uk_data.calibration.long_geography as mod + + monkeypatch.setattr(mod, "STORAGE_FOLDER", tmp_path) + + output_path = tmp_path / LONG_GEOGRAPHY_WEIGHTS_FILE + frame = write_long_geography_weights( + dataset=dataset, + weight_files={ + "constituency": "constituency_weights.h5", + "la": "la_weights.h5", + }, + dataset_key="2025", + output_path=output_path, + ) + + assert output_path.exists() + loaded = pd.read_csv(output_path) + pd.testing.assert_frame_equal(loaded, frame, check_dtype=False) + assert frame["area_type"].tolist() == [ + "oa", + "oa", + "oa", + "constituency", + "constituency", + "constituency", + "constituency", + "constituency", + "constituency", + "constituency", + "constituency", + "la", + ] + assert frame["weight"].tolist() == [ + 1.0, + 2.0, + 1.0, + 11.0, + 12.0, + 13.0, + 14.0, + 21.0, + 22.0, + 23.0, + 24.0, + 100.0, + ] diff --git a/policyengine_uk_data/tests/test_release_manifest.py b/policyengine_uk_data/tests/test_release_manifest.py index e1858bfda..2fac4c219 100644 --- a/policyengine_uk_data/tests/test_release_manifest.py +++ b/policyengine_uk_data/tests/test_release_manifest.py @@ -113,12 +113,17 @@ def test_build_release_manifest_tracks_uk_release_artifacts(tmp_path): tmp_path / "local_authority_weights.h5", weights_bytes, ) + long_weights_path = _write_file( + tmp_path / "local_geography_weights.csv.gz", + b"long-geography-weights", + ) manifest = build_release_manifest( files_with_repo_paths=[ (enhanced_path, "enhanced_frs_2023_24.h5"), (baseline_path, "frs_2023_24.h5"), (weights_path, "local_authority_weights.h5"), + (long_weights_path, "local_geography_weights.csv.gz"), ], version="1.40.4", repo_id=PRIVATE_REPO, @@ -177,6 +182,10 @@ def test_build_release_manifest_tracks_uk_release_artifacts(tmp_path): ) assert manifest["artifacts"]["frs_2023_24"]["sha256"] == _sha256(baseline_bytes) assert manifest["artifacts"]["local_authority_weights"]["kind"] == "weights" + assert manifest["artifacts"]["local_geography_weights"]["kind"] == "weights" + assert manifest["artifacts"]["local_geography_weights"]["path"] == ( + "local_geography_weights.csv.gz" + ) assert manifest["artifacts"]["enhanced_frs_2023_24"]["uri"] == ( f"hf://model/{PRIVATE_REPO}@1.40.4/enhanced_frs_2023_24.h5" ) diff --git a/policyengine_uk_data/utils/release_manifest.py b/policyengine_uk_data/utils/release_manifest.py index 442d05a70..c96d66ba0 100644 --- a/policyengine_uk_data/utils/release_manifest.py +++ b/policyengine_uk_data/utils/release_manifest.py @@ -27,12 +27,17 @@ def _compute_file_checksum(file_path: Path) -> str: def _artifact_key(path_in_repo: str) -> str: - return str(PurePosixPath(path_in_repo).with_suffix("")) + path = PurePosixPath(path_in_repo) + if path.name.endswith(".csv.gz"): + return str(path.with_suffix("").with_suffix("")) + return str(path.with_suffix("")) def _artifact_kind(path_in_repo: str) -> str: path = PurePosixPath(path_in_repo) suffix = path.suffix.lower() + if "weight" in path.stem: + return "weights" if suffix == ".h5": if "weight" in path.stem: return "weights"