Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 57 additions & 20 deletions nixtla/nixtla_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def _maybe_add_date_features(
else:
date_features = _date_features_by_freq.get(freq, [])
if not date_features:
warnings.warn(
logger.warning(
f"Non default date features for {freq} "
"please provide a list of date features"
)
Expand Down Expand Up @@ -404,7 +404,7 @@ def _validate_exog(
# all exogs must be historic
ignored_exogs = [c for c in exogs if c not in hist_exog]
if ignored_exogs:
warnings.warn(
logger.warning(
f"`df` contains the following exogenous features: {ignored_exogs}, "
"but `X_df` was not provided and they were not declared in `hist_exog_list`. "
"They will be ignored."
Expand All @@ -418,7 +418,7 @@ def _validate_exog(
declared_exogs = {*hist_exog, *futr_exog}
ignored_exogs = [c for c in exogs if c not in declared_exogs]
if ignored_exogs:
warnings.warn(
logger.warning(
f"`df` contains the following exogenous features: {ignored_exogs}, "
"but they were not found in `X_df` nor declared in `hist_exog_list`. "
"They will be ignored."
Expand All @@ -435,7 +435,7 @@ def _validate_exog(
# features are provided through X_df but declared as historic
futr_and_hist = set(futr_exog) & set(hist_exog)
if futr_and_hist:
warnings.warn(
logger.warning(
"The following features were declared as historic but found in `X_df`: "
f"{futr_and_hist}, they will be considered as historic."
)
Expand Down Expand Up @@ -541,15 +541,40 @@ def _preprocess(
return processed, X_future, x_cols, futr_cols


def _forecast_payload_to_in_sample(payload):
in_sample_payload = {
k: v
for k, v in payload.items()
if k not in ("h", "finetune_steps", "finetune_loss", "finetune_depth")
}
del in_sample_payload["series"]["X_future"]
return in_sample_payload
def _forecast_payload_to_in_sample(payload: dict, h: int, n_windows: int) -> dict:
# No finetuning for in-sample
payload["finetune_steps"] = 0

# historic exogenous features
hist_exog = None
if payload["series"]["X"] is not None:
n_features = len(payload["series"]["X"])
hist_exog = list(range(n_features))
if payload["series"]["X_future"] is not None:
n_futr_exog = len(payload["series"]["X_future"])
hist_exog = hist_exog[n_futr_exog:]
payload["hist_exog"] = hist_exog
del payload["series"]["X_future"]

# in-sample horizon and number of windows
payload["h"] = h
payload["step_size"] = h
payload["n_windows"] = n_windows

return payload

def _get_in_sample_horizon_and_windows(
sizes: np.ndarray,
model_horizon: int,
model_input_size: int,
) -> tuple[int, int]:

# in-sample horizon and number of windows
min_size = min(sizes)
h = min(model_horizon, min_size - 1)
n_windows = max((min_size - model_input_size) // model_horizon, 1)

return h, n_windows

def _maybe_add_intervals(
df: DFType,
Expand Down Expand Up @@ -1007,7 +1032,7 @@ def _make_partitioned_requests(

def _maybe_override_model(self, model: _Model) -> _Model:
if self._is_azure and model != "azureai":
warnings.warn("Azure endpoint detected, setting `model` to 'azureai'.")
logger.warning("Azure endpoint detected, setting `model` to 'azureai'.")
model = "azureai"
return model

Expand Down Expand Up @@ -1059,7 +1084,7 @@ def _maybe_assign_feature_contributions(
return
if "feature_contributions" not in resp:
if self._is_azure:
warnings.warn("feature_contributions aren't implemented in Azure yet.")
logger.warning("feature_contributions aren't implemented in Azure yet.")
return
else:
raise RuntimeError(
Expand Down Expand Up @@ -1633,7 +1658,7 @@ def forecast(
if finetune_steps > 0:
_validate_input_size(processed, 1, 1)
if add_history:
_validate_input_size(processed, model_input_size, model_horizon)
_validate_input_size(processed, 1, 1)
if h > model_horizon:
logger.warning(
'The specified horizon "h" exceeds the model horizon, '
Expand All @@ -1660,10 +1685,11 @@ def forecast(
X = None

logger.info("Calling Forecast Endpoint...")
sizes = np.diff(processed.indptr)
payload = {
"series": {
"y": processed.data[:, 0],
"sizes": np.diff(processed.indptr),
"sizes": sizes,
"X": X,
"X_future": X_future,
},
Expand All @@ -1687,10 +1713,15 @@ def forecast(
if num_partitions is None:
resp = self._make_request_with_retries(client, "v2/forecast", payload)
if add_history:
in_sample_payload = _forecast_payload_to_in_sample(payload)
insample_h, n_windows = _get_in_sample_horizon_and_windows(
sizes,
model_horizon,
model_input_size,
)
in_sample_payload = _forecast_payload_to_in_sample(payload, insample_h, n_windows)
logger.info("Calling Historical Forecast Endpoint...")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, keeping the same logging message here so as not to confuse users.

in_sample_resp = self._make_request_with_retries(
client, "v2/historic_forecast", in_sample_payload
client, "v2/cross_validation", in_sample_payload
)
insample_feat_contributions = in_sample_resp.get(
"feature_contributions", None
Expand All @@ -1699,12 +1730,17 @@ def forecast(
payloads = _partition_series(payload, num_partitions, h)
resp = self._make_partitioned_requests(client, "v2/forecast", payloads)
if add_history:
insample_h, n_windows = _get_in_sample_horizon_and_windows(
sizes,
model_horizon,
model_input_size,
)
in_sample_payloads = [
_forecast_payload_to_in_sample(p) for p in payloads
_forecast_payload_to_in_sample(p, insample_h, n_windows) for p in payloads
]
logger.info("Calling Historical Forecast Endpoint...")
in_sample_resp = self._make_partitioned_requests(
client, "v2/historic_forecast", in_sample_payloads
client, "v2/cross_validation", in_sample_payloads
)
insample_feat_contributions = in_sample_resp.get(
"feature_contributions", None
Expand All @@ -1719,6 +1755,7 @@ def forecast(
id_col=id_col,
time_col=time_col,
)
# assert 1==2, f"Debugging: {resp["mean"].shape}, {out.shape}"
out = ufp.assign_columns(out, "TimeGPT", resp["mean"])
out = _maybe_add_intervals(out, resp["intervals"])
if add_history:
Expand Down
7 changes: 6 additions & 1 deletion nixtla_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
# note that scope="session" will result in failed test
@pytest.fixture(scope="module")
def nixtla_test_client():
client = NixtlaClient()
# client = NixtlaClient()
client = NixtlaClient(
base_url="http://localhost:8000",
api_key="12345",
timeout=600,
)
yield client

try:
Expand Down
8 changes: 4 additions & 4 deletions nixtla_tests/nixtla_client/test_nixtla_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,19 @@ def test_forecast_quantiles_output(
("cross_validation", {"h": 7, "n_windows": 2}, False),
("forecast", {"h": 7, "add_history": True}, False),
("detect_anomalies", {"level": 98}, True),
("cross_validation", {"h": 7, "n_windows": 2}, False),
("forecast", {"h": 7, "add_history": True}, False),
("cross_validation", {"h": 7, "n_windows": 2}, True),
("forecast", {"h": 7, "add_history": True}, True),
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these two cases should use `True`; otherwise they are identical to the ones above them.

],
)
def test_num_partitions_same_results_parametrized(
nixtla_test_client, df_freq_generator, method_name, method_kwargs, freq, exog
):
mathod_mapper = {
method_mapper = {
"detect_anomalies": nixtla_test_client.detect_anomalies,
"cross_validation": nixtla_test_client.cross_validation,
"forecast": nixtla_test_client.forecast,
}
method = mathod_mapper[method_name]
method = method_mapper[method_name]

df_freq = df_freq_generator(n_series=10, min_length=500, max_length=550, freq=freq)
df_freq["ds"] = df_freq.groupby("unique_id", observed=True)["ds"].transform(
Expand Down
Loading