Skip to content
Draft

idk #1049

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# affirm/group_bys/paybright/repayment_data_v1_0.py

from ai.chronon.api.ttypes import Source, EntitySource, MetaData
from ai.chronon.query import Query, select
from ai.chronon.group_by import GroupBy

# This source uses a StagingQuery to automatically handle S3 YYYY/MM/DD partitioning.
# No manual view creation needed - Chronon handles everything automatically!

from ai.chronon.staging_query import StagingQuery
from ai.chronon.utils import get_staging_query_output_table_name

# Staging query over the raw PayBright repayment parquet files.
#
# It derives a `ds` (YYYY-MM-DD) column from the YYYY/MM/DD components found in
# each input file's path and keeps only the rows whose `ds` lies inside the
# {{ start_date }}..{{ end_date }} range supplied when the query is rendered.
#
# The date filter lives in the outermost query on purpose: `ds` is an alias
# created by the middle sub-select, so it does not exist in the innermost query
# that reads the files, and a WHERE clause cannot reference an alias defined in
# its own SELECT list.
# NOTE(review): assumes the files sit under .../YYYY/MM/DD/ below the base path
# and that Spark picks them up from the base path as written — confirm.
paybright_staging_query = StagingQuery(
    query="""
SELECT *
FROM (
    SELECT *,
        CONCAT(year, '-', LPAD(month, 2, '0'), '-', LPAD(day, 2, '0')) as ds
    FROM (
        SELECT *,
            INPUT_FILE_NAME() as file_path,
            REGEXP_EXTRACT(INPUT_FILE_NAME(), '.*/([0-9]{4})/([0-9]{1,2})/([0-9]{1,2})/.*', 1) as year,
            REGEXP_EXTRACT(INPUT_FILE_NAME(), '.*/([0-9]{4})/([0-9]{1,2})/([0-9]{1,2})/.*', 2) as month,
            REGEXP_EXTRACT(INPUT_FILE_NAME(), '.*/([0-9]{4})/([0-9]{1,2})/([0-9]{1,2})/.*', 3) as day
        FROM parquet.`s3://affirm-risk-sherlock-ca/feature-store/paybright_repayment_data/v1`
    )
)
WHERE ds BETWEEN '{{ start_date }}' AND '{{ end_date }}'
""",
    startPartition="2025-01-01",  # Adjust to your data start date
    metaData=MetaData(
        name="paybright_repayment_data_v1",
        team="affirm",
        outputNamespace="affirm",
    ),
)

# Entity (snapshot) source that reads from the output table of
# `paybright_staging_query`.
#
# The StagingQuery object itself is handed to the table-name helper; the object
# built in this module has no `v1` attribute, so `paybright_staging_query.v1`
# would raise AttributeError at import time.
# NOTE(review): the helper may expect the StagingQuery to be defined in a module
# under `staging_queries/` rather than inline in a group_by file — confirm.
paybright_snapshot_src = Source(
    entities=EntitySource(
        snapshotTable=get_staging_query_output_table_name(paybright_staging_query),
        query=Query(
            selects=select(
                # entity key
                "user_phone_number",
                # timestamps
                "snapshot_time",
                # features
                "galactus__paybright__user__history_average_lateness_days__v1",
                "galactus__paybright__user__history_average_zeroed_lateness_days__v1",
                "galactus__paybright__user__history_days_since_last_payment__v1",
                "galactus__paybright__user__history_max_lateness_days__v1",
                "galactus__paybright__user__history_num_outstanding_loans__v1",
                "galactus__paybright__user__history_num_payments_last_60d__v1",
                "galactus__paybright__user__history_prop_fully_paid_off_loans__il__v1",
                "galactus__paybright__user__history_prop_fully_paid_off_loans__sp__v1",
                "galactus__paybright__user__history_total_payment_amount_cents_60d__v1",
                "galactus__paybright__user__history_total_payment_amount_cents__v1",
            )
        ),
    )
)

# GroupBy keyed on the user's phone number with no aggregations: the columns
# selected by the snapshot source are passed through as-is.
paybright_repayment_data_v1_0 = GroupBy(
    keys=["user_phone_number"],
    sources=[paybright_snapshot_src],
    aggregations=None,
)
5 changes: 5 additions & 0 deletions api/py/test/sample/teams.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,10 @@
"cs_ds": {
"description": "Used for unit testing purposes",
"namespace": "default"
},
"affirm": {
"description": "Affirm team configurations",
"namespace": "default"
}
}

74 changes: 74 additions & 0 deletions group_bys/affirm/instrument_risk/instrument_risk_data_v1_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# group_bys/affirm/instrument_risk/instrument_risk_data_v1_0.py

from ai.chronon.api.ttypes import Source, EntitySource, MetaData, Aggregation, Operation, Window, TimeUnit
from ai.chronon.query import Query, select
from ai.chronon.group_by import GroupBy

# This source uses a StagingQuery to automatically handle S3 YYYY/MM/DD partitioning.
# No manual view creation needed - Chronon handles everything automatically!

from ai.chronon.utils import get_staging_query_output_table_name
from staging_queries.affirm.instrument_risk_staging_query import v1 as instrument_risk_staging_query

# Entity (snapshot) source for instrument-risk predictions. The snapshot table
# name is derived from the imported instrument-risk staging query.
instrument_risk_snapshot_src = Source(
    entities=EntitySource(
        snapshotTable=get_staging_query_output_table_name(
            instrument_risk_staging_query
        ),
        query=Query(
            # Time column used by the windowed aggregations of the GroupBy.
            time_column="snapshot_time",
            selects=select(
                # Entity key.
                "user_ari",
                # Timestamps.
                "snapshot_time",
                "processed_time",
                # Feature columns.
                "model_name",
                "model_version",
                "galactus__predict_pay__instrument_aris__v1",
                "galactus__predict_pay__instrument_risk_scores__v1",
            ),
        ),
    )
)

# Instrument-risk GroupBy keyed on `user_ari`.
#
# Every feature column is reduced with LAST over trailing windows, and
# `snapshot_time` is reduced with COUNT over 7/30/90-day windows.
instrument_risk_data_v1_0 = GroupBy(
    keys=["user_ari"],
    sources=[instrument_risk_snapshot_src],
    aggregations=[
        # LAST model version seen in each window.
        Aggregation(
            operation=Operation.LAST,
            inputColumn="model_version",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
            ],
        ),
        # LAST risk-score value in each window.
        # NOTE(review): the original author describes this column as an array
        # and LAST as a stop-gap ("for now") — confirm the intended reduction.
        Aggregation(
            operation=Operation.LAST,
            inputColumn="galactus__predict_pay__instrument_risk_scores__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        # LAST instrument ARIs value in each window.
        Aggregation(
            operation=Operation.LAST,
            inputColumn="galactus__predict_pay__instrument_aris__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        # LAST model name seen in each window.
        Aggregation(
            operation=Operation.LAST,
            inputColumn="model_name",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
            ],
        ),
        # COUNT over `snapshot_time`, used as a count of predictions per window.
        Aggregation(
            operation=Operation.COUNT,
            inputColumn="snapshot_time",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
    ],
)
108 changes: 108 additions & 0 deletions group_bys/affirm/paybright/repayment_data_v1_0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# group_bys/affirm/paybright/repayment_data_v1_0.py

from ai.chronon.api.ttypes import Source, EntitySource, MetaData, Aggregation, Operation, Window, TimeUnit
from ai.chronon.query import Query, select
from ai.chronon.group_by import GroupBy

# This source uses a StagingQuery to automatically handle S3 YYYY/MM/DD partitioning.
# No manual view creation needed - Chronon handles everything automatically!

from ai.chronon.utils import get_staging_query_output_table_name
from staging_queries.affirm.paybright_staging_query import v1 as paybright_staging_query

# Entity (snapshot) source for PayBright repayment history. The snapshot table
# name is derived from the imported PayBright staging query.
paybright_snapshot_src = Source(
    entities=EntitySource(
        snapshotTable=get_staging_query_output_table_name(
            paybright_staging_query
        ),
        query=Query(
            # Time column used by the windowed aggregations of the GroupBy.
            time_column="snapshot_time",
            selects=select(
                # Entity key.
                "user_phone_number",
                # Timestamp.
                "snapshot_time",
                # Feature columns.
                "galactus__paybright__user__history_average_lateness_days__v1",
                "galactus__paybright__user__history_average_zeroed_lateness_days__v1",
                "galactus__paybright__user__history_days_since_last_payment__v1",
                "galactus__paybright__user__history_max_lateness_days__v1",
                "galactus__paybright__user__history_num_outstanding_loans__v1",
                "galactus__paybright__user__history_num_payments_last_60d__v1",
                "galactus__paybright__user__history_prop_fully_paid_off_loans__il__v1",
                "galactus__paybright__user__history_prop_fully_paid_off_loans__sp__v1",
                "galactus__paybright__user__history_total_payment_amount_cents_60d__v1",
                "galactus__paybright__user__history_total_payment_amount_cents__v1",
            ),
        ),
    )
)

# PayBright repayment GroupBy keyed on `user_phone_number`.
#
# Each feature column of the snapshot source is reduced over trailing windows
# (7/30/90 days, or a subset of those) with the operation listed below.
paybright_repayment_data_v1_0 = GroupBy(
    keys=["user_phone_number"],
    sources=[paybright_snapshot_src],
    aggregations=[
        # --- Lateness: windowed AVERAGE of the per-snapshot averages ---
        Aggregation(
            operation=Operation.AVERAGE,
            inputColumn="galactus__paybright__user__history_average_lateness_days__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        Aggregation(
            operation=Operation.AVERAGE,
            inputColumn="galactus__paybright__user__history_average_zeroed_lateness_days__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        # --- Lateness: windowed MAX of the max-lateness column ---
        Aggregation(
            operation=Operation.MAX,
            inputColumn="galactus__paybright__user__history_max_lateness_days__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        # --- Recency: windowed MIN of days since the last payment ---
        Aggregation(
            operation=Operation.MIN,
            inputColumn="galactus__paybright__user__history_days_since_last_payment__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
            ],
        ),
        # --- Outstanding loans ---
        # NOTE(review): COUNT presumably counts rows / non-null values in the
        # window rather than adding up the loan counts stored in the column —
        # confirm this is the intended feature.
        Aggregation(
            operation=Operation.COUNT,
            inputColumn="galactus__paybright__user__history_num_outstanding_loans__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        # --- Payment frequency ---
        # NOTE(review): the column name suggests each snapshot already holds a
        # 60-day payment count; summing it across snapshots in a window may
        # double count — confirm.
        Aggregation(
            operation=Operation.SUM,
            inputColumn="galactus__paybright__user__history_num_payments_last_60d__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
            ],
        ),
        # --- Proportion of fully paid-off loans (il / sp variants) ---
        Aggregation(
            operation=Operation.AVERAGE,
            inputColumn="galactus__paybright__user__history_prop_fully_paid_off_loans__il__v1",
            windows=[
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        Aggregation(
            operation=Operation.AVERAGE,
            inputColumn="galactus__paybright__user__history_prop_fully_paid_off_loans__sp__v1",
            windows=[
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
        # --- Payment amounts ---
        # NOTE(review): both columns look like totals already computed per
        # snapshot (60-day and overall); summing them across snapshots may
        # double count — confirm.
        Aggregation(
            operation=Operation.SUM,
            inputColumn="galactus__paybright__user__history_total_payment_amount_cents_60d__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
            ],
        ),
        Aggregation(
            operation=Operation.SUM,
            inputColumn="galactus__paybright__user__history_total_payment_amount_cents__v1",
            windows=[
                Window(7, TimeUnit.DAYS),
                Window(30, TimeUnit.DAYS),
                Window(90, TimeUnit.DAYS),
            ],
        ),
    ],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
{
"metaData": {
"name": "affirm.instrument_risk.instrument_risk_data_v1_0.instrument_risk_data_v1_0",
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_affirm_instrument_risk_staging_query_v1_ds\", \"spec\": \"affirm_instrument_risk_staging_query_v1/ds={{ ds }}\", \"start\": null, \"end\": null}"
],
"tableProperties": {
"source": "chronon"
},
"outputNamespace": "default",
"team": "affirm",
"offlineSchedule": "@daily"
},
"sources": [
{
"entities": {
"snapshotTable": "affirm_instrument_risk_staging_query_v1",
"query": {
"selects": {
"user_ari": "user_ari",
"snapshot_time": "snapshot_time",
"processed_time": "processed_time",
"model_name": "model_name",
"model_version": "model_version",
"galactus__predict_pay__instrument_aris__v1": "galactus__predict_pay__instrument_aris__v1",
"galactus__predict_pay__instrument_risk_scores__v1": "galactus__predict_pay__instrument_risk_scores__v1"
},
"timeColumn": "snapshot_time",
"setups": []
}
}
}
],
"keyColumns": [
"user_ari"
],
"aggregations": [
{
"inputColumn": "model_version",
"operation": 3,
"windows": [
{
"length": 7,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
}
]
},
{
"inputColumn": "galactus__predict_pay__instrument_risk_scores__v1",
"operation": 3,
"windows": [
{
"length": 7,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
},
{
"inputColumn": "galactus__predict_pay__instrument_aris__v1",
"operation": 3,
"windows": [
{
"length": 7,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
},
{
"inputColumn": "model_name",
"operation": 3,
"windows": [
{
"length": 7,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
}
]
},
{
"inputColumn": "snapshot_time",
"operation": 6,
"windows": [
{
"length": 7,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
}
]
}
Loading