32 commits
51dd54b
update file path for task files
kaylawilding Jul 25, 2025
9b5a8c8
update modeling table path variable
kaylawilding Jul 25, 2025
75f79ea
use local branch
kaylawilding Jul 25, 2025
6394c7a
update filepath for modeling
kaylawilding Jul 25, 2025
fa46ed7
try to remove parallel because workers are crashing
kaylawilding Jul 26, 2025
8af9403
Update support score distribution fn call
kaylawilding Jul 28, 2025
ae3e0d7
fix fn call to support score table
kaylawilding Jul 28, 2025
dc22b74
add hack for inst names with extra underscores
kaylawilding Aug 7, 2025
314dd09
hack job root dir
kaylawilding Aug 8, 2025
6b3d846
add first checkpoint
kaylawilding Aug 8, 2025
21b0f4c
more args hack
kaylawilding Aug 8, 2025
5d2a5ab
test without underscore hack as schema has been renamed
kaylawilding Aug 8, 2025
f90b0fa
Add all checkpoint types
kaylawilding Aug 8, 2025
52778e9
Add filtering fn for inference cohort
kaylawilding Aug 9, 2025
5266fd7
add self
kaylawilding Aug 9, 2025
6954916
add import cohort for selection
kaylawilding Aug 9, 2025
12f1947
remove hack code and sample for testing
kaylawilding Aug 9, 2025
fe6867c
testing job id
kaylawilding Aug 12, 2025
a60e596
change output names to db run id instead of mlflow run id
kaylawilding Aug 21, 2025
2ea93ce
add support scores to table with features with most impact
kaylawilding Aug 22, 2025
0271f86
up top SHAP features to 10
kaylawilding Aug 25, 2025
b6855a7
Update email texts
kaylawilding Aug 28, 2025
96a2691
add email logging
kaylawilding Sep 1, 2025
4fd38cd
debug
kaylawilding Sep 1, 2025
bc935c0
Lower sample size for testing
kaylawilding Sep 3, 2025
3b06850
use personal cluster for testing
kaylawilding Sep 3, 2025
2c48554
Update github_sourced_pdp_inference_pipeline.yml
kaylawilding Sep 3, 2025
ee0791f
personal cluster
kaylawilding Sep 3, 2025
0c493c3
Update github_sourced_pdp_inference_pipeline.yml
kaylawilding Sep 3, 2025
09a408b
remove sample subsetting
kaylawilding Sep 3, 2025
266920e
add box whiskers
kaylawilding Sep 3, 2025
b4f11a8
Update pdp_model_inference.py
kaylawilding Sep 3, 2025
@@ -0,0 +1,2 @@

.databricks
@@ -9,7 +9,7 @@ resources:
tasks:
- task_key: notify_and_publish_results
spark_python_task:
python_file: pipelines/tasks/approve_output/approve_output_task.py
python_file: pipelines/pdp/tasks/approve_output/approve_output_task.py
source: GIT
parameters:
- --institution_name
@@ -6,7 +6,7 @@ resources:
tasks:
- task_key: data_ingestion
spark_python_task:
python_file: pipelines/tasks/pdp_data_ingestion/pdp_data_ingestion.py
python_file: pipelines/pdp/tasks/pdp_data_ingestion/pdp_data_ingestion.py
source: GIT
parameters:
- --cohort_file_name
@@ -26,14 +26,15 @@ resources:
- --custom_schemas_path
- "{{job.parameters.custom_schemas_path}}"
job_cluster_key: pdp-inference-pipeline-cluster
# existing_cluster_id: 5507-145238-6we6v2w3
libraries:
- pypi:
package: git+https://github.com/datakind/student-success-tool.git@${var.pip_install_branch}
- task_key: data_preprocessing
depends_on:
- task_key: data_ingestion
spark_python_task:
python_file: pipelines/tasks/pdp_data_preprocessing/pdp_data_preprocessing.py
python_file: pipelines/pdp/tasks/pdp_data_preprocessing/pdp_data_preprocessing.py
source: GIT
parameters:
- --databricks_institution_name
@@ -50,9 +51,12 @@ resources:
- "{{tasks.data_ingestion.values.course_dataset_validated_path}}"
- --toml_file_path
- "/Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/config.toml"
- --inference_toml_file_path
- "/Volumes/{{job.parameters.DB_workspace}}/{{job.parameters.databricks_institution_name}}_gold/gold_volume/inference_inputs/inference_config.toml"
- --custom_schemas_path
- "{{job.parameters.custom_schemas_path}}"
job_cluster_key: pdp-inference-pipeline-cluster
# existing_cluster_id: 5507-145238-6we6v2w3
libraries:
- pypi:
package: git+https://github.com/datakind/student-success-tool.git@${var.pip_install_branch}
@@ -67,7 +71,7 @@ resources:
depends_on:
- task_key: data_preprocessing
spark_python_task:
python_file: pipelines/tasks/data_validation/data_validation_task.py
python_file: pipelines/pdp/tasks/data_validation/data_validation_task.py
source: GIT
parameters:
- --input_table_path
@@ -80,6 +84,7 @@ resources:
- SERVING
- "{{job.parameters.fail_on_anomalies}}"
job_cluster_key: pdp-inference-pipeline-cluster
# existing_cluster_id: 5507-145238-6we6v2w3
# - task_key: data_validation
# depends_on:
# - task_key: data_preprocessing
@@ -91,7 +96,7 @@ resources:
depends_on:
- task_key: data_preprocessing
spark_python_task:
python_file: pipelines/tasks/data_validation/data_validation_task.py
python_file: pipelines/pdp/tasks/data_validation/data_validation_task.py
source: GIT
parameters:
- --input_table_path
@@ -104,14 +109,15 @@ resources:
- SERVING
- "{{job.parameters.fail_on_anomalies}}"
job_cluster_key: pdp-inference-pipeline-cluster
# existing_cluster_id: 5507-145238-6we6v2w3
libraries:
- pypi:
package: tensorflow_data_validation==1.16.1
- task_key: inference
depends_on:
- task_key: data_validation
spark_python_task:
python_file: pipelines/tasks/pdp_model_inference/pdp_model_inference.py
python_file: pipelines/pdp/tasks/pdp_model_inference/pdp_model_inference.py
source: GIT
parameters:
- --databricks_institution_name
@@ -135,10 +141,11 @@ resources:
- --DK_CC_EMAIL
- "{{job.parameters.DK_CC_EMAIL}}"
- --modeling_table_path
- "{{job.parameters.DB_workspace}}.{{job.parameters.databricks_institution_name}}_silver.{{job.parameters.databricks_institution_name}}_pdp_modeling_ar_deid"
- "{{job.parameters.DB_workspace}}.{{job.parameters.databricks_institution_name}}_silver.modeling"
- --custom_schemas_path
- "{{job.parameters.custom_schemas_path}}"
job_cluster_key: pdp-inference-pipeline-cluster
# existing_cluster_id: 5507-145238-6we6v2w3
libraries:
- pypi:
package: git+https://github.com/datakind/student-success-tool.git@${var.pip_install_branch}
@@ -164,7 +171,7 @@ resources:
depends_on:
- task_key: inference
spark_python_task:
python_file: pipelines/tasks/pdp_inference_output_publish/pdp_inference_output_publish.py
python_file: pipelines/pdp/tasks/pdp_inference_output_publish/pdp_inference_output_publish.py
source: GIT
parameters:
- --DB_workspace
@@ -178,6 +185,7 @@ resources:
- --notification_email
- "{{job.parameters.notification_email}}"
job_cluster_key: pdp-inference-pipeline-cluster
# existing_cluster_id: 5507-145238-6we6v2w3
libraries:
- pypi:
package: git+https://github.com/datakind/student-success-tool.git@${var.pip_install_branch}
@@ -205,7 +213,7 @@ resources:
git_source:
git_url: https://github.com/datakind/student-success-tool
git_provider: gitHub
git_branch: develop
git_branch: inference-testing-v039
tags:
dev: kayla_wilding
queue:
@@ -18,6 +18,7 @@
import pandas as pd
import sys
import importlib
import tomllib

from databricks.connect import DatabricksSession
from databricks.sdk.runtime import dbutils
@@ -51,6 +52,9 @@ def __init__(self, args: argparse.Namespace):
self.spark_session = self.get_spark_session()
self.args = args
self.cfg = self.read_config(self.args.toml_file_path)
# hack - remove when we move this to the main config
with open(self.args.inference_toml_file_path, "rb") as f:
self.inf_cfg = tomllib.load(f)

def get_spark_session(self) -> DatabricksSession | None:
"""
@@ -111,6 +115,46 @@ def read_data_from_delta(
logging.error("Spark session not initialized. Cannot read delta tables.")
raise

def select_inference_cohort(
self, df_course: pd.DataFrame, df_cohort: pd.DataFrame
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Selects the specified cohorts from the course and cohort DataFrames.

The cohorts to keep (e.g., ["fall 2023", "spring 2024"]) are read from the
inference config's "inference_cohort" entry rather than passed as an argument.

Args:
df_course: The course DataFrame.
df_cohort: The cohort DataFrame.

Returns:
A tuple containing the filtered course and cohort DataFrames.

Raises:
ValueError: If filtering results in empty DataFrames.
"""
# change to main config when it's updated
cohorts_list = self.inf_cfg["inference_cohort"]

# We only have cohort and cohort term as separate columns, so combine them and lowercase to avoid capitalization mismatches
df_course['cohort_selection'] = df_course['cohort_term'].astype(str).str.lower() + " " + df_course['cohort'].astype(str).str.lower()
df_cohort['cohort_selection'] = df_cohort['cohort_term'].astype(str).str.lower() + " " + df_cohort['cohort'].astype(str).str.lower()

# Subset both datasets to only these cohorts
df_course_filtered = df_course[df_course['cohort_selection'].isin(cohorts_list)]
df_cohort_filtered = df_cohort[df_cohort['cohort_selection'].isin(cohorts_list)]

# Log confirmation that we are selecting the correct cohorts
logging.info("Selected cohorts: %s", cohorts_list)

# Raise an error if either dataset is empty after filtering
if df_course_filtered.empty or df_cohort_filtered.empty:
logging.error("Selected cohorts resulted in empty DataFrames.")
raise ValueError("Selected cohorts resulted in empty DataFrames.")

logging.info("Cohort selection completed. Course shape: %s, Cohort shape: %s", df_course_filtered.shape, df_cohort_filtered.shape)

return df_course_filtered, df_cohort_filtered

def preprocess_data(
self, df_course: pd.DataFrame, df_cohort: pd.DataFrame
) -> pd.DataFrame:
@@ -133,6 +177,10 @@ def preprocess_data(
student_criteria = self.cfg.preprocessing.selection.student_criteria
student_id_col = self.cfg.student_id_col

# Select the configured inference cohorts

df_course, df_cohort = self.select_inference_cohort(df_course, df_cohort)

# Create student-term dataset
df_student_terms = preprocessing.pdp.make_student_term_dataset(
df_cohort,
@@ -158,13 +206,50 @@ def preprocess_data(
enrollment_year_col="year_of_enrollment_at_cohort_inst",
valid_enrollment_year=1,
)
elif checkpoint_type == "first":
logging.info("Checkpoint type: first")
df_ckpt = checkpoints.pdp.first_student_terms(
df=df_student_terms,
student_id_cols=student_id_col,
sort_cols=self.cfg.preprocessing.checkpoint.sort_cols,
include_cols=self.cfg.preprocessing.checkpoint.include_cols,
)
elif checkpoint_type == "last":
logging.info("Checkpoint type: last")
df_ckpt = checkpoints.pdp.last_student_terms_in_enrollment_year(
df_student_terms,
enrollment_year=self.cfg.preprocessing.checkpoint.enrollment_year,
enrollment_year_col=self.cfg.preprocessing.checkpoint.enrollment_year_col,
student_id_cols=student_id_col,
sort_cols=self.cfg.preprocessing.checkpoint.sort_cols,
include_cols=self.cfg.preprocessing.checkpoint.include_cols,
)
elif checkpoint_type == "first_at_num_credits_earned":
logging.info("Checkpoint type: first_at_num_credits_earned")
df_ckpt = checkpoints.pdp.first_student_terms_at_num_credits_earned(
df_student_terms,
min_num_credits=self.cfg.preprocessing.checkpoint.min_num_credits,
)

elif checkpoint_type == "first_within_cohort":
logging.info("Checkpoint type: first_within_cohort")
df_ckpt = checkpoints.pdp.first_student_terms_within_cohort(
df_student_terms,
term_is_pre_cohort_col=self.cfg.preprocessing.checkpoint.term_is_pre_cohort_col,
student_id_cols=student_id_col,
)
elif checkpoint_type == "last_in_enrollment_year":
logging.info("Checkpoint type: last_in_enrollment_year")
df_ckpt = checkpoints.pdp.last_student_terms_in_enrollment_year(
df_student_terms,
enrollment_year=self.cfg.preprocessing.checkpoint.enrollment_year,
enrollment_year_col=self.cfg.preprocessing.checkpoint.enrollment_year_col,
student_id_cols=student_id_col,
)
else:
logging.error("Unknown checkpoint type: %s", checkpoint_type)
raise ValueError(f"Unknown checkpoint type: {checkpoint_type}")

df_processed = pd.merge(
df_ckpt, pd.Series(selected_students.index), how="inner", on=student_id_col
)
@@ -253,6 +338,9 @@ def parse_arguments() -> argparse.Namespace:
parser.add_argument(
"--toml_file_path", type=str, required=True, help="Path to configuration file"
)
parser.add_argument(
"--inference_toml_file_path", type=str, required=True, help="Path to configuration file"
)
parser.add_argument(
"--custom_schemas_path",
required=False,
@@ -263,13 +351,13 @@ def parse_arguments() -> argparse.Namespace:

if __name__ == "__main__":
args = parse_arguments()

try:
sys.path.append(args.custom_schemas_path)
sys.path.append(
f"/Volumes/staging_sst_01/{args.databricks_institution_name}_gold/gold_volume/inference_inputs"
)
schemas = importlib.import_module("schemas")
# schemas = importlib.import_module(f"{args.databricks_institution_name}.schemas")
logging.info("Running task with custom schema")
except Exception:
from student_success_tool.dataio.schemas import pdp as schemas
@@ -92,6 +92,8 @@ def main():
"""Main function."""
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
# hack: replace any multiple underscores found in the databricks institution name

parser.add_argument(
"--DB_workspace", required=True, help="Databricks workspace of the task."
)
@@ -117,6 +119,8 @@
help="User's email who triggered the inference run.",
)
args = parser.parse_args()
# args.databricks_institution_name = args.databricks_institution_name.replace("___", "_")

w = WorkspaceClient()
logging.info("Publishing files to GCP bucket")
publish_inference_output_files(
@@ -131,6 +135,7 @@
cc_email_list = ["[email protected]"]
password = w.dbutils.secrets.get(scope="sst", key="MANDRILL_PASSWORD")
logging.info("Sending email notification")
logging.info(f"Notification email will be sent to {args.notification_email}")
send_inference_completion_email(
sender_email, [args.notification_email], cc_email_list, username, password
)