diff --git a/changelog.d/702.fixed b/changelog.d/702.fixed new file mode 100644 index 000000000..6e2bafc59 --- /dev/null +++ b/changelog.d/702.fixed @@ -0,0 +1 @@ +Aligned clone-half enhanced CPS generation with the modeled SPM pipeline by rebuilding cloned SPM thresholds deterministically and keeping zero-weight synthetic households near zero in sparse reweighting priors. diff --git a/policyengine_us_data/calibration/calibration_utils.py b/policyengine_us_data/calibration/calibration_utils.py index 8af1bab7a..a4c489bab 100644 --- a/policyengine_us_data/calibration/calibration_utils.py +++ b/policyengine_us_data/calibration/calibration_utils.py @@ -7,10 +7,12 @@ import numpy as np import pandas as pd -from spm_calculator import SPMCalculator, spm_equivalence_scale -from spm_calculator.geoadj import calculate_geoadj_from_rent - -from policyengine_us_data.utils.spm import TENURE_CODE_MAP +from policyengine_us_data.utils.spm import ( + TENURE_CODE_MAP, + calculate_geoadj_from_rent, + get_spm_reference_thresholds, + spm_equivalence_scale, +) from policyengine_us.variables.household.demographic.geographic.state_name import ( StateName, ) @@ -491,6 +493,7 @@ def get_cd_index_mapping(db_uri: str = None): tuple: (cd_to_index dict, index_to_cd dict, cds_ordered list) """ from sqlalchemy import create_engine, text + from pathlib import Path from policyengine_us_data.storage import STORAGE_FOLDER if db_uri is None: @@ -531,7 +534,7 @@ def load_geo_labels(path) -> List[str]: def load_cd_geoadj_values( cds_to_calibrate: List[str], -) -> Dict[str, float]: +) -> Dict[str, Dict[str, float]]: """ Load geographic adjustment factors from rent data CSV. Uses median 2BR rent by CD vs national median to compute geoadj. @@ -550,11 +553,18 @@ def load_cd_geoadj_values( # Build lookup from rent data rent_lookup = {} for _, row in rent_df.iterrows(): - geoadj = calculate_geoadj_from_rent( - local_rent=row["median_2br_rent"], - national_rent=row["national_median_2br_rent"], - ) - rent_lookup[row["cd_geoid"]] = geoadj + rent_lookup[row["cd_geoid"]] = { + tenure: calculate_geoadj_from_rent( + local_rent=row["median_2br_rent"], + national_rent=row["national_median_2br_rent"], + tenure=tenure, + ) + for tenure in ( + "owner_with_mortgage", + "owner_without_mortgage", + "renter", + ) + } geoadj_dict = {} for cd in cds_to_calibrate: @@ -562,7 +572,11 @@ def load_cd_geoadj_values( geoadj_dict[cd] = rent_lookup[cd] else: print(f"Warning: No rent data for CD {cd}, using geoadj=1.0") - geoadj_dict[cd] = 1.0 + geoadj_dict[cd] = { + "owner_with_mortgage": 1.0, + "owner_without_mortgage": 1.0, + "renter": 1.0, + } return geoadj_dict @@ -593,6 +607,11 @@ def calculate_spm_thresholds_vectorized( Returns: Float32 array of SPM thresholds, one per SPM unit. """ + person_ages = np.asarray(person_ages) + person_spm_unit_ids = np.asarray(person_spm_unit_ids) + spm_unit_tenure_types = np.asarray(spm_unit_tenure_types) + spm_unit_geoadj = np.asarray(spm_unit_geoadj, dtype=np.float64) + n_units = len(spm_unit_tenure_types) # Count adults and children per SPM unit @@ -614,8 +633,7 @@ def calculate_spm_thresholds_vectorized( tenure_codes[mask] = code # Look up base thresholds - calc = SPMCalculator(year=year) - base_thresholds = calc.get_base_thresholds() + base_thresholds = get_spm_reference_thresholds(year) thresholds = np.zeros(n_units, dtype=np.float32) for i in range(n_units): diff --git a/policyengine_us_data/calibration/publish_local_area.py b/policyengine_us_data/calibration/publish_local_area.py index 68ce32cee..0429c933f 100644 --- a/policyengine_us_data/calibration/publish_local_area.py +++ b/policyengine_us_data/calibration/publish_local_area.py @@ -8,11 +8,6 @@ python publish_local_area.py [--skip-download] [--states-only] [--upload] """ -import hashlib -import json -import shutil - - import numpy as np from pathlib import Path from typing import List @@ -31,6 +26,9 @@ from policyengine_us_data.calibration.block_assignment import ( derive_geography_from_blocks, ) +from policyengine_us_data.calibration.county_assignment import ( + get_county_filter_probability, +) from policyengine_us_data.calibration.clone_and_assign import ( GeographyAssignment, assign_random_geography, @@ -45,76 +43,28 @@ CHECKPOINT_FILE_CITIES = Path("completed_cities.txt") WORK_DIR = Path("local_area_build") -NYC_COUNTY_FIPS = {"36005", "36047", "36061", "36081", "36085"} - - -META_FILE = WORK_DIR / "checkpoint_meta.json" - - -def compute_input_fingerprint( - weights_path: Path, dataset_path: Path, n_clones: int, seed: int -) -> str: - h = hashlib.sha256() - for p in [weights_path, dataset_path]: - with open(p, "rb") as f: - while chunk := f.read(8192): - h.update(chunk) - h.update(f"{n_clones}:{seed}".encode()) - return h.hexdigest()[:16] - - -def validate_or_clear_checkpoints(fingerprint: str): - if META_FILE.exists(): - stored = json.loads(META_FILE.read_text()) - if stored.get("fingerprint") == fingerprint: - print(f"Inputs unchanged ({fingerprint}), resuming...") - return - print( - f"Inputs changed " - f"({stored.get('fingerprint')} -> {fingerprint}), " - f"clearing..." - ) - else: - print(f"No checkpoint metadata, starting fresh ({fingerprint})") - h5_count = sum( - 1 - for subdir in ["states", "districts", "cities"] - if (WORK_DIR / subdir).exists() - for _ in (WORK_DIR / subdir).rglob("*.h5") - ) - if h5_count > 0: - print( - f"WARNING: {h5_count} H5 files exist. " - f"Clearing only checkpoint files, preserving H5s." - ) - for cp in [ - CHECKPOINT_FILE, - CHECKPOINT_FILE_DISTRICTS, - CHECKPOINT_FILE_CITIES, - ]: - if cp.exists(): - cp.unlink() - else: - for cp in [ - CHECKPOINT_FILE, - CHECKPOINT_FILE_DISTRICTS, - CHECKPOINT_FILE_CITIES, - ]: - if cp.exists(): - cp.unlink() - for subdir in ["states", "districts", "cities"]: - d = WORK_DIR / subdir - if d.exists(): - shutil.rmtree(d) - META_FILE.parent.mkdir(parents=True, exist_ok=True) - META_FILE.write_text(json.dumps({"fingerprint": fingerprint})) - - -SUB_ENTITIES = [ - "tax_unit", - "spm_unit", - "family", - "marital_unit", +NYC_COUNTIES = { + "QUEENS_COUNTY_NY", + "BRONX_COUNTY_NY", + "RICHMOND_COUNTY_NY", + "NEW_YORK_COUNTY_NY", + "KINGS_COUNTY_NY", +} + +NYC_CDS = [ + "3603", + "3605", + "3606", + "3607", + "3608", + "3609", + "3610", + "3611", + "3612", + "3613", + "3614", + "3615", + "3616", ] @@ -163,7 +113,7 @@ def build_h5( dataset_path: Path, output_path: Path, cd_subset: List[str] = None, - county_fips_filter: set = None, + county_filter: set = None, takeup_filter: List[str] = None, ) -> Path: """Build an H5 file by cloning records for each nonzero weight. @@ -174,8 +124,8 @@ def build_h5( dataset_path: Path to base dataset H5 file. output_path: Where to write the output H5 file. cd_subset: If provided, only include clones for these CDs. - county_fips_filter: If provided, zero out weights for clones - whose county FIPS is not in this set. + county_filter: If provided, scale weights by P(target|CD) + for city datasets. takeup_filter: List of takeup vars to apply. Returns: @@ -216,11 +166,17 @@ def build_h5( cd_mask = np.vectorize(lambda cd: cd in cd_subset_set)(clone_cds_matrix) W[~cd_mask] = 0 - # County FIPS filtering: zero out clones not in target counties - if county_fips_filter is not None: - fips_array = np.asarray(geography.county_fips).reshape(n_clones_total, n_hh) - fips_mask = np.isin(fips_array, list(county_fips_filter)) - W[~fips_mask] = 0 + # County filtering: scale weights by P(target_counties | CD) + if county_filter is not None: + unique_cds = np.unique(clone_cds_matrix) + cd_prob = { + cd: get_county_filter_probability(cd, county_filter) for cd in unique_cds + } + p_matrix = np.vectorize( + cd_prob.__getitem__, + otypes=[float], + )(clone_cds_matrix) + W *= p_matrix label = ( f"CD subset {cd_subset}" @@ -237,7 +193,7 @@ def build_h5( if n_clones == 0: raise ValueError( f"No active clones after filtering. " - f"cd_subset={cd_subset}, county_fips_filter={county_fips_filter}" + f"cd_subset={cd_subset}, county_filter={county_filter}" ) clone_weights = W[active_geo, active_hh] active_blocks = blocks.reshape(n_clones_total, n_hh)[active_geo, active_hh] @@ -258,6 +214,12 @@ def build_h5( for p_idx, p_hh_id in enumerate(person_hh_ids): hh_to_persons[hh_id_to_idx[int(p_hh_id)]].append(p_idx) + SUB_ENTITIES = [ + "tax_unit", + "spm_unit", + "family", + "marital_unit", + ] hh_to_entity = {} entity_id_arrays = {} person_entity_id_arrays = {} @@ -351,6 +313,22 @@ def build_h5( unique_geo = derive_geography_from_blocks(unique_blocks) clone_geo = {k: v[block_inv] for k, v in unique_geo.items()} + # === Calculate weights for all entity levels === + person_weights = np.repeat(clone_weights, persons_per_clone) + per_person_wt = clone_weights / np.maximum(persons_per_clone, 1) + + entity_weights = {} + for ek in SUB_ENTITIES: + n_ents = len(entity_clone_idx[ek]) + ent_person_counts = np.zeros(n_ents, dtype=np.int32) + np.add.at( + ent_person_counts, + new_person_entity_ids[ek], + 1, + ) + clone_ids_e = np.repeat(np.arange(n_clones), entities_per_clone[ek]) + entity_weights[ek] = per_person_wt[clone_ids_e] * ent_person_counts + # === Determine variables to save === vars_to_save = set(sim.input_variables) vars_to_save.add("county") @@ -400,6 +378,7 @@ def build_h5( for period in periods: values = holder.get_array(period) + # Convert Arrow-backed arrays to numpy before indexing if hasattr(values, "_pa_array") or hasattr(values, "_ndarray"): values = np.asarray(values) @@ -436,12 +415,16 @@ def build_h5( } # === Override weights === - # Only write household_weight; sub-entity weights (tax_unit_weight, - # spm_unit_weight, person_weight, etc.) are formula variables in - # policyengine-us that derive from household_weight at runtime. data["household_weight"] = { time_period: clone_weights.astype(np.float32), } + data["person_weight"] = { + time_period: person_weights.astype(np.float32), + } + for ek in SUB_ENTITIES: + data[f"{ek}_weight"] = { + time_period: entity_weights[ek].astype(np.float32), + } # === Override geography === data["state_fips"] = { @@ -487,19 +470,11 @@ def build_h5( print("Recalculating SPM thresholds...") unique_cds_list = sorted(set(active_clone_cds)) cd_geoadj_values = load_cd_geoadj_values(unique_cds_list) - # Build per-SPM-unit geoadj from clone's CD - spm_clone_ids = np.repeat( - np.arange(n_clones, dtype=np.int64), - entities_per_clone["spm_unit"], - ) - spm_unit_geoadj = np.array( - [cd_geoadj_values[active_clone_cds[c]] for c in spm_clone_ids], - dtype=np.float64, - ) - # Get cloned person ages and SPM tenure types + # Get cloned person ages and SPM unit IDs person_ages = sim.calculate("age", map_to="person").values[person_clone_idx] + # Get cloned tenure types spm_tenure_holder = sim.get_holder("spm_unit_tenure_type") spm_tenure_periods = spm_tenure_holder.get_known_periods() if spm_tenure_periods: @@ -516,6 +491,21 @@ def build_h5( dtype="S30", ) + # Build per-SPM-unit geoadj from the clone's CD and tenure. + spm_clone_ids = np.repeat( + np.arange(n_clones, dtype=np.int64), + entities_per_clone["spm_unit"], + ) + spm_unit_geoadj = np.array( + [ + cd_geoadj_values[active_clone_cds[clone_id]][ + (spm_tenure_cloned[spm_unit_index].decode().lower()) + ] + for spm_unit_index, clone_id in enumerate(spm_clone_ids) + ], + dtype=np.float64, + ) + new_spm_thresholds = calculate_spm_thresholds_vectorized( person_ages=person_ages, person_spm_unit_ids=new_person_entity_ids["spm_unit"], @@ -754,6 +744,8 @@ def build_cities( """Build city H5 files with checkpointing, optionally uploading.""" w = np.load(weights_path) + all_cds = sorted(set(geography.cd_geoid.astype(str))) + cities_dir = output_dir / "cities" cities_dir.mkdir(parents=True, exist_ok=True) @@ -763,29 +755,34 @@ def build_cities( if "NYC" in completed_cities: print("Skipping NYC (already completed)") else: - output_path = cities_dir / "NYC.h5" - - try: - build_h5( - weights=w, - geography=geography, - dataset_path=dataset_path, - output_path=output_path, - county_fips_filter=NYC_COUNTY_FIPS, - takeup_filter=takeup_filter, - ) - - if upload: - print("Uploading NYC.h5 to GCP...") - upload_local_area_file(str(output_path), "cities", skip_hf=True) - hf_queue.append((str(output_path), "cities")) - - record_completed_city("NYC") - print("Completed NYC") - - except Exception as e: - print(f"ERROR building NYC: {e}") - raise + cd_subset = [cd for cd in all_cds if cd in NYC_CDS] + if not cd_subset: + print("No NYC-related CDs found, skipping") + else: + output_path = cities_dir / "NYC.h5" + + try: + build_h5( + weights=w, + geography=geography, + dataset_path=dataset_path, + output_path=output_path, + cd_subset=cd_subset, + county_filter=NYC_COUNTIES, + takeup_filter=takeup_filter, + ) + + if upload: + print("Uploading NYC.h5 to GCP...") + upload_local_area_file(str(output_path), "cities", skip_hf=True) + hf_queue.append((str(output_path), "cities")) + + record_completed_city("NYC") + print("Completed NYC") + + except Exception as e: + print(f"ERROR building NYC: {e}") + raise if upload and hf_queue: print(f"\nUploading batch of {len(hf_queue)} city files to HuggingFace...") @@ -880,19 +877,9 @@ def main(): print(f"Using dataset: {inputs['dataset']}") - print("Computing input fingerprint...") - fingerprint = compute_input_fingerprint( - inputs["weights"], - inputs["dataset"], - args.n_clones, - args.seed, - ) - validate_or_clear_checkpoints(fingerprint) - - print("Loading base simulation to get household count...") - _sim = Microsimulation(dataset=str(inputs["dataset"])) - n_hh = len(_sim.calculate("household_id", map_to="household").values) - del _sim + sim = Microsimulation(dataset=str(inputs["dataset"])) + n_hh = sim.calculate("household_id", map_to="household").shape[0] + del sim print(f"\nBase dataset has {n_hh:,} households") geo_cache = WORK_DIR / f"geography_{n_hh}x{args.n_clones}_s{args.seed}.npz" diff --git a/policyengine_us_data/datasets/cps/enhanced_cps.py b/policyengine_us_data/datasets/cps/enhanced_cps.py index 3d691ff4f..085245cc4 100644 --- a/policyengine_us_data/datasets/cps/enhanced_cps.py +++ b/policyengine_us_data/datasets/cps/enhanced_cps.py @@ -28,6 +28,37 @@ torch = None +def initialize_weight_priors( + original_weights: np.ndarray, + seed: int = 1456, + epsilon: float = 1e-6, +) -> np.ndarray: + """Build deterministic positive priors for sparse reweighting. + + Original CPS households should keep their survey weights unchanged. + Clone-half households start with zero weight on purpose, so they + should receive only a tiny positive epsilon to keep the log + optimization well-defined without giving them a meaningful head start. + """ + + weights = np.asarray(original_weights, dtype=np.float64) + if np.any(weights < 0): + raise ValueError("original_weights must be non-negative") + + rng = np.random.default_rng(seed) + priors = np.empty_like(weights, dtype=np.float64) + + positive_mask = weights > 0 + if positive_mask.any(): + priors[positive_mask] = weights[positive_mask] + + zero_mask = ~positive_mask + if zero_mask.any(): + priors[zero_mask] = epsilon * rng.uniform(1.0, 2.0, size=zero_mask.sum()) + + return priors + + def _get_period_array(period_values: dict, period: int) -> np.ndarray: """Get a period array from a TIME_PERIOD_ARRAYS variable dict.""" value = period_values.get(period) @@ -221,9 +252,9 @@ def generate(self): data = sim.dataset.load_dataset() base_year = int(sim.default_calculation_period) data["household_weight"] = {} - original_weights = sim.calculate("household_weight") - original_weights = original_weights.values + np.random.normal( - 1, 0.1, len(original_weights) + original_weights = initialize_weight_priors( + sim.calculate("household_weight").values, + seed=1456, ) bad_targets = [ @@ -357,9 +388,9 @@ def generate(self): sim = Microsimulation(dataset=self.input_dataset) data = sim.dataset.load_dataset() - original_weights = sim.calculate("household_weight") - original_weights = original_weights.values + np.random.normal( - 1, 0.1, len(original_weights) + original_weights = initialize_weight_priors( + sim.calculate("household_weight").values, + seed=1456, ) for year in [2024]: loss_matrix, targets_array = build_loss_matrix(self.input_dataset, year) diff --git a/policyengine_us_data/datasets/cps/extended_cps.py b/policyengine_us_data/datasets/cps/extended_cps.py index c840f29af..f753a5b85 100644 --- a/policyengine_us_data/datasets/cps/extended_cps.py +++ b/policyengine_us_data/datasets/cps/extended_cps.py @@ -18,6 +18,10 @@ convert_mortgage_interest_to_structural_inputs, impute_tax_unit_mortgage_balance_hints, ) +from policyengine_us_data.utils.spm import ( + get_spm_reference_thresholds, + spm_equivalence_scale, +) from policyengine_us_data.utils.policyengine import has_policyengine_us_variables from policyengine_us_data.utils.retirement_limits import ( get_retirement_limits, @@ -76,7 +80,6 @@ def _supports_structural_mortgage_inputs() -> bool: "spm_unit_payroll_tax_reported", "spm_unit_federal_tax_reported", "spm_unit_state_tax_reported", - "spm_unit_spm_threshold", "spm_unit_net_income_reported", "spm_unit_pre_subsidy_childcare_expenses", # Medical expenses @@ -324,6 +327,25 @@ def reconcile_ss_subcomponents(predictions, total_ss): "social_security_survivors", } +_SPM_TENURE_TO_REFERENCE_KEY = { + b"OWNER_WITH_MORTGAGE": "owner_with_mortgage", + b"OWNER_WITHOUT_MORTGAGE": "owner_without_mortgage", + b"RENTER": "renter", + "OWNER_WITH_MORTGAGE": "owner_with_mortgage", + "OWNER_WITHOUT_MORTGAGE": "owner_without_mortgage", + "RENTER": "renter", +} + + +def _reference_threshold_key_for_tenure(tenure_type) -> str: + reference_key = _SPM_TENURE_TO_REFERENCE_KEY.get(tenure_type) + if reference_key is None: + raise ValueError( + "Unsupported spm_unit_tenure_type for cloned threshold rebuild: " + f"{tenure_type!r}" + ) + return reference_key + def derive_clone_capped_childcare_expenses( donor_pre_subsidy: np.ndarray, @@ -487,6 +509,75 @@ def _apply_post_processing(predictions, X_test, time_period, data): return predictions +def _index_into_spm_units( + person_spm_unit_ids: np.ndarray, + spm_unit_ids: np.ndarray, +) -> np.ndarray: + if np.all(spm_unit_ids[:-1] <= spm_unit_ids[1:]): + indices = np.searchsorted(spm_unit_ids, person_spm_unit_ids) + valid = (indices >= 0) & (indices < len(spm_unit_ids)) + if valid.all() and np.array_equal(spm_unit_ids[indices], person_spm_unit_ids): + return indices + + indexer = pd.Index(spm_unit_ids).get_indexer(person_spm_unit_ids) + if (indexer < 0).any(): + raise ValueError("person_spm_unit_id contains values missing from spm_unit_id") + return indexer + + +def rebuild_cloned_spm_thresholds(data: dict, time_period: int) -> np.ndarray: + """Rebuild cloned SPM thresholds from donor-half geography. + + The clone half should keep the donor household's geography but use the + current threshold formula rather than a learned QRF output. We infer the + donor-half geographic adjustment from the stored first-half thresholds and + apply that same geography to the corresponding clone-half SPM units. + """ + + person_ages = np.asarray(data["age"][time_period]) + person_spm_unit_ids = np.asarray(data["person_spm_unit_id"][time_period]) + spm_unit_ids = np.asarray(data["spm_unit_id"][time_period]) + tenure_types = np.asarray(data["spm_unit_tenure_type"][time_period]) + stored_thresholds = np.asarray( + data["spm_unit_spm_threshold"][time_period], dtype=float + ) + + n_units = len(spm_unit_ids) + if n_units % 2 != 0: + raise ValueError( + "Expected cloned SPM units to have an even-length spm_unit_id array" + ) + + unit_indices = _index_into_spm_units(person_spm_unit_ids, spm_unit_ids) + is_adult = person_ages >= 18 + adult_counts = np.zeros(n_units, dtype=np.int32) + child_counts = np.zeros(n_units, dtype=np.int32) + np.add.at(adult_counts, unit_indices, is_adult.astype(np.int32)) + np.add.at(child_counts, unit_indices, (~is_adult).astype(np.int32)) + + thresholds_by_tenure = get_spm_reference_thresholds(time_period) + reference_base = np.array( + [ + thresholds_by_tenure[_reference_threshold_key_for_tenure(tenure)] + for tenure in tenure_types + ], + dtype=float, + ) + equivalence_scale = spm_equivalence_scale(adult_counts, child_counts) + + n_half = n_units // 2 + donor_denominator = reference_base[:n_half] * equivalence_scale[:n_half] + donor_geoadj = np.divide( + stored_thresholds[:n_half], + donor_denominator, + out=np.ones(n_half, dtype=float), + where=donor_denominator > 0, + ) + geoadj = np.concatenate([donor_geoadj, donor_geoadj]) + rebuilt = reference_base * equivalence_scale * geoadj + return rebuilt.astype(np.float32) + + def _splice_cps_only_predictions( data: dict, predictions: pd.DataFrame, @@ -625,6 +716,14 @@ def generate(self): time_period=self.time_period, dataset_path=str(self.cps.file_path), ) + if "spm_unit_spm_threshold" in new_data: + logger.info("Rebuilding cloned SPM thresholds from donor-half geography") + new_data["spm_unit_spm_threshold"] = { + self.time_period: rebuild_cloned_spm_thresholds( + new_data, + self.time_period, + ) + } new_data = self._rename_imputed_to_inputs(new_data) if _supports_structural_mortgage_inputs(): diff --git a/policyengine_us_data/db/etl_national_targets.py b/policyengine_us_data/db/etl_national_targets.py index 7f98dcd86..0521521a4 100644 --- a/policyengine_us_data/db/etl_national_targets.py +++ b/policyengine_us_data/db/etl_national_targets.py @@ -12,6 +12,9 @@ from policyengine_us_data.storage.calibration_targets.soi_metadata import ( RETIREMENT_CONTRIBUTION_TARGETS, ) +from policyengine_us_data.utils.census_spm import ( + build_census_spm_capped_housing_subsidy_target, +) from policyengine_us_data.utils.db import ( DEFAULT_YEAR, etl_argparser, @@ -185,13 +188,7 @@ def extract_national_targets(year: int = DEFAULT_YEAR): "notes": "Work and childcare expenses for SPM", "year": HARDCODED_YEAR, }, - { - "variable": "spm_unit_capped_housing_subsidy", - "value": 35e9, - "source": "HUD/Census", - "notes": "Housing subsidies", - "year": HARDCODED_YEAR, - }, + build_census_spm_capped_housing_subsidy_target(HARDCODED_YEAR), { "variable": "tanf", "value": 9e9, diff --git a/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py b/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py index 3199a56a2..0503c7b59 100644 --- a/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py +++ b/policyengine_us_data/storage/calibration_targets/pull_hardcoded_targets.py @@ -1,52 +1,67 @@ import pandas as pd import numpy as np from policyengine_us_data.storage import CALIBRATION_FOLDER +from policyengine_us_data.utils.census_spm import ( + get_census_spm_capped_housing_subsidy_total, +) """ -Hardcoded targets for the year 2024 from CPS-derived statistics and other sources. Include medical expenses, sum of SPM thresholds, and child support expenses. +Hardcoded targets for the year 2024 from CPS-derived statistics and other +sources. Housing uses the Census SPM capped subsidy concept, not HUD spending. """ -HARD_CODED_TOTALS = { - "health_insurance_premiums_without_medicare_part_b": 385e9, - "other_medical_expenses": 278e9, - "medicare_part_b_premiums": 112e9, - "over_the_counter_health_expenses": 72e9, - "spm_unit_spm_threshold": 3_945e9, - "child_support_expense": 33e9, - "child_support_received": 33e9, - "spm_unit_capped_work_childcare_expenses": 348e9, - "spm_unit_capped_housing_subsidy": 35e9, - "tanf": 9e9, - # Alimony could be targeted via SOI - "alimony_income": 13e9, - "alimony_expense": 13e9, - # Rough estimate, not CPS derived - "real_estate_taxes": 500e9, # Rough estimate between 350bn and 600bn total property tax collections - "rent": 735e9, # ACS total uprated by CPI - # Table 5A from https://www.irs.gov/statistics/soi-tax-stats-individual-information-return-form-w2-statistics - # shows $38,316,190,000 in Box 7: Social security tips (2018) - # Wages and salaries grew 32% from 2018 to 2023: https://fred.stlouisfed.org/graph/?g=1J0CC - # Assume 40% through 2024 - "tip_income": 38e9 * 1.4, -} + +def get_hard_coded_totals( + year: int = 2024, + storage_folder=None, +) -> dict[str, float]: + return { + "health_insurance_premiums_without_medicare_part_b": 385e9, + "other_medical_expenses": 278e9, + "medicare_part_b_premiums": 112e9, + "over_the_counter_health_expenses": 72e9, + "spm_unit_spm_threshold": 3_945e9, + "child_support_expense": 33e9, + "child_support_received": 33e9, + "spm_unit_capped_work_childcare_expenses": 348e9, + "spm_unit_capped_housing_subsidy": get_census_spm_capped_housing_subsidy_total( + year, storage_folder=storage_folder + ), + "tanf": 9e9, + # Alimony could be targeted via SOI + "alimony_income": 13e9, + "alimony_expense": 13e9, + # Rough estimate, not CPS derived + "real_estate_taxes": 500e9, # Rough estimate between 350bn and 600bn total property tax collections + "rent": 735e9, # ACS total uprated by CPI + # Table 5A from https://www.irs.gov/statistics/soi-tax-stats-individual-information-return-form-w2-statistics + # shows $38,316,190,000 in Box 7: Social security tips (2018) + # Wages and salaries grew 32% from 2018 to 2023: https://fred.stlouisfed.org/graph/?g=1J0CC + # Assume 40% through 2024 + "tip_income": 38e9 * 1.4, + } -def pull_hardcoded_targets(): +def pull_hardcoded_targets(year: int = 2024, storage_folder=None): """ Returns a DataFrame with hardcoded targets for various CPS-derived statistics and other sources. """ + hard_coded_totals = get_hard_coded_totals( + year=year, + storage_folder=storage_folder, + ) data = { - "DATA_SOURCE": ["hardcoded"] * len(HARD_CODED_TOTALS), - "GEO_ID": ["0000000US"] * len(HARD_CODED_TOTALS), - "GEO_NAME": ["national"] * len(HARD_CODED_TOTALS), - "VARIABLE": list(HARD_CODED_TOTALS.keys()), - "VALUE": list(HARD_CODED_TOTALS.values()), + "DATA_SOURCE": ["hardcoded"] * len(hard_coded_totals), + "GEO_ID": ["0000000US"] * len(hard_coded_totals), + "GEO_NAME": ["national"] * len(hard_coded_totals), + "VARIABLE": list(hard_coded_totals.keys()), + "VALUE": list(hard_coded_totals.values()), "IS_COUNT": [0.0] - * len(HARD_CODED_TOTALS), # All values are monetary amounts, not counts + * len(hard_coded_totals), # All values are monetary amounts, not counts "BREAKDOWN_VARIABLE": [np.nan] - * len(HARD_CODED_TOTALS), # No breakdown variable for hardcoded targets - "LOWER_BOUND": [np.nan] * len(HARD_CODED_TOTALS), - "UPPER_BOUND": [np.nan] * len(HARD_CODED_TOTALS), + * len(hard_coded_totals), # No breakdown variable for hardcoded targets + "LOWER_BOUND": [np.nan] * len(hard_coded_totals), + "UPPER_BOUND": [np.nan] * len(hard_coded_totals), } df = pd.DataFrame(data) diff --git a/policyengine_us_data/tests/test_local_area_calibration/test_spm_thresholds.py b/policyengine_us_data/tests/test_local_area_calibration/test_spm_thresholds.py new file mode 100644 index 000000000..3e5b8bd34 --- /dev/null +++ b/policyengine_us_data/tests/test_local_area_calibration/test_spm_thresholds.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +from policyengine_us import CountryTaxBenefitSystem +from policyengine_us_data.calibration.calibration_utils import ( + calculate_spm_thresholds_vectorized, + load_cd_geoadj_values, +) + +SYSTEM = CountryTaxBenefitSystem() +CPI_U = SYSTEM.parameters.gov.bls.cpi.cpi_u + + +def test_load_cd_geoadj_values_returns_tenure_specific_lookup(monkeypatch): + rent_df = pd.DataFrame( + { + "cd_id": ["0101"], + "median_2br_rent": [1_500.0], + "national_median_2br_rent": [1_000.0], + } + ) + monkeypatch.setattr( + "policyengine_us_data.calibration.calibration_utils.pd.read_csv", + lambda *args, **kwargs: rent_df, + ) + + geoadj_lookup = load_cd_geoadj_values(["101"]) + + assert geoadj_lookup["101"]["renter"] == pytest.approx(1.2215) + assert geoadj_lookup["101"]["owner_with_mortgage"] == pytest.approx(1.217) + assert geoadj_lookup["101"]["owner_without_mortgage"] == pytest.approx(1.1615) + + +def test_calculate_spm_thresholds_vectorized_matches_policyengine_us_future_path(): + thresholds = calculate_spm_thresholds_vectorized( + person_ages=np.array([40, 35, 10, 8]), + person_spm_unit_ids=np.array([0, 0, 0, 0]), + spm_unit_tenure_types=np.array([b"RENTER"]), + spm_unit_geoadj=np.array([1.1]), + year=2027, + ) + + cpi_ratio = float(CPI_U("2027-02-01") / CPI_U("2024-02-01")) + expected = 39_430.0 * cpi_ratio * 1.1 + + assert thresholds[0] == pytest.approx(expected) diff --git a/policyengine_us_data/utils/census_spm.py b/policyengine_us_data/utils/census_spm.py new file mode 100644 index 000000000..e7a9c32b4 --- /dev/null +++ b/policyengine_us_data/utils/census_spm.py @@ -0,0 +1,65 @@ +from pathlib import Path + +import pandas as pd + +from policyengine_us_data.storage import STORAGE_FOLDER + +# These totals are derived from raw public-use Census CPS ASEC files by +# summing SPM_CAPHOUSESUB * SPM_WEIGHT / 100 at the SPM-unit level. +# They represent the Census SPM capped housing subsidy concept, not HUD +# spending or outlays. +CENSUS_SPM_CAPPED_HOUSING_SUBSIDY_TOTALS = { + 2022: 29_549_204_420.92, + 2023: 31_844_144_470.85, + 2024: 33_649_114_150.37, +} + + +def get_census_spm_capped_housing_subsidy_total( + year: int, + storage_folder: Path | str | None = None, +) -> float: + """ + Return the Census CPS ASEC total for SPM_CAPHOUSESUB. + + If a storage folder is provided, recompute directly from the local + raw Census CPS HDF file. Otherwise, use the checked-in year-specific + totals above so callers do not depend on local data files at import + time. + """ + + if storage_folder is None: + if year not in CENSUS_SPM_CAPPED_HOUSING_SUBSIDY_TOTALS: + raise ValueError( + f"No published Census SPM capped housing subsidy total for {year}." + ) + return CENSUS_SPM_CAPPED_HOUSING_SUBSIDY_TOTALS[year] + + storage_path = Path(storage_folder) / f"census_cps_{year}.h5" + if not storage_path.exists(): + raise FileNotFoundError( + f"Missing raw Census CPS file for {year}: {storage_path}" + ) + + with pd.HDFStore(storage_path, mode="r") as store: + spm_unit = store["spm_unit"][["SPM_CAPHOUSESUB", "SPM_WEIGHT"]] + + return float((spm_unit.SPM_CAPHOUSESUB * spm_unit.SPM_WEIGHT).sum() / 100) + + +def build_census_spm_capped_housing_subsidy_target( + year: int, + storage_folder: Path | str | None = None, +) -> dict: + return { + "variable": "spm_unit_capped_housing_subsidy", + "value": get_census_spm_capped_housing_subsidy_total( + year, storage_folder=storage_folder + ), + "source": "Census CPS ASEC public-use SPM_CAPHOUSESUB", + "notes": ( + "Capped SPM housing subsidy total from raw Census CPS ASEC " + "SPM_CAPHOUSESUB; not HUD spending or outlays." + ), + "year": year, + } diff --git a/policyengine_us_data/utils/hud_housing.py b/policyengine_us_data/utils/hud_housing.py new file mode 100644 index 000000000..1b7620e5d --- /dev/null +++ b/policyengine_us_data/utils/hud_housing.py @@ -0,0 +1,49 @@ +HUD_USER_HOUSING_ASSISTANCE_BENCHMARKS = { + 2022: { + "reported_households": 4_537_614, + "average_monthly_spending_per_unit": 899, + }, + 2023: { + "reported_households": 4_569_973, + "average_monthly_spending_per_unit": 989, + }, + 2024: { + "reported_households": 4_584_691, + "average_monthly_spending_per_unit": 1_067, + }, + 2025: { + "reported_households": 4_519_561, + "average_monthly_spending_per_unit": 1_135, + }, +} + + +def get_hud_user_housing_benchmark(year: int) -> dict: + if year not in HUD_USER_HOUSING_ASSISTANCE_BENCHMARKS: + raise ValueError(f"No HUD USER housing benchmark for {year}.") + + benchmark = dict(HUD_USER_HOUSING_ASSISTANCE_BENCHMARKS[year]) + benchmark["annual_spending_total"] = ( + benchmark["reported_households"] + * benchmark["average_monthly_spending_per_unit"] + * 12 + ) + return benchmark + + +def build_hud_user_housing_assistance_benchmark(year: int) -> dict: + benchmark = get_hud_user_housing_benchmark(year) + return { + "variable": "housing_assistance", + "annual_spending_total": benchmark["annual_spending_total"], + "reported_households": benchmark["reported_households"], + "average_monthly_spending_per_unit": benchmark[ + "average_monthly_spending_per_unit" + ], + "source": "HUD USER Picture of Subsidized Households", + "notes": ( + "Annual federal spending and occupied assisted households from " + "HUD USER; not Census SPM capped subsidy." + ), + "year": year, + } diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index 9a81ea9d6..fee180e3f 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -12,6 +12,9 @@ from policyengine_us_data.storage.calibration_targets.soi_metadata import ( RETIREMENT_CONTRIBUTION_TARGETS, ) +from policyengine_us_data.utils.census_spm import ( + get_census_spm_capped_housing_subsidy_total, +) from policyengine_core.reforms import Reform from policyengine_us_data.utils.soi import pe_to_soi, get_soi @@ -31,7 +34,9 @@ "child_support_expense": 33e9, "child_support_received": 33e9, "spm_unit_capped_work_childcare_expenses": 348e9, - "spm_unit_capped_housing_subsidy": 35e9, + "spm_unit_capped_housing_subsidy": get_census_spm_capped_housing_subsidy_total( + 2024 + ), "tanf": 9e9, # Alimony could be targeted via SOI "alimony_income": 13e9, diff --git a/policyengine_us_data/utils/spm.py b/policyengine_us_data/utils/spm.py index ad3c9e9fb..a2220339b 100644 --- a/policyengine_us_data/utils/spm.py +++ b/policyengine_us_data/utils/spm.py @@ -1,7 +1,69 @@ -"""SPM threshold calculation utilities using the spm-calculator package.""" +"""SPM threshold calculation utilities aligned with PolicyEngine US.""" +from functools import lru_cache import numpy as np -from spm_calculator import SPMCalculator, spm_equivalence_scale +from policyengine_us import CountryTaxBenefitSystem + +PUBLISHED_SPM_REFERENCE_THRESHOLDS = { + 2015: { + "renter": 25_155.0, + "owner_with_mortgage": 24_859.0, + "owner_without_mortgage": 20_639.0, + }, + 2016: { + "renter": 25_558.0, + "owner_with_mortgage": 25_248.0, + "owner_without_mortgage": 20_943.0, + }, + 2017: { + "renter": 26_213.0, + "owner_with_mortgage": 25_897.0, + "owner_without_mortgage": 21_527.0, + }, + 2018: { + "renter": 26_905.0, + "owner_with_mortgage": 26_565.0, + "owner_without_mortgage": 22_095.0, + }, + 2019: { + "renter": 27_515.0, + "owner_with_mortgage": 27_172.0, + "owner_without_mortgage": 22_600.0, + }, + 2020: { + "renter": 28_881.0, + "owner_with_mortgage": 28_533.0, + "owner_without_mortgage": 23_948.0, + }, + 2021: { + "renter": 31_453.0, + "owner_with_mortgage": 31_089.0, + "owner_without_mortgage": 26_022.0, + }, + 2022: { + "renter": 33_402.0, + "owner_with_mortgage": 32_949.0, + "owner_without_mortgage": 27_679.0, + }, + 2023: { + "renter": 36_606.0, + "owner_with_mortgage": 36_192.0, + "owner_without_mortgage": 30_347.0, + }, + 2024: { + "renter": 39_430.0, + "owner_with_mortgage": 39_068.0, + "owner_without_mortgage": 32_586.0, + }, +} + +LATEST_PUBLISHED_SPM_THRESHOLD_YEAR = max(PUBLISHED_SPM_REFERENCE_THRESHOLDS) +REFERENCE_RAW_SCALE = 3**0.7 +TENURE_HOUSING_SHARES = { + "owner_with_mortgage": 0.434, + "owner_without_mortgage": 0.323, + "renter": 0.443, +} TENURE_CODE_MAP = { 1: "owner_with_mortgage", @@ -10,6 +72,74 @@ } +@lru_cache(maxsize=1) +def _get_cpi_u_parameter(): + system = CountryTaxBenefitSystem() + return system.parameters.gov.bls.cpi.cpi_u + + +def spm_equivalence_scale(num_adults, num_children): + adults, children = np.broadcast_arrays( + np.asarray(num_adults, dtype=float), + np.asarray(num_children, dtype=float), + ) + + raw = np.zeros_like(adults, dtype=float) + has_people = (adults + children) > 0 + with_children = has_people & (children > 0) + + single_adult_with_children = with_children & (adults <= 1) + raw[single_adult_with_children] = ( + 1.0 + 0.8 + 0.5 * np.maximum(children[single_adult_with_children] - 1, 0) + ) ** 0.7 + + multi_adult_with_children = with_children & ~single_adult_with_children + raw[multi_adult_with_children] = ( + adults[multi_adult_with_children] + 0.5 * children[multi_adult_with_children] + ) ** 0.7 + + no_children = has_people & ~with_children + one_adult = no_children & (adults <= 1) + two_adults = no_children & (adults == 2) + larger_adult_units = no_children & (adults > 2) + + raw[one_adult] = 1.0 + raw[two_adults] = 1.41 + raw[larger_adult_units] = adults[larger_adult_units] ** 0.7 + + return raw / REFERENCE_RAW_SCALE + + +def get_spm_reference_thresholds(year: int) -> dict[str, float]: + if year in PUBLISHED_SPM_REFERENCE_THRESHOLDS: + return PUBLISHED_SPM_REFERENCE_THRESHOLDS[year].copy() + + if year < min(PUBLISHED_SPM_REFERENCE_THRESHOLDS): + raise ValueError( + f"No published SPM reference thresholds for {year}. " + f"Earliest available year is {min(PUBLISHED_SPM_REFERENCE_THRESHOLDS)}." + ) + + cpi_u = _get_cpi_u_parameter() + factor = float( + cpi_u(f"{year}-02-01") / cpi_u(f"{LATEST_PUBLISHED_SPM_THRESHOLD_YEAR}-02-01") + ) + latest_thresholds = PUBLISHED_SPM_REFERENCE_THRESHOLDS[ + LATEST_PUBLISHED_SPM_THRESHOLD_YEAR + ] + return {tenure: value * factor for tenure, value in latest_thresholds.items()} + + +def calculate_geoadj_from_rent( + local_rent, + national_rent: float, + tenure: str = "renter", +): + share = TENURE_HOUSING_SHARES[tenure] + rent_ratio = np.asarray(local_rent, dtype=float) / float(national_rent) + return rent_ratio * share + (1.0 - share) + + def calculate_spm_thresholds_with_geoadj( num_adults: np.ndarray, num_children: np.ndarray, @@ -35,8 +165,7 @@ def calculate_spm_thresholds_with_geoadj( Returns: Array of SPM threshold values. """ - calc = SPMCalculator(year=year) - base_thresholds = calc.get_base_thresholds() + base_thresholds = get_spm_reference_thresholds(year) n = len(num_adults) thresholds = np.zeros(n) diff --git a/tests/unit/test_census_spm_housing_targets.py b/tests/unit/test_census_spm_housing_targets.py new file mode 100644 index 000000000..e595848d1 --- /dev/null +++ b/tests/unit/test_census_spm_housing_targets.py @@ -0,0 +1,174 @@ +import importlib +import sys +import types + +import pandas as pd +import validation.benefit_validation as benefit_validation + +from policyengine_us_data.db import etl_national_targets +from policyengine_us_data.storage.calibration_targets.pull_hardcoded_targets import ( + pull_hardcoded_targets, +) +from policyengine_us_data.utils.census_spm import ( + build_census_spm_capped_housing_subsidy_target, + get_census_spm_capped_housing_subsidy_total, +) +from policyengine_us_data.utils.hud_housing import ( + build_hud_user_housing_assistance_benchmark, + get_hud_user_housing_benchmark, +) + + +def _write_census_cps(storage_dir, year=2024): + path = storage_dir / f"census_cps_{year}.h5" + with pd.HDFStore(path, mode="w") as store: + store["spm_unit"] = pd.DataFrame( + { + "SPM_CAPHOUSESUB": [1_000.0, 0.0, 2_000.0], + "SPM_WEIGHT": [150, 250, 100], + } + ) + return path + + +def test_get_census_spm_capped_housing_subsidy_total_uses_raw_cps_spm_values( + tmp_path, +): + _write_census_cps(tmp_path) + + total = get_census_spm_capped_housing_subsidy_total(2024, storage_folder=tmp_path) + + expected = (1_000 * 150 + 0 * 250 + 2_000 * 100) / 100 + assert total == expected + + +def test_pull_hardcoded_targets_uses_census_spm_housing_total(tmp_path): + _write_census_cps(tmp_path) + + targets = pull_hardcoded_targets(year=2024, storage_folder=tmp_path) + housing = targets.loc[ + targets.VARIABLE == "spm_unit_capped_housing_subsidy", "VALUE" + ].iat[0] + + assert housing == (1_000 * 150 + 0 * 250 + 2_000 * 100) / 100 + + +def test_extract_national_targets_uses_census_spm_housing_target(monkeypatch): + class DummyMicrosimulation: + def __init__(self, dataset): + self.default_calculation_period = 2024 + + policyengine_us = importlib.import_module("policyengine_us") + monkeypatch.setitem( + sys.modules, + "policyengine_us", + types.SimpleNamespace( + **{ + name: getattr(policyengine_us, name) + for name in dir(policyengine_us) + if not name.startswith("__") and name != "Microsimulation" + }, + Microsimulation=DummyMicrosimulation, + ), + ) + monkeypatch.setattr( + etl_national_targets, + "build_census_spm_capped_housing_subsidy_target", + lambda year, storage_folder=None: { + "variable": "spm_unit_capped_housing_subsidy", + "value": 123.0, + "source": "Census CPS ASEC public-use SPM_CAPHOUSESUB", + "notes": "Capped SPM housing subsidy total from raw Census CPS ASEC.", + "year": year, + }, + ) + + targets = etl_national_targets.extract_national_targets(2024) + housing = next( + target + for target in targets["direct_sum_targets"] + if target["variable"] == "spm_unit_capped_housing_subsidy" + ) + + assert housing["value"] == 123.0 + assert housing["source"] == "Census CPS ASEC public-use SPM_CAPHOUSESUB" + assert "Capped SPM housing subsidy" in housing["notes"] + + +def test_build_census_spm_capped_housing_subsidy_target_labels_concept(tmp_path): + _write_census_cps(tmp_path) + + target = build_census_spm_capped_housing_subsidy_target( + 2024, storage_folder=tmp_path + ) + + assert target["variable"] == "spm_unit_capped_housing_subsidy" + assert target["value"] == (1_000 * 150 + 0 * 250 + 2_000 * 100) / 100 + assert target["source"] == "Census CPS ASEC public-use SPM_CAPHOUSESUB" + assert "not HUD spending" in target["notes"] + + +def test_get_hud_user_housing_benchmark_uses_annual_spending_concept(): + benchmark = get_hud_user_housing_benchmark(2022) + + assert benchmark["reported_households"] == 4_537_614 + assert benchmark["average_monthly_spending_per_unit"] == 899 + assert benchmark["annual_spending_total"] == 48_951_779_832 + + +def test_build_hud_user_housing_assistance_benchmark_labels_admin_concept(): + benchmark = build_hud_user_housing_assistance_benchmark(2022) + + assert benchmark["variable"] == "housing_assistance" + assert benchmark["source"] == "HUD USER Picture of Subsidized Households" + assert benchmark["reported_households"] == 4_537_614 + assert benchmark["annual_spending_total"] == 48_951_779_832 + assert "not Census SPM capped subsidy" in benchmark["notes"] + + +def test_get_program_benchmarks_keeps_housing_concepts_separate(monkeypatch): + monkeypatch.setattr( + benefit_validation, + "build_census_spm_capped_housing_subsidy_target", + lambda year: { + "variable": "spm_unit_capped_housing_subsidy", + "value": 111e9, + "source": "Census benchmark", + "notes": "Census concept", + "year": year, + }, + ) + monkeypatch.setattr( + benefit_validation, + "build_hud_user_housing_assistance_benchmark", + lambda year: { + "variable": "housing_assistance", + "annual_spending_total": 222e9, + "reported_households": 3_000_000, + "average_monthly_spending_per_unit": 999, + "source": "HUD USER benchmark", + "notes": "HUD concept", + "year": year, + }, + ) + + benchmarks = benefit_validation.get_program_benchmarks(2022) + + assert ( + benchmarks["housing_spm_capped"]["variable"] + == "spm_unit_capped_housing_subsidy" + ) + assert benchmarks["housing_spm_capped"]["benchmark_total"] == 111 + assert benchmarks["housing_spm_capped"]["benchmark_source"] == "Census benchmark" + + assert benchmarks["housing_assistance_hud_user"]["variable"] == "housing_assistance" + assert benchmarks["housing_assistance_hud_user"]["benchmark_total"] == 222 + assert ( + benchmarks["housing_assistance_hud_user"]["benchmark_source"] + == "HUD USER benchmark" + ) + assert ( + benchmarks["housing_assistance_hud_user"]["benchmark_participants_millions"] + == 3 + ) + assert benchmarks["housing_assistance_hud_user"]["map_to"] == "household" diff --git a/tests/unit/test_extended_cps.py b/tests/unit/test_extended_cps.py index e32172db2..7accde701 100644 --- a/tests/unit/test_extended_cps.py +++ b/tests/unit/test_extended_cps.py @@ -11,6 +11,7 @@ import pandas as pd import pytest +from policyengine_us_data.datasets.cps.enhanced_cps import initialize_weight_priors from policyengine_us_data.calibration.puf_impute import ( IMPUTED_VARIABLES, OVERRIDDEN_IMPUTED_VARIABLES, @@ -19,6 +20,7 @@ CPS_ONLY_IMPUTED_VARIABLES, CPS_STAGE2_INCOME_PREDICTORS, apply_retirement_constraints, + rebuild_cloned_spm_thresholds, derive_clone_capped_childcare_expenses, reconcile_ss_subcomponents, ) @@ -111,6 +113,7 @@ def test_pension_income_not_in_cps_only(self): should_not_be_here = { "taxable_private_pension_income", "tax_exempt_private_pension_income", + "spm_unit_spm_threshold", } present = should_not_be_here & set(CPS_ONLY_IMPUTED_VARIABLES) assert present == set(), ( @@ -198,6 +201,81 @@ def test_falls_back_to_zero_without_parent_proxies(self): np.testing.assert_allclose(result, np.array([0.0])) +class TestDeterministicSpmThresholdRebuild: + def test_rebuild_cloned_spm_thresholds_uses_donor_geoadj(self): + data = { + "age": {2024: np.array([40, 12, 70, 40, 12, 70], dtype=np.int32)}, + "person_spm_unit_id": { + 2024: np.array([10, 10, 20, 30, 30, 40], dtype=np.int32) + }, + "spm_unit_id": {2024: np.array([10, 20, 30, 40], dtype=np.int32)}, + "spm_unit_tenure_type": { + 2024: np.array( + [ + b"RENTER", + b"OWNER_WITHOUT_MORTGAGE", + b"RENTER", + b"OWNER_WITHOUT_MORTGAGE", + ] + ) + }, + "spm_unit_spm_threshold": { + 2024: np.array( + [20_000.0, 15_000.0, 999_999.0, 999_999.0], dtype=np.float64 + ) + }, + } + + rebuilt = rebuild_cloned_spm_thresholds(data, 2024) + + np.testing.assert_allclose(rebuilt[:2], np.array([20_000.0, 15_000.0])) + np.testing.assert_allclose(rebuilt[2:], np.array([20_000.0, 15_000.0])) + + def test_rebuild_cloned_spm_thresholds_rejects_unknown_tenure(self): + data = { + "age": {2024: np.array([40, 12, 40, 12], dtype=np.int32)}, + "person_spm_unit_id": {2024: np.array([10, 10, 20, 20], dtype=np.int32)}, + "spm_unit_id": {2024: np.array([10, 20], dtype=np.int32)}, + "spm_unit_tenure_type": { + 2024: np.array([b"RENTER", b"UNRECOGNIZED"], dtype="|S12") + }, + "spm_unit_spm_threshold": { + 2024: np.array([20_000.0, 20_000.0], dtype=np.float64) + }, + } + + with pytest.raises(ValueError, match="Unsupported spm_unit_tenure_type"): + rebuild_cloned_spm_thresholds(data, 2024) + + +class TestWeightPriorInitialization: + def test_initialize_weight_priors_keeps_zero_weight_records_near_zero(self): + weights = np.array([1_500.0, 0.0, 625.0, 0.0], dtype=np.float64) + + priors = initialize_weight_priors(weights, seed=123) + + assert np.all(priors > 0) + assert priors[1] < 1e-4 + assert priors[3] < 1e-4 + assert priors[0] == pytest.approx(1_500.0) + assert priors[2] == pytest.approx(625.0) + + def test_initialize_weight_priors_preserves_positive_weights_exactly(self): + weights = np.array([1_500.0, 625.0, 42.0], dtype=np.float64) + + priors = initialize_weight_priors(weights, seed=123) + + np.testing.assert_array_equal(priors, weights) + + def test_initialize_weight_priors_is_reproducible(self): + weights = np.array([400.0, 0.0, 100.0], dtype=np.float64) + + priors_a = initialize_weight_priors(weights, seed=77) + priors_b = initialize_weight_priors(weights, seed=77) + + np.testing.assert_allclose(priors_a, priors_b) + + class TestRetirementConstraints: """Post-processing retirement constraints enforce IRS caps.""" diff --git a/validation/benefit_validation.py b/validation/benefit_validation.py index cf4689720..14360782d 100644 --- a/validation/benefit_validation.py +++ b/validation/benefit_validation.py @@ -9,41 +9,75 @@ import numpy as np from policyengine_us import Microsimulation from policyengine_us_data.datasets.cps.enhanced_cps import EnhancedCPS - - -def analyze_benefit_underreporting(): - """Analyze impact of CPS benefit underreporting on results.""" - - sim = Microsimulation(dataset=EnhancedCPS, dataset_year=2022) - - programs = { +from policyengine_us_data.utils.census_spm import ( + build_census_spm_capped_housing_subsidy_target, +) +from policyengine_us_data.utils.hud_housing import ( + build_hud_user_housing_assistance_benchmark, +) + + +def get_program_benchmarks(year: int): + housing_spm_target = build_census_spm_capped_housing_subsidy_target(year) + housing_hud_target = build_hud_user_housing_assistance_benchmark(year) + return { "snap": { "variable": "snap", - "admin_total": 114.1, # billions, from USDA + "benchmark_total": 114.1, # billions, from USDA + "benchmark_source": "USDA", "participation_rate": 0.82, # from USDA + "weight_variable": "spm_unit_weight", }, "ssi": { "variable": "ssi", - "admin_total": 56.7, # from SSA + "benchmark_total": 56.7, # from SSA + "benchmark_source": "SSA", "participation_rate": 0.93, + "weight_variable": "spm_unit_weight", }, "tanf": { "variable": "tanf", - "admin_total": 7.1, # from HHS + "benchmark_total": 7.1, # from HHS + "benchmark_source": "HHS", "participation_rate": 0.23, + "weight_variable": "spm_unit_weight", }, - "housing": { - "variable": "housing_benefit", - "admin_total": 50.3, # from HUD - "participation_rate": 0.76, + "housing_spm_capped": { + "variable": "spm_unit_capped_housing_subsidy", + "benchmark_total": housing_spm_target["value"] / 1e9, + "benchmark_source": housing_spm_target["source"], + "participation_rate": np.nan, + "weight_variable": "spm_unit_weight", + }, + "housing_assistance_hud_user": { + "variable": "housing_assistance", + "benchmark_total": housing_hud_target["annual_spending_total"] / 1e9, + "benchmark_source": housing_hud_target["source"], + "benchmark_participants_millions": ( + housing_hud_target["reported_households"] / 1e6 + ), + "participation_rate": np.nan, + "weight_variable": "household_weight", + "map_to": "household", }, } + +def analyze_benefit_underreporting(): + """Analyze impact of CPS benefit underreporting on results.""" + + year = 2022 + sim = Microsimulation(dataset=EnhancedCPS, dataset_year=year) + programs = get_program_benchmarks(year) + results = [] for program, info in programs.items(): # Calculate totals - benefit = sim.calculate(info["variable"], 2022) - weight = sim.calculate("household_weight", 2022) + benefit_kwargs = {} + if info.get("map_to"): + benefit_kwargs["map_to"] = info["map_to"] + benefit = sim.calculate(info["variable"], year, **benefit_kwargs) + weight = sim.calculate(info["weight_variable"], year) # Total benefits total = (benefit * weight).sum() / 1e9 # billions @@ -53,15 +87,24 @@ def analyze_benefit_underreporting(): weighted_participants = ((benefit > 0) * weight).sum() / 1e6 # millions # Underreporting factor - underreporting = info["admin_total"] / total if total > 0 else np.inf + benchmark_ratio = info["benchmark_total"] / total if total > 0 else np.inf + benchmark_participants = info.get("benchmark_participants_millions") + participant_ratio = ( + weighted_participants / benchmark_participants + if benchmark_participants not in (None, 0) + else np.nan + ) results.append( { "program": program, "enhanced_cps_total": total, - "admin_total": info["admin_total"], - "underreporting_factor": underreporting, + "benchmark_total": info["benchmark_total"], + "benchmark_source": info["benchmark_source"], + "benchmark_ratio": benchmark_ratio, "participants_millions": weighted_participants, + "benchmark_participants_millions": benchmark_participants, + "participant_ratio": participant_ratio, "mean_benefit": ( benefit[benefit > 0].mean() if (benefit > 0).any() else 0 ), @@ -77,11 +120,11 @@ def validate_program_interactions(): sim = Microsimulation(dataset=EnhancedCPS, dataset_year=2022) # Get program benefits - snap = sim.calculate("snap", 2022) > 0 - medicaid = sim.calculate("medicaid", 2022) > 0 - ssi = sim.calculate("ssi", 2022) > 0 - tanf = sim.calculate("tanf", 2022) > 0 - housing = sim.calculate("housing_benefit", 2022) > 0 + snap = sim.calculate("snap", 2022, map_to="household") > 0 + medicaid = sim.calculate("medicaid", 2022, map_to="household") > 0 + ssi = sim.calculate("ssi", 2022, map_to="household") > 0 + tanf = sim.calculate("tanf", 2022, map_to="household") > 0 + housing = sim.calculate("housing_assistance", 2022, map_to="household") > 0 weight = sim.calculate("household_weight", 2022)