From 2a989a2c3d12327c5a8789190a29067a25654334 Mon Sep 17 00:00:00 2001 From: janrth Date: Fri, 12 Dec 2025 00:13:22 +0100 Subject: [PATCH 1/8] validates format of update df --- mlforecast/core.py | 26 +++++++++++++++++++++++--- mlforecast/forecast.py | 4 ++-- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index 2dd4c682..a42fb99a 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -862,9 +862,27 @@ def load(path: Union[str, Path], protocol: Optional[str] = None) -> "TimeSeries" with fsspec.open(path, "rb", protocol=protocol) as f: ts = cloudpickle.load(f) return ts - - def update(self, df: DataFrame) -> None: - """Update the values of the stored series.""" + + def _validate_new_df(self, df: pd.DataFrame) -> None: + if isinstance(df, pl.DataFrame): + max_date_update_observed = df.group_by(self.id_col).agg(pl.col(self.time_col).max()).to_pandas()[self.time_col] + min_date_update_observed = df.group_by(self.id_col).agg(pl.col(self.time_col).min()).to_pandas()[self.time_col] + else: + max_date_update_observed = df.groupby(self.id_col, observed=True)[self.time_col].max() + min_date_update_observed = df.groupby(self.id_col, observed=True)[self.time_col].min() + last_dates = dict(zip(self.uids, self.last_dates)) + id_col = df[self.id_col].unique().to_numpy() + df_temp = pd.DataFrame({self.id_col: id_col}) + df_temp['last_date'] = df_temp[self.id_col].map(last_dates) + df_temp['expected_next_date'] = df_temp['last_date'] + pd.tseries.frequencies.to_offset(self.freq) + min_date_update_expected = df_temp.groupby(self.id_col, observed=True).expected_next_date.min().values + + assert np.sum((max_date_update_observed - min_date_update_expected).dt.days) == np.sum((max_date_update_observed - min_date_update_observed).dt.days), \ + """Expected size mismatch: Each unique_id needs to start from the last + day of the previous history and should contain continuous observations.""" + + def update(self, df: DataFrame, validate_input: str = False) -> None: + """Update the values of the stored series.""" validate_format(df, self.id_col, self.time_col, self.target_col) uids = self.uids if isinstance(uids, pd.Index): @@ -875,6 +893,8 @@ def update(self, df: DataFrame) -> None: df = ufp.sort(df, by=[self.id_col, self.time_col]) values = df[self.target_col].to_numpy() values = values.astype(self.ga.data.dtype, copy=False) + if validate_input: + self._validate_new_df(df=df) id_counts = ufp.counts_by_id(df, self.id_col) try: sizes = ufp.join(uids, id_counts, on=self.id_col, how="outer_coalesce") diff --git a/mlforecast/forecast.py b/mlforecast/forecast.py index 35138aa5..b6f8f401 100644 --- a/mlforecast/forecast.py +++ b/mlforecast/forecast.py @@ -962,10 +962,10 @@ def load(path: Union[str, Path]) -> "MLForecast": fcst._cs_df = intervals["scores"] return fcst - def update(self, df: DataFrame) -> None: + def update(self, df: DataFrame, validate_input: str = False) -> None: """Update the values of the stored series. Args: df (pandas or polars DataFrame): Dataframe with new observations. """ - self.ts.update(df) + self.ts.update(df, validate_input) From e4ebecaf5abae28f1aeda1bb1c4e74aff393dbff Mon Sep 17 00:00:00 2001 From: janrth Date: Fri, 12 Dec 2025 00:13:22 +0100 Subject: [PATCH 2/8] validates format of update df --- mlforecast/core.py | 26 +++++++++++++++++++++++--- mlforecast/forecast.py | 4 ++-- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index 2dd4c682..a42fb99a 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -862,9 +862,27 @@ def load(path: Union[str, Path], protocol: Optional[str] = None) -> "TimeSeries" with fsspec.open(path, "rb", protocol=protocol) as f: ts = cloudpickle.load(f) return ts - - def update(self, df: DataFrame) -> None: - """Update the values of the stored series.""" + + def _validate_new_df(self, df: pd.DataFrame) -> None: + if isinstance(df, pl.DataFrame): + max_date_update_observed = df.group_by(self.id_col).agg(pl.col(self.time_col).max()).to_pandas()[self.time_col] + min_date_update_observed = df.group_by(self.id_col).agg(pl.col(self.time_col).min()).to_pandas()[self.time_col] + else: + max_date_update_observed = df.groupby(self.id_col, observed=True)[self.time_col].max() + min_date_update_observed = df.groupby(self.id_col, observed=True)[self.time_col].min() + last_dates = dict(zip(self.uids, self.last_dates)) + id_col = df[self.id_col].unique().to_numpy() + df_temp = pd.DataFrame({self.id_col: id_col}) + df_temp['last_date'] = df_temp[self.id_col].map(last_dates) + df_temp['expected_next_date'] = df_temp['last_date'] + pd.tseries.frequencies.to_offset(self.freq) + min_date_update_expected = df_temp.groupby(self.id_col, observed=True).expected_next_date.min().values + + assert np.sum((max_date_update_observed - min_date_update_expected).dt.days) == np.sum((max_date_update_observed - min_date_update_observed).dt.days), \ + """Expected size mismatch: Each unique_id needs to start from the last + day of the previous history and should contain continuous observations.""" + + def update(self, df: DataFrame, validate_input: str = False) -> None: + """Update the values of the stored series.""" validate_format(df, self.id_col, self.time_col, self.target_col) uids = self.uids if isinstance(uids, pd.Index): @@ -875,6 +893,8 @@ def update(self, df: DataFrame) -> None: df = ufp.sort(df, by=[self.id_col, self.time_col]) values = df[self.target_col].to_numpy() values = values.astype(self.ga.data.dtype, copy=False) + if validate_input: + self._validate_new_df(df=df) id_counts = ufp.counts_by_id(df, self.id_col) try: sizes = ufp.join(uids, id_counts, on=self.id_col, how="outer_coalesce") diff --git a/mlforecast/forecast.py b/mlforecast/forecast.py index 35138aa5..b6f8f401 100644 --- a/mlforecast/forecast.py +++ b/mlforecast/forecast.py @@ -962,10 +962,10 @@ def load(path: Union[str, Path]) -> "MLForecast": fcst._cs_df = intervals["scores"] return fcst - def update(self, df: DataFrame) -> None: + def update(self, df: DataFrame, validate_input: str = False) -> None: """Update the values of the stored series. Args: df (pandas or polars DataFrame): Dataframe with new observations. """ - self.ts.update(df) + self.ts.update(df, validate_input) From e2c13f04cc1fd089537a6db2027ce8a626cf10d7 Mon Sep 17 00:00:00 2001 From: janrth Date: Sat, 3 Jan 2026 00:15:17 +0100 Subject: [PATCH 3/8] fix type annotations for bool --- mlforecast/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index a42fb99a..c23c4931 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -881,7 +881,7 @@ def _validate_new_df(self, df: pd.DataFrame) -> None: """Expected size mismatch: Each unique_id needs to start from the last day of the previous history and should contain continuous observations.""" - def update(self, df: DataFrame, validate_input: str = False) -> None: + def update(self, df: DataFrame, validate_input: bool = False) -> None: """Update the values of the stored series.""" validate_format(df, self.id_col, self.time_col, self.target_col) uids = self.uids From cd774d897eb0f091fea356a18b9feadd1efcce4d Mon Sep 17 00:00:00 2001 From: janrth Date: Sat, 3 Jan 2026 00:29:15 +0100 Subject: [PATCH 4/8] fixing type annotation in forecast.py --- mlforecast/forecast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mlforecast/forecast.py b/mlforecast/forecast.py index fc1482e4..b346423f 100644 --- a/mlforecast/forecast.py +++ b/mlforecast/forecast.py @@ -961,7 +961,7 @@ def load(path: Union[str, Path]) -> "MLForecast": fcst._cs_df = intervals["scores"] return fcst - def update(self, df: DataFrame, validate_input: str = False) -> None: + def update(self, df: DataFrame, validate_input: bool = False) -> None: """Update the values of the stored series. Args: From cd6cd48ee2095be58628b35b9acef2e08a73a827 Mon Sep 17 00:00:00 2001 From: janrth Date: Tue, 13 Jan 2026 21:58:27 +0100 Subject: [PATCH 5/8] add full validations for update function plus tests --- mlforecast/core.py | 118 +++++++++++++++++++++++++++---- tests/test_core.py | 168 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 272 insertions(+), 14 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index c23c4931..481cc133 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -865,21 +865,111 @@ def load(path: Union[str, Path], protocol: Optional[str] = None) -> "TimeSeries" def _validate_new_df(self, df: pd.DataFrame) -> None: if isinstance(df, pl.DataFrame): - max_date_update_observed = df.group_by(self.id_col).agg(pl.col(self.time_col).max()).to_pandas()[self.time_col] - min_date_update_observed = df.group_by(self.id_col).agg(pl.col(self.time_col).min()).to_pandas()[self.time_col] + stats = ( + df.group_by(self.id_col) + .agg( + pl.col(self.time_col).min().alias("_min"), + pl.col(self.time_col).max().alias("_max"), + pl.len().alias("_size"), + ) + .sort(self.id_col) + ) + last_dates_df = pl_DataFrame( + {self.id_col: self.uids, "_last": self.last_dates} + ) + expected_start = ufp.offset_times(last_dates_df["_last"], self.freq, 1) + expected_df = last_dates_df.with_columns( + pl.Series(name="_expected_start", values=expected_start) + ).select([self.id_col, "_expected_start"]) + stats = stats.join(expected_df, on=self.id_col, how="left") + bad_starts = stats.filter( + pl.col("_expected_start").is_not_null() + & (pl.col("_min") != pl.col("_expected_start")) + ) + if bad_starts.height: + bad_ids = bad_starts[self.id_col].to_list() + raise ValueError( + "Series have invalid start dates. " + f"Expected start at last_date + freq for: {bad_ids}." + ) + if isinstance(self.freq, int): + diffs = pl.col("_max") - pl.col("_min") + misaligned = stats.filter((diffs % self.freq) != 0) + if misaligned.height: + raise ValueError( + "Found timestamps not aligned to the configured frequency." + ) + expected_count = diffs // self.freq + 1 + else: + delta = pd.Timedelta(pd.tseries.frequencies.to_offset(self.freq)) + delta_ns = delta.value + min_ns = pl.col("_min").dt.timestamp("ns") + max_ns = pl.col("_max").dt.timestamp("ns") + diffs_ns = max_ns - min_ns + misaligned = stats.filter((diffs_ns % delta_ns) != 0) + if misaligned.height: + raise ValueError( + "Found timestamps not aligned to the configured frequency." + ) + expected_count = diffs_ns // delta_ns + 1 + gaps = stats.filter(expected_count != pl.col("_size")) + if gaps.height: + bad_ids = gaps[self.id_col].to_list() + raise ValueError( + "Found gaps or duplicate timestamps in the update for: " + f"{bad_ids}." + ) + return + stats = ( + df.groupby(self.id_col, observed=True)[self.time_col] + .agg(["min", "max", "size"]) + .rename(columns={"min": "_min", "max": "_max", "size": "_size"}) + .reset_index() + ) + last_dates_df = pd.DataFrame( + {self.id_col: self.uids, "_last": self.last_dates} + ) + expected_start = ufp.offset_times(last_dates_df["_last"], self.freq, 1) + expected_df = pd.DataFrame( + {self.id_col: last_dates_df[self.id_col], "_expected_start": expected_start} + ) + stats[self.id_col] = stats[self.id_col].astype(str) + expected_df[self.id_col] = expected_df[self.id_col].astype(str) + stats = stats.merge(expected_df, on=self.id_col, how="left") + start_mismatch = stats["_expected_start"].notna() & ( + stats["_min"] != stats["_expected_start"] + ) + if start_mismatch.any(): + bad_ids = stats.loc[start_mismatch, self.id_col].tolist() + raise ValueError( + "Series have invalid start dates. " + f"Expected start at last_date + freq for: {bad_ids}." + ) + diffs = stats["_max"] - stats["_min"] + if isinstance(self.freq, int): + delta = self.freq + remainder = diffs % delta + if (remainder != 0).any(): + raise ValueError( + "Found timestamps not aligned to the configured frequency." + ) + expected_count = diffs // delta + 1 else: - max_date_update_observed = df.groupby(self.id_col, observed=True)[self.time_col].max() - min_date_update_observed = df.groupby(self.id_col, observed=True)[self.time_col].min() - last_dates = dict(zip(self.uids, self.last_dates)) - id_col = df[self.id_col].unique().to_numpy() - df_temp = pd.DataFrame({self.id_col: id_col}) - df_temp['last_date'] = df_temp[self.id_col].map(last_dates) - df_temp['expected_next_date'] = df_temp['last_date'] + pd.tseries.frequencies.to_offset(self.freq) - min_date_update_expected = df_temp.groupby(self.id_col, observed=True).expected_next_date.min().values - - assert np.sum((max_date_update_observed - min_date_update_expected).dt.days) == np.sum((max_date_update_observed - min_date_update_observed).dt.days), \ - """Expected size mismatch: Each unique_id needs to start from the last - day of the previous history and should contain continuous observations.""" + offset = pd.tseries.frequencies.to_offset(self.freq) + delta = pd.Timedelta(offset) + remainder = diffs % delta + if (remainder != pd.Timedelta(0)).any(): + raise ValueError( + "Found timestamps not aligned to the configured frequency." + ) + expected_count = diffs // delta + 1 + gaps = expected_count != stats["_size"] + if gaps.any(): + bad_ids = stats.loc[gaps, self.id_col].tolist() + raise ValueError( + "Found gaps or duplicate timestamps in the update for: " + f"{bad_ids}." + ) def update(self, df: DataFrame, validate_input: bool = False) -> None: """Update the values of the stored series.""" diff --git a/tests/test_core.py b/tests/test_core.py index 73b5b71b..8debfaea 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -561,6 +561,174 @@ def test_ts_update(series): last7 = ts.ga.take_from_groups(slice(-7, None)).data assert 0 < np.abs(last7 / orig_last7 - 1).mean() < 0.5 + +def _make_valid_update(series, engine): + if engine == "polars": + last_vals = series.join( + series.group_by("unique_id").agg(pl.col("ds").max()), + on=["unique_id", "ds"], + ) + update1 = last_vals.with_columns(pl.col("ds").dt.offset_by("1d")) + update2 = last_vals.with_columns(pl.col("ds").dt.offset_by("2d")) + update = pl.concat([update1, update2]) + return update.with_columns((pl.col("y") + 1).alias("y")) + last_vals = series.groupby("unique_id", observed=True).tail(1).copy() + update1 = last_vals.copy() + update2 = last_vals.copy() + update1["ds"] += pd.offsets.Day() + update2["ds"] += 2 * pd.offsets.Day() + update = pd.concat([update1, update2], ignore_index=True) + update["y"] = update["y"] + 1 + return update + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_update_validation_valid_continuous(engine): + series = generate_daily_series(3, n_static_features=2, engine=engine) + freq = "1d" if engine == "polars" else "D" + ts = TimeSeries(freq=freq, lags=[1]) + ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") + update = _make_valid_update(series, engine) + ts.update(update, validate_input=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_update_validation_invalid_gap(engine): + series = generate_daily_series(3, n_static_features=2, engine=engine) + freq = "1d" if engine == "polars" else "D" + ts = TimeSeries(freq=freq, lags=[1]) + ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") + if engine == "polars": + last_vals = series.join( + series.group_by("unique_id").agg(pl.col("ds").max()), + on=["unique_id", "ds"], + ) + update = pl.concat( + [ + last_vals.with_columns(pl.col("ds").dt.offset_by("1d")), + last_vals.with_columns(pl.col("ds").dt.offset_by("3d")), + ] + ) + else: + last_vals = series.groupby("unique_id", observed=True).tail(1).copy() + update = pd.concat( + [ + last_vals.assign(ds=last_vals["ds"] + pd.offsets.Day()), + last_vals.assign(ds=last_vals["ds"] + 3 * pd.offsets.Day()), + ], + ignore_index=True, + ) + with pytest.raises(ValueError, match="gaps or duplicate"): + ts.update(update, validate_input=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_update_validation_invalid_start(engine): + series = generate_daily_series(3, n_static_features=2, engine=engine) + freq = "1d" if engine == "polars" else "D" + ts = TimeSeries(freq=freq, lags=[1]) + ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") + if engine == "polars": + last_vals = series.join( + series.group_by("unique_id").agg(pl.col("ds").max()), + on=["unique_id", "ds"], + ) + update = pl.concat( + [ + last_vals, + last_vals.with_columns(pl.col("ds").dt.offset_by("1d")), + ] + ) + else: + last_vals = series.groupby("unique_id", observed=True).tail(1).copy() + update = pd.concat( + [ + last_vals, + last_vals.assign(ds=last_vals["ds"] + pd.offsets.Day()), + ], + ignore_index=True, + ) + with pytest.raises(ValueError, match="invalid start"): + ts.update(update, validate_input=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_update_validation_new_series(engine): + series = generate_daily_series(3, n_static_features=2, engine=engine) + freq = "1d" if engine == "polars" else "D" + ts = TimeSeries(freq=freq, lags=[1]) + ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") + update = _make_valid_update(series, engine) + if engine == "polars": + new_series = pl.DataFrame( + { + "unique_id": ["new_0", "new_0"], + "ds": [datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 2)], + "y": [1.0, 2.0], + "static_0": [0, 0], + "static_1": [1, 1], + } + ).with_columns( + pl.col("ds").dt.cast_time_unit("ns"), + pl.col("unique_id").cast(pl.Categorical), + ) + cast_exprs = [] + for col, dtype in update.schema.items(): + if dtype == pl.Categorical and new_series.schema[col] != pl.Categorical: + cast_exprs.append( + pl.col(col).cast(pl.Utf8).cast(pl.Categorical).alias(col) + ) + else: + cast_exprs.append(pl.col(col).cast(dtype).alias(col)) + new_series = new_series.with_columns(cast_exprs) + update = pl.concat([update, new_series]) + else: + new_series = pd.DataFrame( + { + "unique_id": ["new_0", "new_0"], + "ds": pd.to_datetime(["2020-01-01", "2020-01-02"]), + "y": [1.0, 2.0], + "static_0": [0, 0], + "static_1": [1, 1], + } + ) + update = pd.concat([update, new_series], ignore_index=True) + ts.update(update, validate_input=True) + + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +def test_update_validation_frequency_mismatch(engine): + series = generate_daily_series(3, n_static_features=2, engine=engine) + freq = "1d" if engine == "polars" else "D" + ts = TimeSeries(freq=freq, lags=[1]) + ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") + if engine == "polars": + last_vals = series.join( + series.group_by("unique_id").agg(pl.col("ds").max()), + on=["unique_id", "ds"], + ) + update = pl.concat( + [ + last_vals.with_columns(pl.col("ds").dt.offset_by("1d")), + last_vals.with_columns( + pl.col("ds").dt.offset_by("1d").dt.offset_by("1h") + ), + ] + ) + else: + last_vals = series.groupby("unique_id", observed=True).tail(1).copy() + update = pd.concat( + [ + last_vals.assign(ds=last_vals["ds"] + pd.offsets.Day()), + last_vals.assign( + ds=last_vals["ds"] + pd.offsets.Day() + pd.offsets.Hour() + ), + ], + ignore_index=True, + ) + with pytest.raises(ValueError, match="aligned"): + ts.update(update, validate_input=True) + def test_ts_polars(): two_series = generate_daily_series(2, n_static_features=2, engine="polars") ts = TimeSeries(freq="1d", lags=[1], date_features=["weekday"]) From 376ef174424f4d2e3abcda3e7b4f180ed03b34f6 Mon Sep 17 00:00:00 2001 From: janrth Date: Thu, 15 Jan 2026 15:41:01 +0100 Subject: [PATCH 6/8] fix type annotations and pl categorical encoding bug --- mlforecast/core.py | 11 +++++++++-- mlforecast/forecast.py | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index 481cc133..eee8efc2 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -863,7 +863,7 @@ def load(path: Union[str, Path], protocol: Optional[str] = None) -> "TimeSeries" ts = cloudpickle.load(f) return ts - def _validate_new_df(self, df: pd.DataFrame) -> None: + def _validate_new_df(self, df: DataFrame) -> None: if isinstance(df, pl.DataFrame): stats = ( df.group_by(self.id_col) @@ -881,6 +881,8 @@ def _validate_new_df(self, df: pd.DataFrame) -> None: expected_df = last_dates_df.with_columns( pl.Series(name="_expected_start", values=expected_start) ).select([self.id_col, "_expected_start"]) + stats = stats.with_columns(pl.col(self.id_col).cast(pl.Utf8)) + expected_df = expected_df.with_columns(pl.col(self.id_col).cast(pl.Utf8)) stats = stats.join(expected_df, on=self.id_col, how="left") bad_starts = stats.filter( pl.col("_expected_start").is_not_null() @@ -972,7 +974,12 @@ def _validate_new_df(self, df: pd.DataFrame) -> None: ) def update(self, df: DataFrame, validate_input: bool = False) -> None: - """Update the values of the stored series.""" + """Update the values of the stored series. + + Args: + df: New observations to append. + validate_input: If True, validate continuity, start dates, and frequency. + """ validate_format(df, self.id_col, self.time_col, self.target_col) uids = self.uids if isinstance(uids, pd.Index): diff --git a/mlforecast/forecast.py b/mlforecast/forecast.py index b346423f..6b346c40 100644 --- a/mlforecast/forecast.py +++ b/mlforecast/forecast.py @@ -966,5 +966,6 @@ def update(self, df: DataFrame, validate_input: bool = False) -> None: Args: df (pandas or polars DataFrame): Dataframe with new observations. + validate_input (bool): If True, validate continuity, start dates, and frequency. """ self.ts.update(df, validate_input) From 8a44db11e2553f06bf36d772a9b3d256369dd3ed Mon Sep 17 00:00:00 2001 From: Saul Caballero Date: Thu, 15 Jan 2026 18:06:15 -0600 Subject: [PATCH 7/8] Adds tests for more frequencies --- tests/test_core.py | 276 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 242 insertions(+), 34 deletions(-) diff --git a/tests/test_core.py b/tests/test_core.py index 8debfaea..ca9b0597 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -562,40 +562,162 @@ def test_ts_update(series): assert 0 < np.abs(last7 / orig_last7 - 1).mean() < 0.5 -def _make_valid_update(series, engine): +def _generate_series_with_freq(n_series, freq_name, n_static_features, engine): + """Helper to generate series with specific frequency.""" + from utilsforecast.data import generate_series + from math import ceil, log10 + + freq_map = { + "hourly": "h", + "daily": "D", + "weekly": "W", + "monthly": "MS", + "yearly": "YS", + } + + series = generate_series( + n_series=n_series, + freq=freq_map[freq_name], + min_length=50, + max_length=500, + n_static_features=n_static_features, + equal_ends=False, + static_as_categorical=True, + with_trend=False, + seed=0, + engine=engine, + ) + + n_digits = ceil(log10(n_series)) + if engine == "pandas": + series["unique_id"] = ( + "id_" + series["unique_id"].astype(str).str.rjust(n_digits, "0") + ).astype("category") + else: + try: + series = series.with_columns( + ("id_" + pl.col("unique_id").cast(pl.Utf8).str.pad_start(n_digits, "0")) + .alias("unique_id") + .cast(pl.Categorical) + ) + except AttributeError: + series = series.with_columns( + ("id_" + pl.col("unique_id").cast(pl.Utf8).str.rjust(n_digits, "0")) + .alias("unique_id") + .cast(pl.Categorical) + ) + return series + + +def _get_freq_config(freq_name, engine): + """Get frequency configuration for pandas and polars.""" + freq_configs = { + "hourly": { + "pandas_freq": "h", + "polars_freq": "1h", + "polars_offset1": "1h", + "polars_offset2": "2h", + "polars_offset3": "3h", + "polars_offset_misaligned": "30m", + "pandas_offset1": pd.offsets.Hour(), + "pandas_offset2": pd.offsets.Hour(2), + "pandas_offset3": pd.offsets.Hour(3), + "pandas_offset_misaligned": pd.offsets.Minute(30), + }, + "daily": { + "pandas_freq": "D", + "polars_freq": "1d", + "polars_offset1": "1d", + "polars_offset2": "2d", + "polars_offset3": "3d", + "polars_offset_misaligned": "12h", + "pandas_offset1": pd.offsets.Day(), + "pandas_offset2": pd.offsets.Day(2), + "pandas_offset3": pd.offsets.Day(3), + "pandas_offset_misaligned": pd.offsets.Hour(12), + }, + "weekly": { + "pandas_freq": "W", + "polars_freq": "1w", + "polars_offset1": "1w", + "polars_offset2": "2w", + "polars_offset3": "3w", + "polars_offset_misaligned": "3d", + "pandas_offset1": pd.offsets.Week(), + "pandas_offset2": pd.offsets.Week(2), + "pandas_offset3": pd.offsets.Week(3), + "pandas_offset_misaligned": pd.offsets.Day(3), + }, + "monthly": { + "pandas_freq": "MS", + "polars_freq": "1mo", + "polars_offset1": "1mo", + "polars_offset2": "2mo", + "polars_offset3": "3mo", + "polars_offset_misaligned": "15d", + "pandas_offset1": pd.offsets.MonthBegin(), + "pandas_offset2": pd.offsets.MonthBegin(2), + "pandas_offset3": pd.offsets.MonthBegin(3), + "pandas_offset_misaligned": pd.offsets.Day(15), + }, + "yearly": { + "pandas_freq": "YS", + "polars_freq": "1y", + "polars_offset1": "1y", + "polars_offset2": "2y", + "polars_offset3": "3y", + "polars_offset_misaligned": "6mo", + "pandas_offset1": pd.offsets.YearBegin(), + "pandas_offset2": pd.offsets.YearBegin(2), + "pandas_offset3": pd.offsets.YearBegin(3), + "pandas_offset_misaligned": pd.offsets.MonthBegin(6), + }, + } + + config = freq_configs[freq_name] + return ( + config["polars_freq"] if engine == "polars" else config["pandas_freq"], + config, + ) + + +def _make_valid_update(series, engine, freq_config): + """Create a valid update dataframe with correct frequency alignment.""" if engine == "polars": last_vals = series.join( series.group_by("unique_id").agg(pl.col("ds").max()), on=["unique_id", "ds"], ) - update1 = last_vals.with_columns(pl.col("ds").dt.offset_by("1d")) - update2 = last_vals.with_columns(pl.col("ds").dt.offset_by("2d")) + update1 = last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset1"])) + update2 = last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset2"])) update = pl.concat([update1, update2]) return update.with_columns((pl.col("y") + 1).alias("y")) last_vals = series.groupby("unique_id", observed=True).tail(1).copy() update1 = last_vals.copy() update2 = last_vals.copy() - update1["ds"] += pd.offsets.Day() - update2["ds"] += 2 * pd.offsets.Day() + update1["ds"] += freq_config["pandas_offset1"] + update2["ds"] += freq_config["pandas_offset2"] update = pd.concat([update1, update2], ignore_index=True) update["y"] = update["y"] + 1 return update @pytest.mark.parametrize("engine", ["pandas", "polars"]) -def test_update_validation_valid_continuous(engine): - series = generate_daily_series(3, n_static_features=2, engine=engine) - freq = "1d" if engine == "polars" else "D" +@pytest.mark.parametrize("freq_name", ["hourly", "daily", "weekly", "monthly", "yearly"]) +def test_update_validation_valid_continuous(engine, freq_name): + series = _generate_series_with_freq(3, freq_name, n_static_features=2, engine=engine) + freq, freq_config = _get_freq_config(freq_name, engine) ts = TimeSeries(freq=freq, lags=[1]) ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") - update = _make_valid_update(series, engine) + update = _make_valid_update(series, engine, freq_config) ts.update(update, validate_input=True) @pytest.mark.parametrize("engine", ["pandas", "polars"]) -def test_update_validation_invalid_gap(engine): - series = generate_daily_series(3, n_static_features=2, engine=engine) - freq = "1d" if engine == "polars" else "D" +@pytest.mark.parametrize("freq_name", ["hourly", "daily", "weekly", "monthly", "yearly"]) +def test_update_validation_invalid_gap(engine, freq_name): + series = _generate_series_with_freq(3, freq_name, n_static_features=2, engine=engine) + freq, freq_config = _get_freq_config(freq_name, engine) ts = TimeSeries(freq=freq, lags=[1]) ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") if engine == "polars": @@ -605,16 +727,16 @@ def test_update_validation_invalid_gap(engine): ) update = pl.concat( [ - last_vals.with_columns(pl.col("ds").dt.offset_by("1d")), - last_vals.with_columns(pl.col("ds").dt.offset_by("3d")), + last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset1"])), + last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset3"])), ] ) else: last_vals = series.groupby("unique_id", observed=True).tail(1).copy() update = pd.concat( [ - last_vals.assign(ds=last_vals["ds"] + pd.offsets.Day()), - last_vals.assign(ds=last_vals["ds"] + 3 * pd.offsets.Day()), + last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset1"]), + last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset3"]), ], ignore_index=True, ) @@ -623,9 +745,10 @@ def test_update_validation_invalid_gap(engine): @pytest.mark.parametrize("engine", ["pandas", "polars"]) -def test_update_validation_invalid_start(engine): - series = generate_daily_series(3, n_static_features=2, engine=engine) - freq = "1d" if engine == "polars" else "D" +@pytest.mark.parametrize("freq_name", ["hourly", "daily", "weekly", "monthly", "yearly"]) +def test_update_validation_invalid_start(engine, freq_name): + series = _generate_series_with_freq(3, freq_name, n_static_features=2, engine=engine) + freq, freq_config = _get_freq_config(freq_name, engine) ts = TimeSeries(freq=freq, lags=[1]) ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") if engine == "polars": @@ -636,7 +759,7 @@ def test_update_validation_invalid_start(engine): update = pl.concat( [ last_vals, - last_vals.with_columns(pl.col("ds").dt.offset_by("1d")), + last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset1"])), ] ) else: @@ -644,7 +767,7 @@ def test_update_validation_invalid_start(engine): update = pd.concat( [ last_vals, - last_vals.assign(ds=last_vals["ds"] + pd.offsets.Day()), + last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset1"]), ], ignore_index=True, ) @@ -653,17 +776,31 @@ def test_update_validation_invalid_start(engine): @pytest.mark.parametrize("engine", ["pandas", "polars"]) -def test_update_validation_new_series(engine): - series = generate_daily_series(3, n_static_features=2, engine=engine) - freq = "1d" if engine == "polars" else "D" +@pytest.mark.parametrize("freq_name", ["hourly", "daily", "weekly", "monthly", "yearly"]) +def test_update_validation_new_series(engine, freq_name): + series = _generate_series_with_freq(3, freq_name, n_static_features=2, engine=engine) + freq, freq_config = _get_freq_config(freq_name, engine) ts = TimeSeries(freq=freq, lags=[1]) ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") - update = _make_valid_update(series, engine) + update = _make_valid_update(series, engine, freq_config) + + # Create new series with proper timestamps for the frequency if engine == "polars": + if freq_name == "hourly": + dates = [datetime.datetime(2020, 1, 1, 0, 0), datetime.datetime(2020, 1, 1, 1, 0)] + elif freq_name == "daily": + dates = [datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 2)] + elif freq_name == "weekly": + dates = [datetime.datetime(2020, 1, 5), datetime.datetime(2020, 1, 12)] + elif freq_name == "monthly": + dates = [datetime.datetime(2020, 1, 1), datetime.datetime(2020, 2, 1)] + else: # yearly + dates = [datetime.datetime(2020, 1, 1), datetime.datetime(2021, 1, 1)] + new_series = pl.DataFrame( { "unique_id": ["new_0", "new_0"], - "ds": [datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 2)], + "ds": dates, "y": [1.0, 2.0], "static_0": [0, 0], "static_1": [1, 1], @@ -683,10 +820,21 @@ def test_update_validation_new_series(engine): new_series = new_series.with_columns(cast_exprs) update = pl.concat([update, new_series]) else: + if freq_name == "hourly": + dates = pd.date_range("2020-01-01", periods=2, freq="h") + elif freq_name == "daily": + dates = pd.date_range("2020-01-01", periods=2, freq="D") + elif freq_name == "weekly": + dates = pd.date_range("2020-01-05", periods=2, freq="W") + elif freq_name == "monthly": + dates = pd.date_range("2020-01-01", periods=2, freq="MS") + else: # yearly + dates = pd.date_range("2020-01-01", periods=2, freq="YS") + new_series = pd.DataFrame( { "unique_id": ["new_0", "new_0"], - "ds": pd.to_datetime(["2020-01-01", "2020-01-02"]), + "ds": dates, "y": [1.0, 2.0], "static_0": [0, 0], "static_1": [1, 1], @@ -697,9 +845,10 @@ def test_update_validation_new_series(engine): @pytest.mark.parametrize("engine", ["pandas", "polars"]) -def test_update_validation_frequency_mismatch(engine): - series = generate_daily_series(3, n_static_features=2, engine=engine) - freq = "1d" if engine == "polars" else "D" +@pytest.mark.parametrize("freq_name", ["hourly", "daily", "weekly", "monthly", "yearly"]) +def test_update_validation_frequency_mismatch(engine, freq_name): + series = _generate_series_with_freq(3, freq_name, n_static_features=2, engine=engine) + freq, freq_config = _get_freq_config(freq_name, engine) ts = TimeSeries(freq=freq, lags=[1]) ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") if engine == "polars": @@ -707,21 +856,27 @@ def test_update_validation_frequency_mismatch(engine): series.group_by("unique_id").agg(pl.col("ds").max()), on=["unique_id", "ds"], ) + # Create one properly aligned timestamp and one misaligned update = pl.concat( [ - last_vals.with_columns(pl.col("ds").dt.offset_by("1d")), + last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset1"])), last_vals.with_columns( - pl.col("ds").dt.offset_by("1d").dt.offset_by("1h") + pl.col("ds") + .dt.offset_by(freq_config["polars_offset1"]) + .dt.offset_by(freq_config["polars_offset_misaligned"]) ), ] ) else: last_vals = series.groupby("unique_id", observed=True).tail(1).copy() + # Create one properly aligned timestamp and one misaligned update = pd.concat( [ - last_vals.assign(ds=last_vals["ds"] + pd.offsets.Day()), + last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset1"]), last_vals.assign( - ds=last_vals["ds"] + pd.offsets.Day() + pd.offsets.Hour() + ds=last_vals["ds"] + + freq_config["pandas_offset1"] + + freq_config["pandas_offset_misaligned"] ), ], ignore_index=True, @@ -729,6 +884,59 @@ def test_update_validation_frequency_mismatch(engine): with pytest.raises(ValueError, match="aligned"): ts.update(update, validate_input=True) + +@pytest.mark.parametrize("engine", ["pandas", "polars"]) +@pytest.mark.parametrize("freq_name", ["hourly", "daily"]) +def test_update_validation_misaligned_intermediate_timestamp(engine, freq_name): + """Test that misaligned intermediate timestamps are caught even when min/max are aligned. + + This is a critical bug: the current validation only checks min and max timestamps, + so it misses misaligned intermediate values. For example, with hourly data: + [12:00, 13:30, 14:00] should FAIL because 13:30 is not aligned, but currently + it passes because min (12:00) and max (14:00) are both aligned. + """ + series = _generate_series_with_freq(3, freq_name, n_static_features=2, engine=engine) + freq, freq_config = _get_freq_config(freq_name, engine) + ts = TimeSeries(freq=freq, lags=[1]) + ts.fit_transform(series, id_col="unique_id", time_col="ds", target_col="y") + + # Create update with aligned min/max but misaligned middle timestamp + if engine == "polars": + last_vals = series.join( + series.group_by("unique_id").agg(pl.col("ds").max()), + on=["unique_id", "ds"], + ) + # Create: [last+1h, last+1.5h, last+2h] for hourly (1.5h is misaligned) + # or [last+1d, last+1.5d, last+2d] for daily (1.5d is misaligned) + update = pl.concat( + [ + last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset1"])), + last_vals.with_columns( + pl.col("ds") + .dt.offset_by(freq_config["polars_offset1"]) + .dt.offset_by(freq_config["polars_offset_misaligned"]) + ), + last_vals.with_columns(pl.col("ds").dt.offset_by(freq_config["polars_offset2"])), + ] + ) + else: + last_vals = series.groupby("unique_id", observed=True).tail(1).copy() + # Create three timestamps: aligned, misaligned, aligned + df1 = last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset1"]) + df2 = last_vals.assign( + ds=last_vals["ds"] + + freq_config["pandas_offset1"] + + freq_config["pandas_offset_misaligned"] + ) + df3 = last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset2"]) + update = pd.concat([df1, df2, df3], ignore_index=True) + + # This SHOULD raise an error but currently PASSES (bug!) + # When the validation is fixed, this test will pass by catching the error + with pytest.raises(ValueError, match="aligned|gaps or duplicate"): + ts.update(update, validate_input=True) + + def test_ts_polars(): two_series = generate_daily_series(2, n_static_features=2, engine="polars") ts = TimeSeries(freq="1d", lags=[1], date_features=["weekday"]) From f4cc87e191982b697822ecfe4e226c6b5eb61019 Mon Sep 17 00:00:00 2001 From: janrth Date: Sun, 18 Jan 2026 13:26:45 +0100 Subject: [PATCH 8/8] using offset_times from utilsforecast in core functionality for validate update function --- mlforecast/core.py | 59 +++++++++++++--------------------------------- tests/test_core.py | 7 +++--- 2 files changed, 20 insertions(+), 46 deletions(-) diff --git a/mlforecast/core.py b/mlforecast/core.py index eee8efc2..4557ed51 100644 --- a/mlforecast/core.py +++ b/mlforecast/core.py @@ -865,6 +865,7 @@ def load(path: Union[str, Path], protocol: Optional[str] = None) -> "TimeSeries" def _validate_new_df(self, df: DataFrame) -> None: if isinstance(df, pl.DataFrame): + df = df.sort([self.id_col, self.time_col]) stats = ( df.group_by(self.id_col) .agg( @@ -894,34 +895,24 @@ def _validate_new_df(self, df: DataFrame) -> None: "Series have invalid start dates. " f"Expected start at last_date + freq for: {bad_ids}." ) - if isinstance(self.freq, int): - diffs = pl.col("_max") - pl.col("_min") - misaligned = stats.filter((diffs % self.freq) != 0) - if misaligned.height: - raise ValueError( - "Found timestamps not aligned to the configured frequency." - ) - expected_count = diffs // self.freq + 1 - else: - delta = pd.Timedelta(pd.tseries.frequencies.to_offset(self.freq)) - delta_ns = delta.value - min_ns = pl.col("_min").dt.timestamp("ns") - max_ns = pl.col("_max").dt.timestamp("ns") - diffs_ns = max_ns - min_ns - misaligned = stats.filter((diffs_ns % delta_ns) != 0) - if misaligned.height: - raise ValueError( - "Found timestamps not aligned to the configured frequency." - ) - expected_count = diffs_ns // delta_ns + 1 - gaps = stats.filter(expected_count != pl.col("_size")) + expected_next = ufp.offset_times(df[self.time_col], self.freq, 1) + df_check = df.with_columns( + pl.Series(name="_expected_next", values=expected_next) + ).with_columns( + pl.col(self.time_col).shift(-1).over(self.id_col).alias("_next") + ) + gaps = df_check.filter( + pl.col("_next").is_not_null() + & (pl.col("_expected_next") != pl.col("_next")) + ) if gaps.height: - bad_ids = gaps[self.id_col].to_list() + bad_ids = gaps[self.id_col].unique().to_list() raise ValueError( "Found gaps or duplicate timestamps in the update for: " f"{bad_ids}." ) return + df = df.sort_values([self.id_col, self.time_col]) stats = ( df.groupby(self.id_col, observed=True)[self.time_col] .agg(["min", "max", "size"]) @@ -947,27 +938,11 @@ def _validate_new_df(self, df: DataFrame) -> None: "Series have invalid start dates. " f"Expected start at last_date + freq for: {bad_ids}." ) - diffs = stats["_max"] - stats["_min"] - if isinstance(self.freq, int): - delta = self.freq - remainder = diffs % delta - if (remainder != 0).any(): - raise ValueError( - "Found timestamps not aligned to the configured frequency." - ) - expected_count = diffs // delta + 1 - else: - offset = pd.tseries.frequencies.to_offset(self.freq) - delta = pd.Timedelta(offset) - remainder = diffs % delta - if (remainder != pd.Timedelta(0)).any(): - raise ValueError( - "Found timestamps not aligned to the configured frequency." - ) - expected_count = diffs // delta + 1 - gaps = expected_count != stats["_size"] + expected_next = ufp.offset_times(df[self.time_col], self.freq, 1) + next_time = df.groupby(self.id_col, observed=True)[self.time_col].shift(-1) + gaps = next_time.notna() & (expected_next != next_time) if gaps.any(): - bad_ids = stats.loc[gaps, self.id_col].tolist() + bad_ids = df.loc[gaps, self.id_col].unique().tolist() raise ValueError( "Found gaps or duplicate timestamps in the update for: " f"{bad_ids}." diff --git a/tests/test_core.py b/tests/test_core.py index ca9b0597..4a8e1016 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -567,6 +567,7 @@ def _generate_series_with_freq(n_series, freq_name, n_static_features, engine): from utilsforecast.data import generate_series from math import ceil, log10 + max_length = 200 if freq_name == "yearly" else 500 freq_map = { "hourly": "h", "daily": "D", @@ -579,7 +580,7 @@ def _generate_series_with_freq(n_series, freq_name, n_static_features, engine): n_series=n_series, freq=freq_map[freq_name], min_length=50, - max_length=500, + max_length=max_length, n_static_features=n_static_features, equal_ends=False, static_as_categorical=True, @@ -881,7 +882,7 @@ def test_update_validation_frequency_mismatch(engine, freq_name): ], ignore_index=True, ) - with pytest.raises(ValueError, match="aligned"): + with pytest.raises(ValueError, match="gaps or duplicate"): ts.update(update, validate_input=True) @@ -931,8 +932,6 @@ def test_update_validation_misaligned_intermediate_timestamp(engine, freq_name): df3 = last_vals.assign(ds=last_vals["ds"] + freq_config["pandas_offset2"]) update = pd.concat([df1, df2, df3], ignore_index=True) - # This SHOULD raise an error but currently PASSES (bug!) - # When the validation is fixed, this test will pass by catching the error with pytest.raises(ValueError, match="aligned|gaps or duplicate"): ts.update(update, validate_input=True)