Skip to content

Commit 9bda122

Browse files
authored
[FEAT] Implements direct forecasting with specific individual horizons (#556)
1 parent 8cdc4d8 commit 9bda122

4 files changed

Lines changed: 1082 additions & 101 deletions

File tree

mlforecast/core.py

Lines changed: 145 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,37 @@ def _as_tuple(x):
142142
Transforms = Dict[str, Union[Tuple[Any, ...], _BaseLagTransform]]
143143

144144

145+
def _validate_horizon_params(
    max_horizon: Optional[int], horizons: Optional[List[int]]
) -> Tuple[Optional[List[int]], Optional[int]]:
    """Validate and normalize horizon parameters.

    Args:
        max_horizon: Train models for all horizons 1 to max_horizon.
        horizons: Train models only for specific horizons (1-indexed).
            Duplicates are removed and the values are sorted.

    Returns:
        Tuple of (internal_horizons, effective_max_horizon):
        - internal_horizons: 0-indexed list of horizons, or None for recursive mode
        - effective_max_horizon: Maximum horizon value (for target expansion)

    Raises:
        ValueError: If both parameters are given, if `horizons` is empty or
            contains anything other than positive integers, or if
            `max_horizon` is not a positive integer.
    """

    def _is_positive_int(value: object) -> bool:
        # bool is a subclass of int, so True/False would otherwise be
        # accepted as horizons 1/0; numpy integer scalars are valid inputs.
        if isinstance(value, bool) or not isinstance(value, (int, np.integer)):
            return False
        return value > 0

    if max_horizon is not None and horizons is not None:
        raise ValueError("Cannot specify both 'max_horizon' and 'horizons'")

    if horizons is not None:
        # Materialize first so tuples/arrays get a well-defined emptiness check.
        requested = list(horizons)
        if not requested:
            raise ValueError("'horizons' cannot be empty")
        if not all(_is_positive_int(h) for h in requested):
            raise ValueError("All horizons must be positive integers")
        # Dedupe, sort and normalize to plain Python ints.
        unique = sorted({int(h) for h in requested})
        # Internally horizons are 0-indexed; the max stays 1-indexed because it
        # is the number of target columns to expand.
        return [h - 1 for h in unique], unique[-1]

    if max_horizon is not None:
        if not _is_positive_int(max_horizon):
            raise ValueError("'max_horizon' must be a positive integer")
        max_horizon = int(max_horizon)
        return list(range(max_horizon)), max_horizon

    # Neither parameter given: recursive forecasting.
    return None, None
174+
175+
145176
def _parse_transforms(
146177
lags: Lags,
147178
lag_transforms: LagTransforms,
@@ -529,12 +560,27 @@ def _transform(
529560
df: DFType,
530561
dropna: bool = True,
531562
max_horizon: Optional[int] = None,
563+
horizons: Optional[List[int]] = None,
532564
return_X_y: bool = False,
533565
as_numpy: bool = False,
534566
) -> DFType:
535567
"""Add the features to `df`.
536568
537-
if `dropna=True` then all the null rows are dropped."""
569+
if `dropna=True` then all the null rows are dropped.
570+
571+
Args:
572+
df: Input dataframe
573+
dropna: Drop rows with missing values
574+
max_horizon: Train models for all horizons 1 to max_horizon
575+
horizons: Train models only for specific horizons (1-indexed)
576+
return_X_y: Return tuple of (X, y) instead of dataframe
577+
as_numpy: Convert X to numpy array
578+
"""
579+
# Validate and normalize horizon parameters
580+
self._horizons, effective_max_horizon = _validate_horizon_params(
581+
max_horizon, horizons
582+
)
583+
538584
# we need to compute all transformations in case they save state
539585
features = self._compute_transforms(
540586
transforms=self.transforms, updates_only=False
@@ -595,11 +641,11 @@ def _transform(
595641
features[k] = v[self._restore_idxs]
596642

597643
# target
598-
self.max_horizon = max_horizon
599-
if max_horizon is None:
644+
self.max_horizon = effective_max_horizon
645+
if effective_max_horizon is None:
600646
target = self.ga.data
601647
else:
602-
target = self.ga.expand_target(max_horizon)
648+
target = self.ga.expand_target(effective_max_horizon)
603649
if self._restore_idxs is not None:
604650
target = target[self._restore_idxs]
605651

@@ -698,11 +744,11 @@ def _transform(
698744
if as_numpy:
699745
X = ufp.to_numpy(X)
700746
return X, target
701-
if max_horizon is not None:
747+
if effective_max_horizon is not None:
702748
# remove original target
703749
out_cols = [c for c in df.columns if c != self.target_col]
704750
df = df[out_cols]
705-
target_names = [f"{self.target_col}{i}" for i in range(max_horizon)]
751+
target_names = [f"{self.target_col}{i}" for i in range(effective_max_horizon)]
706752
df = ufp.assign_columns(df, target_names, target)
707753
else:
708754
df = ufp.copy_if_pandas(df, deep=False)
@@ -713,11 +759,11 @@ def _transform_per_horizon(
713759
self,
714760
prep: DFType,
715761
original_df: DFType,
716-
max_horizon: int,
762+
horizons: List[int],
717763
target_col: str,
718764
as_numpy: bool = False,
719-
) -> Iterator[Tuple[Union[DFType, np.ndarray], np.ndarray]]:
720-
"""Generator that yields (X, y) tuples for each horizon.
765+
) -> Iterator[Tuple[int, Union[DFType, np.ndarray], np.ndarray]]:
766+
"""Generator that yields (h, X, y) tuples for each horizon.
721767
722768
For horizon h:
723769
- Dynamic exogenous features are aligned to predict h steps ahead
@@ -727,9 +773,12 @@ def _transform_per_horizon(
727773
Args:
728774
prep: Preprocessed dataframe with expanded targets (y0, y1, ..., y{max_horizon-1})
729775
original_df: Original input dataframe for exog feature lookup
730-
max_horizon: Number of horizons
776+
horizons: List of horizons to process (0-indexed)
731777
target_col: Name of target column
732778
as_numpy: Whether to convert X to numpy array
779+
780+
Yields:
781+
Tuple of (horizon_index, X, y) where horizon_index is 0-indexed
733782
"""
734783
exog_cols = self._get_dynamic_exog_cols(list(original_df.columns))
735784

@@ -742,18 +791,18 @@ def _transform_per_horizon(
742791
# Non-exog feature columns (lags, date features, static)
743792
non_exog_cols = [c for c in x_cols if c not in exog_cols]
744793

745-
# Target column names (y0, y1, ..., y{max_horizon-1})
746-
target_cols = [f"{target_col}{i}" for i in range(max_horizon)]
747-
748794
# Build exog lookup dictionary from original_df for efficient lookups
749795
# Key: (id, time) -> exog values
750796
if exog_cols:
751797
# Create a lookup dataframe indexed by (id, time)
752798
exog_lookup = original_df[[self.id_col, self.time_col] + exog_cols]
753799

754-
for h in range(max_horizon):
800+
for h in horizons:
801+
# Target column name for this horizon
802+
target_col_h = f"{target_col}{h}"
803+
755804
# Get target for this horizon
756-
y_h = prep[target_cols[h]].to_numpy()
805+
y_h = prep[target_col_h].to_numpy()
757806

758807
if h == 0 or not exog_cols:
759808
# No offset needed for horizon 0 or if no exog cols
@@ -809,7 +858,7 @@ def _transform_per_horizon(
809858
if as_numpy:
810859
X_h = ufp.to_numpy(X_h)
811860

812-
yield X_h, y_h
861+
yield h, X_h, y_h
813862

814863
def fit_transform(
815864
self,
@@ -821,6 +870,7 @@ def fit_transform(
821870
dropna: bool = True,
822871
keep_last_n: Optional[int] = None,
823872
max_horizon: Optional[int] = None,
873+
horizons: Optional[List[int]] = None,
824874
return_X_y: bool = False,
825875
as_numpy: bool = False,
826876
weight_col: Optional[str] = None,
@@ -830,6 +880,11 @@ def fit_transform(
830880
If not all features are static, specify which ones are in `static_features`.
831881
If you don't want to drop rows with null values after the transformations set `dropna=False`
832882
If `keep_last_n` is not None then that number of observations is kept across all series for updates.
883+
884+
Args:
885+
max_horizon: Train models for all horizons 1 to max_horizon.
886+
horizons: Train models only for specific horizons (1-indexed).
887+
Mutually exclusive with max_horizon.
833888
"""
834889
self.dropna = dropna
835890
self.as_numpy = as_numpy
@@ -846,6 +901,7 @@ def fit_transform(
846901
df=data,
847902
dropna=dropna,
848903
max_horizon=max_horizon,
904+
horizons=horizons,
849905
return_X_y=return_X_y,
850906
as_numpy=as_numpy,
851907
)
@@ -1038,7 +1094,7 @@ def _predict_recursive(
10381094

10391095
def _predict_multi(
10401096
self,
1041-
models: Dict[str, BaseEstimator],
1097+
models: Dict[str, Dict[int, BaseEstimator]],
10421098
horizon: int,
10431099
before_predict_callback: Optional[Callable] = None,
10441100
X_df: Optional[DFType] = None,
@@ -1048,27 +1104,84 @@ def _predict_multi(
10481104
raise ValueError(
10491105
f"horizon must be at most max_horizon ({self.max_horizon})"
10501106
)
1107+
1108+
# Determine horizons to predict based on _horizons (sparse) or all up to horizon
1109+
internal_horizons = getattr(self, "_horizons", None)
1110+
1111+
# Check if horizons are sparse (not a contiguous range from 0)
1112+
full_range = list(range(self.max_horizon))
1113+
is_sparse = internal_horizons is not None and internal_horizons != full_range
1114+
1115+
if is_sparse:
1116+
# Sparse horizons: keep trained horizons within the requested horizon (0-indexed h < horizon, i.e. 1-indexed step h + 1 <= horizon)
1117+
assert internal_horizons is not None # mypy: guaranteed by is_sparse check
1118+
horizons_to_predict = [h for h in internal_horizons if h < horizon]
1119+
if not horizons_to_predict:
1120+
raise ValueError(
1121+
f"No trained horizons available for prediction up to h={horizon}. "
1122+
f"Trained horizons (1-indexed): {[h + 1 for h in internal_horizons]}"
1123+
)
1124+
output_horizon = len(horizons_to_predict)
1125+
else:
1126+
# Full horizons: predict all from 0 to horizon-1
1127+
horizons_to_predict = list(range(horizon))
1128+
output_horizon = horizon
1129+
10511130
self._predict_setup()
1052-
uids = self._get_future_ids(horizon)
1053-
starts = ufp.offset_times(self.curr_dates, self.freq, 1)
1054-
dates = ufp.time_ranges(starts, self.freq, periods=horizon)
1055-
if isinstance(self.curr_dates, pl_Series):
1056-
df_constructor = pl_DataFrame
1131+
uids = self._get_future_ids(output_horizon)
1132+
1133+
if is_sparse:
1134+
# Generate dates only for the specific horizons we're predicting
1135+
# uids structure is [s0, s0, ..., s0 (output_horizon times), s1, s1, ..., s1, ...]
1136+
# So dates need to be [s0_h0, s0_h1, ..., s1_h0, s1_h1, ...]
1137+
if isinstance(self.curr_dates, pl_Series):
1138+
df_constructor = pl_DataFrame
1139+
# Compute dates for all horizons, then stack and flatten
1140+
dates_per_horizon = [ufp.offset_times(self.curr_dates, self.freq, h + 1) for h in horizons_to_predict]
1141+
# Stack: each row is a series, each col is a horizon
1142+
dates_matrix = pl.DataFrame(dates_per_horizon).transpose()
1143+
# Flatten row by row: [s0_h0, s0_h1, ..., s1_h0, s1_h1, ...]
1144+
dates = dates_matrix.to_numpy().ravel()
1145+
dates = pl.Series(dates)
1146+
else:
1147+
df_constructor = pd.DataFrame
1148+
# Compute dates for all horizons, then stack and flatten
1149+
dates_per_horizon = [ufp.offset_times(self.curr_dates, self.freq, h + 1) for h in horizons_to_predict]
1150+
# Stack: each row is a series, each col is a horizon
1151+
dates_matrix = np.column_stack(dates_per_horizon)
1152+
# Flatten row by row: [s0_h0, s0_h1, ..., s1_h0, s1_h1, ...]
1153+
dates = dates_matrix.ravel()
10571154
else:
1058-
df_constructor = pd.DataFrame
1155+
# Original behavior: generate contiguous date range
1156+
starts = ufp.offset_times(self.curr_dates, self.freq, 1)
1157+
dates = ufp.time_ranges(starts, self.freq, periods=horizon)
1158+
if isinstance(self.curr_dates, pl_Series):
1159+
df_constructor = pl_DataFrame
1160+
else:
1161+
df_constructor = pd.DataFrame
1162+
10591163
result = df_constructor({self.id_col: uids, self.time_col: dates})
1164+
10601165
for name, model in models.items():
10611166
with self._backup():
10621167
self._predict_setup()
1063-
predictions = np.empty((len(self.uids), horizon))
1064-
for i in range(horizon):
1065-
new_x = self._get_features_for_next_step(X_df)
1168+
predictions = np.empty((len(self.uids), output_horizon))
1169+
1170+
for out_idx, h in enumerate(horizons_to_predict):
1171+
# Advance features to the correct horizon step
1172+
# We need to step through all horizons up to h to maintain state
1173+
while self._h <= h:
1174+
new_x = self._get_features_for_next_step(X_df)
1175+
10661176
if before_predict_callback is not None:
10671177
new_x = before_predict_callback(new_x)
1068-
preds = model[i].predict(new_x)
1178+
1179+
horizon_model = model[h]
1180+
preds = horizon_model.predict(new_x)
10691181
if len(preds) != len(self.uids):
10701182
raise ValueError(f"Model returned {len(preds)} predictions but expected {len(self.uids)}")
1071-
predictions[:, i] = preds
1183+
predictions[:, out_idx] = preds
1184+
10721185
raw_preds = predictions.ravel()
10731186
result = ufp.assign_columns(result, name, raw_preds)
10741187
return result
@@ -1116,7 +1229,7 @@ def _maybe_subset(self, idxs: Optional[np.ndarray]) -> Iterator[None]:
11161229

11171230
def predict(
11181231
self,
1119-
models: Dict[str, Union[BaseEstimator, List[BaseEstimator]]],
1232+
models: Dict[str, Union[BaseEstimator, Dict[int, BaseEstimator]]],
11201233
horizon: int,
11211234
before_predict_callback: Optional[Callable] = None,
11221235
after_predict_callback: Optional[Callable] = None,
@@ -1210,7 +1323,9 @@ def predict(
12101323
for c in preds.columns
12111324
if c not in (self.id_col, self.time_col)
12121325
]
1213-
indptr = np.arange(0, horizon * (len(self.uids) + 1), horizon)
1326+
# Calculate actual predictions per series (handles sparse horizons)
1327+
preds_per_series = len(preds) // len(self.uids)
1328+
indptr = np.arange(0, preds_per_series * (len(self.uids) + 1), preds_per_series)
12141329
for tfm in self.target_transforms[::-1]:
12151330
if isinstance(tfm, _BaseGroupedArrayTargetTransform):
12161331
for col in model_cols:

0 commit comments

Comments
 (0)