108 changes: 103 additions & 5 deletions nbs/distributed.timegpt.ipynb
@@ -160,6 +160,7 @@
" target_col: str = 'y',\n",
" X_df: Optional[fugue.AnyDataFrame] = None,\n",
" level: Optional[List[Union[int, float]]] = None,\n",
" quantiles: Optional[List[float]] = None,\n",
" finetune_steps: int = 0,\n",
" finetune_loss: str = 'default',\n",
" clean_ex_first: bool = True,\n",
@@ -177,6 +178,7 @@
" time_col=time_col,\n",
" target_col=target_col,\n",
" level=level,\n",
" quantiles=quantiles,\n",
" finetune_steps=finetune_steps,\n",
" finetune_loss=finetune_loss,\n",
" clean_ex_first=clean_ex_first,\n",
@@ -186,7 +188,12 @@
" date_features_to_one_hot=date_features_to_one_hot,\n",
" model=model,\n",
" )\n",
" schema = self._get_forecast_schema(id_col=id_col, time_col=time_col, level=level)\n",
" schema = self._get_forecast_schema(\n",
" id_col=id_col, \n",
" time_col=time_col, \n",
" level=level,\n",
" quantiles=quantiles,\n",
" )\n",
" fcst_df = self._distribute_method(\n",
" method=self._forecast if X_df is None else self._forecast_x,\n",
" df=df,\n",
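For context, a minimal usage sketch of the new `quantiles` argument. The `distributed_timegpt` instance, the `spark_df` frame, and the `h`/`id_col`/`time_col` values mirror the test cells later in this diff; treat the call as illustrative rather than a verbatim API reference.

```python
# Sketch: quantile forecasts on a distributed frame (Spark/Dask/Ray via fugue).
fcst_df = distributed_timegpt.forecast(
    df=spark_df,                 # any fugue.AnyDataFrame
    h=12,                        # horizon, as in the tests below
    id_col="unique_id",
    time_col="ds",
    quantiles=[0.1, 0.5, 0.9],   # mutually exclusive with `level`
)
# Output gains TimeGPT-q-10, TimeGPT-q-50 and TimeGPT-q-90 columns.
```

`cross_validation` (next hunk) accepts the same argument.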
@@ -246,6 +253,7 @@
" time_col: str = 'ds',\n",
" target_col: str = 'y',\n",
" level: Optional[List[Union[int, float]]] = None,\n",
" quantiles: Optional[List[float]] = None,\n",
" finetune_steps: int = 0,\n",
" finetune_loss: str = 'default',\n",
" clean_ex_first: bool = True,\n",
@@ -264,6 +272,7 @@
" time_col=time_col,\n",
" target_col=target_col,\n",
" level=level,\n",
" quantiles=quantiles,\n",
" finetune_steps=finetune_steps,\n",
" finetune_loss=finetune_loss,\n",
" clean_ex_first=clean_ex_first,\n",
@@ -274,7 +283,13 @@
" n_windows=n_windows,\n",
" step_size=step_size,\n",
" )\n",
" schema = self._get_forecast_schema(id_col=id_col, time_col=time_col, level=level, cv=True)\n",
" schema = self._get_forecast_schema(\n",
" id_col=id_col, \n",
" time_col=time_col, \n",
" level=level,\n",
" quantiles=quantiles,\n",
" cv=True,\n",
" )\n",
" fcst_df = self._distribute_method(\n",
" method=self._cross_validation,\n",
" df=df,\n",
@@ -330,15 +345,23 @@
" return timegpt._cross_validation(df=df, **kwargs)\n",
" \n",
" @staticmethod\n",
" def _get_forecast_schema(id_col, time_col, level, cv=False):\n",
" def _get_forecast_schema(id_col, time_col, level, quantiles, cv=False):\n",
" schema = f'{id_col}:string,{time_col}:datetime'\n",
" if cv:\n",
" schema = f'{schema},cutoff:datetime'\n",
" schema = f'{schema},TimeGPT:double'\n",
" if (level is not None) and (quantiles is not None):\n",
"            raise ValueError(\n",
"                \"You should provide either `level` or `quantiles`, but not both.\"\n",
" )\n",
" if level is not None:\n",
" level = sorted(level)\n",
" schema = f'{schema},{\",\".join([f\"TimeGPT-lo-{lv}:double\" for lv in reversed(level)])}'\n",
" schema = f'{schema},{\",\".join([f\"TimeGPT-hi-{lv}:double\" for lv in level])}'\n",
" if quantiles is not None:\n",
" quantiles = sorted(quantiles)\n",
" q_cols = [f'TimeGPT-q-{int(q * 100)}:double' for q in quantiles]\n",
" schema = f'{schema},{\",\".join(q_cols)}'\n",
" return Schema(schema)\n",
" \n",
" @staticmethod\n",
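To make the column naming concrete, here is a standalone sketch of the strings this builder emits; the logic is copied from the hunk above and only the literal inputs are illustrative.

```python
# Quantile branch: quantiles are sorted, then named by their percentage.
quantiles = sorted([0.9, 0.1, 0.5])
print(",".join(f"TimeGPT-q-{int(q * 100)}:double" for q in quantiles))
# TimeGPT-q-10:double,TimeGPT-q-50:double,TimeGPT-q-90:double

# Level branch: lower bounds descend, upper bounds ascend.
level = sorted([80, 95])
print(",".join(f"TimeGPT-lo-{lv}:double" for lv in reversed(level)))  # lo-95, lo-80
print(",".join(f"TimeGPT-hi-{lv}:double" for lv in level))            # hi-80, hi-95
```

Since passing both `level` and `quantiles` raises, a schema never mixes the two column families.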
@@ -798,6 +821,34 @@
" #test_anomalies_same_results_num_partitions(df, id_col=id_col, time_col=time_col, date_features=True, clean_ex_first=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"def test_quantiles(df: fugue.AnyDataFrame, id_col: str = 'id_col', time_col: str = 'time_col'):\n",
" test_qls = list(np.arange(0.1, 1, 0.1))\n",
" exp_q_cols = [f\"TimeGPT-q-{int(q * 100)}\" for q in test_qls]\n",
" def test_method_qls(method, **kwargs):\n",
" df_qls = method(\n",
" df=df, \n",
" h=12, \n",
" id_col=id_col,\n",
" time_col=time_col, \n",
" quantiles=test_qls,\n",
" **kwargs\n",
" )\n",
" df_qls = fa.as_pandas(df_qls)\n",
" assert all(col in df_qls.columns for col in exp_q_cols)\n",
"        # test monotonicity of quantiles: each row's quantile columns must be non-decreasing\n",
"        assert df_qls[exp_q_cols].apply(lambda x: x.is_monotonic_increasing, axis=1).all()\n",
" test_method_qls(distributed_timegpt.forecast)\n",
" test_method_qls(distributed_timegpt.forecast, add_history=True)\n",
" test_method_qls(distributed_timegpt.cross_validation)"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -867,6 +918,16 @@
"spark_diff_cols_df = spark.createDataFrame(series_diff_cols)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"test_quantiles(spark_df, id_col=\"unique_id\", time_col=\"ds\")"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -941,7 +1002,26 @@
"source": [
"#| hide\n",
"dask_df = dd.from_pandas(series, npartitions=2)\n",
"dask_diff_cols_df = dd.from_pandas(series_diff_cols, npartitions=2)\n",
"dask_diff_cols_df = dd.from_pandas(series_diff_cols, npartitions=2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"test_quantiles(dask_df, id_col=\"unique_id\", time_col=\"ds\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"test_forecast_dataframe(dask_df)\n",
"test_forecast_dataframe_diff_cols(dask_diff_cols_df)\n",
"test_anomalies_dataframe(dask_df)\n",
@@ -1007,7 +1087,25 @@
"# add mock node to simulate a cluster\n",
"mock_node = ray_cluster.add_node(num_cpus=2)\n",
"ray_df = ray.data.from_pandas(series)\n",
"ray_diff_cols_df = ray.data.from_pandas(series_diff_cols)\n",
"ray_diff_cols_df = ray.data.from_pandas(series_diff_cols)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_quantiles(ray_df, id_col=\"unique_id\", time_col=\"ds\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#| hide\n",
"test_forecast_dataframe(ray_df)\n",
"test_forecast_dataframe_diff_cols(ray_diff_cols_df)\n",
"test_anomalies_dataframe(ray_df)\n",