Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/703.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added explicit clone-origin flags to extended/enhanced CPS datasets and saved ECPS clone diagnostics for clone weight share, modeled-only-poor share, and extreme childcare/tax checks.
22 changes: 22 additions & 0 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@

logger = logging.getLogger(__name__)

# Entity key -> name of the indicator variable that marks records
# originating from the PUF clone half of the stacked dataset.
CLONE_ORIGIN_FLAGS = {
    "person": "person_is_puf_clone",
    "tax_unit": "tax_unit_is_puf_clone",
    "spm_unit": "spm_unit_is_puf_clone",
    "family": "family_is_puf_clone",
    "household": "household_is_puf_clone",
}

PUF_SUBSAMPLE_TARGET = 20_000
PUF_TOP_PERCENTILE = 99.5

Expand Down Expand Up @@ -531,6 +539,20 @@ def _map_to_entity(pred_values, variable_name):
time_period: np.concatenate([state_fips, state_fips]).astype(np.int32)
}

# Add a clone-origin flag per entity type: zeros for the first half of the
# stacked arrays, ones for the second half (the PUF clones, per the flag
# names).  NOTE(review): assumes the stacked layout puts original records
# first — consistent with the doubled concatenation above; confirm.
for entity_key, flag_name in CLONE_ORIGIN_FLAGS.items():
    id_variable = f"{entity_key}_id"
    # Skip entity types absent from this dataset.
    if id_variable not in data:
        continue
    n_entities = len(data[id_variable][time_period])
    new_data[flag_name] = {
        time_period: np.concatenate(
            [
                np.zeros(n_entities, dtype=np.int8),
                np.ones(n_entities, dtype=np.int8),
            ]
        )
    }

if y_full:
for var in IMPUTED_VARIABLES:
if var not in data:
Expand Down
240 changes: 240 additions & 0 deletions policyengine_us_data/datasets/cps/enhanced_cps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import json
from pathlib import Path

import h5py
from policyengine_core.data import Dataset
import pandas as pd
from policyengine_us_data.utils import (
Expand Down Expand Up @@ -28,6 +32,225 @@
torch = None


def _to_numpy(value) -> np.ndarray:
return np.asarray(getattr(value, "values", value))


def _weighted_share(mask, weights) -> float:
weights = np.asarray(weights, dtype=np.float64)
total_weight = float(weights.sum())
if total_weight <= 0:
return 0.0
mask = np.asarray(mask, dtype=bool)
return 100 * float(weights[mask].sum()) / total_weight


def compute_clone_diagnostics_summary(
    *,
    household_is_puf_clone,
    household_weight,
    person_is_puf_clone,
    person_weight,
    person_in_poverty,
    person_reported_in_poverty,
    spm_unit_is_puf_clone,
    spm_unit_weight,
    spm_unit_capped_work_childcare_expenses,
    spm_unit_pre_subsidy_childcare_expenses,
    spm_unit_taxes,
    spm_unit_market_income,
) -> dict[str, float]:
    """Summarize PUF-clone composition and sanity checks as weighted % shares.

    All inputs are array-likes aligned within their entity (household,
    person, or SPM unit).  The result maps diagnostic names to percentage
    shares: how much weight the clone records carry, how much of the
    modeled-only-poor population is clone-based, and three clone-only
    plausibility checks on childcare expenses and taxes.
    """
    hh_is_clone = np.asarray(household_is_puf_clone, dtype=bool)
    hh_weight = np.asarray(household_weight, dtype=np.float64)
    prs_is_clone = np.asarray(person_is_puf_clone, dtype=bool)
    prs_weight = np.asarray(person_weight, dtype=np.float64)
    prs_poor = np.asarray(person_in_poverty, dtype=bool)
    prs_reported_poor = np.asarray(person_reported_in_poverty, dtype=bool)
    spm_is_clone = np.asarray(spm_unit_is_puf_clone, dtype=bool)
    spm_weight = np.asarray(spm_unit_weight, dtype=np.float64)
    childcare_capped = np.asarray(
        spm_unit_capped_work_childcare_expenses, dtype=np.float64
    )
    childcare_pre_subsidy = np.asarray(
        spm_unit_pre_subsidy_childcare_expenses, dtype=np.float64
    )
    taxes = np.asarray(spm_unit_taxes, dtype=np.float64)
    market_income = np.asarray(spm_unit_market_income, dtype=np.float64)

    # Poor under modeled income but not under reported income.
    modeled_only_poor = prs_poor & ~prs_reported_poor

    clone_person_weight = prs_weight[prs_is_clone].sum()
    clone_spm_weights = spm_weight[spm_is_clone]
    clone_spm_weight = clone_spm_weights.sum()

    summary = {
        "clone_household_weight_share_pct": _weighted_share(
            hh_is_clone, hh_weight
        ),
        "clone_person_weight_share_pct": _weighted_share(
            prs_is_clone, prs_weight
        ),
        "clone_poor_modeled_only_person_weight_share_pct": _weighted_share(
            prs_is_clone & modeled_only_poor, prs_weight
        ),
        "poor_modeled_only_within_clone_person_weight_share_pct": (
            _weighted_share(
                modeled_only_poor[prs_is_clone], prs_weight[prs_is_clone]
            )
            if clone_person_weight > 0
            else 0.0
        ),
    }

    # Clone-only plausibility checks, each a share of clone SPM-unit weight.
    # The "+ 1" margin presumably tolerates sub-dollar noise — confirm.
    clone_checks = {
        "clone_childcare_exceeds_pre_subsidy_share_pct": (
            childcare_capped[spm_is_clone]
            > childcare_pre_subsidy[spm_is_clone] + 1
        ),
        "clone_childcare_above_5000_share_pct": (
            childcare_capped[spm_is_clone] > 5_000
        ),
        "clone_taxes_exceed_market_income_share_pct": (
            taxes[spm_is_clone] > market_income[spm_is_clone] + 1
        ),
    }
    for check_name, check_mask in clone_checks.items():
        summary[check_name] = (
            _weighted_share(check_mask, clone_spm_weights)
            if clone_spm_weight > 0
            else 0.0
        )
    return summary


def _load_saved_period_array(
    file_path: str | Path,
    variable_name: str,
    period: int,
) -> np.ndarray:
    """Read one variable's values for *period* straight from a saved H5 file.

    Supports both storage layouts: a flat dataset (no per-period keys) and
    a group keyed by period.

    Raises:
        KeyError: if the variable is period-keyed but *period* is absent.
    """
    with h5py.File(file_path, "r") as h5_file:
        node = h5_file[variable_name]
        # Flat layout: the variable itself is the array.
        if isinstance(node, h5py.Dataset):
            return np.asarray(node[...])
        # Period-keyed group: prefer the stringified key, then the raw int.
        # NOTE(review): h5py membership tests normally expect str keys —
        # confirm the int fallback is ever exercised.
        for period_key in (str(period), period):
            if period_key in node:
                return np.asarray(node[period_key][...])
        raise KeyError(f"{variable_name} missing period {period}")


def clone_diagnostics_path(file_path: str | Path) -> Path:
return Path(file_path).with_suffix(".clone_diagnostics.json")


def build_clone_diagnostics_payload(
    period_to_diagnostics: dict[int, dict[str, float]],
) -> dict:
    """Shape per-period diagnostics into the on-disk JSON payload.

    A single period is flattened with an added ``"period"`` field; multiple
    periods are nested under ``"periods"``, keyed by stringified year in
    ascending order.

    Raises:
        ValueError: if no periods are supplied.
    """
    if not period_to_diagnostics:
        raise ValueError("Expected at least one period of clone diagnostics")

    sorted_periods = sorted(period_to_diagnostics)
    if len(sorted_periods) > 1:
        return {
            "periods": {
                str(year): period_to_diagnostics[year]
                for year in sorted_periods
            }
        }

    (only_period,) = sorted_periods
    return {**period_to_diagnostics[only_period], "period": int(only_period)}


def write_clone_diagnostics_report(file_path: str | Path, diagnostics: dict) -> Path:
    """Write *diagnostics* as sorted, indented JSON beside the dataset file."""
    report_path = clone_diagnostics_path(file_path)
    serialized = json.dumps(diagnostics, indent=2, sort_keys=True)
    report_path.write_text(serialized + "\n")
    return report_path


def refresh_clone_diagnostics_report(
    file_path: str | Path,
    diagnostics_builder,
) -> Path:
    """Delete any stale diagnostics report, then write a fresh one.

    The old file is removed before *diagnostics_builder* runs, so a builder
    failure cannot leave an outdated report on disk.
    """
    clone_diagnostics_path(file_path).unlink(missing_ok=True)
    return write_clone_diagnostics_report(file_path, diagnostics_builder())


def save_clone_diagnostics_report(
    dataset_cls: Type[Dataset],
    *,
    start_year: int,
    end_year: int,
) -> tuple[Path, dict]:
    """Build, write, and reload the clone diagnostics report for a dataset.

    Diagnostics cover every year from *start_year* through *end_year*
    inclusive; the report is written next to the dataset file and read back
    so the caller receives exactly what landed on disk.
    """

    def _build_payload() -> dict:
        per_period = {
            year: build_clone_diagnostics_for_saved_dataset(dataset_cls, year)
            for year in range(start_year, end_year + 1)
        }
        return build_clone_diagnostics_payload(per_period)

    output_path = refresh_clone_diagnostics_report(
        dataset_cls.file_path, _build_payload
    )
    return output_path, json.loads(output_path.read_text())


def build_clone_diagnostics_for_saved_dataset(
    dataset_cls: Type[Dataset], period: int
) -> dict[str, float]:
    """Compute clone diagnostics for one *period* of a saved dataset.

    Clone-origin flags are read directly from the saved H5 file, while
    weights and derived amounts come from a fresh Microsimulation run.
    """
    from policyengine_us import Microsimulation

    simulation = Microsimulation(dataset=dataset_cls)
    saved_file = Path(dataset_cls.file_path)

    def calc(variable_name: str, **kwargs) -> np.ndarray:
        # Simulate this period's values and strip any pandas wrapper.
        return _to_numpy(
            simulation.calculate(variable_name, period=period, **kwargs)
        )

    def saved(variable_name: str) -> np.ndarray:
        return _load_saved_period_array(saved_file, variable_name, period)

    # Reported poverty: reported SPM net income below the SPM threshold,
    # mapped down to persons.
    reported_poor = calc(
        "spm_unit_net_income_reported", map_to="person"
    ) < calc("spm_unit_spm_threshold", map_to="person")

    return compute_clone_diagnostics_summary(
        household_is_puf_clone=saved("household_is_puf_clone"),
        household_weight=calc("household_weight"),
        person_is_puf_clone=saved("person_is_puf_clone"),
        person_weight=calc("person_weight"),
        person_in_poverty=calc("person_in_poverty"),
        person_reported_in_poverty=reported_poor,
        spm_unit_is_puf_clone=saved("spm_unit_is_puf_clone"),
        spm_unit_weight=calc("spm_unit_weight"),
        spm_unit_capped_work_childcare_expenses=calc(
            "spm_unit_capped_work_childcare_expenses"
        ),
        spm_unit_pre_subsidy_childcare_expenses=calc(
            "spm_unit_pre_subsidy_childcare_expenses"
        ),
        spm_unit_taxes=calc("spm_unit_taxes"),
        spm_unit_market_income=calc("spm_unit_market_income"),
    )


def _get_period_array(period_values: dict, period: int) -> np.ndarray:
"""Get a period array from a TIME_PERIOD_ARRAYS variable dict."""
value = period_values.get(period)
Expand Down Expand Up @@ -313,6 +536,23 @@ def generate(self):
logging.info("Post-generation weight validation passed")

self.save_dataset(data)
try:
output_path, diagnostics_payload = save_clone_diagnostics_report(
type(self),
start_year=self.start_year,
end_year=self.end_year,
)
logging.info("Saved clone diagnostics to %s", output_path)
logging.info(
"Clone diagnostics summary: %s",
diagnostics_payload,
)
except Exception:
logging.warning(
"Unable to compute clone diagnostics for %s",
self.file_path,
exc_info=True,
)


class ReweightedCPS_2024(Dataset):
Expand Down
Loading
Loading