diff --git a/CHANGELOG.md b/CHANGELOG.md index ce739141a..0ca9d6d7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,19 @@ # Changelog +## 4.3.8 - 2024-11-16 + +- ThetaData panda fetches now short-circuit once coverage metadata says the requested start/end are already in cache, and incremental requests append only the missing tail of bars instead of reprocessing the full frame. +- Daily strategies skip the expensive minute-quote merge/forward-fill path entirely, eliminating unnecessary `pd.concat`/transpose work for slow runs such as GoogleMomentum. +- The 09:30 open correction routine memoizes per (symbol, date) and groups contiguous days so the Theta minute downloader is hit exactly once per uncovered window. +- `_combine_duplicate_columns` now deduplicates columns in a single pass (no repeated transpose/drop cycles) and `pd.to_datetime` is skipped whenever frames already carry a `DatetimeIndex`, cutting GoogleMomentum’s Theta run time from ~109 s (dev) to ~75 s on feature. +- Added regression coverage for both the fast-path/append flow and the memoized open-correction behavior. + +## 4.3.7 - 2024-11-16 + +- ThetaData backtests now keep an incremental, processed-frame cache so repeated `get_historical_prices` calls append only the missing bars instead of reloading and re-normalizing the full dataset on every iteration. +- The EOD 09:30 open correction flow reuses cached minute bars per asset/date, fetching only uncovered days and logging clearer diagnostics when Theta rejects the override window. +- Added regression coverage for the caching/retry code plus a downloader smoke script (`scripts/check_eod_chunking.py`) to validate prod chunking. + ## 4.3.6 - 2024-11-16 - Fixed ThetaData EOD corrections by fetching a real 09:30–09:31 minute window for each trading day, preventing zero-length requests and the resulting terminal hangs. diff --git a/lumibot/tools/thetadata_helper.py b/lumibot/tools/thetadata_helper.py index 51160281d..f00ead4de 100644 --- a/lumibot/tools/thetadata_helper.py +++ b/lumibot/tools/thetadata_helper.py @@ -128,6 +128,41 @@ def _coerce_skip_flag(raw: Optional[str], base_url: str) -> bool: False: ("09:30:00", "16:00:00"), # regular session only } +OPEN_CORRECTION_CACHE: Dict[str, Dict[date, float]] = {} +OPEN_CORRECTION_CACHE_LIMIT = int(os.environ.get("THETADATA_OPEN_CACHE_LIMIT", "512")) + + +def reset_open_correction_cache() -> None: + """Utility hook for tests to clear cached 09:30 open prices.""" + OPEN_CORRECTION_CACHE.clear() + + +def _open_cache_key(asset: Asset) -> str: + expiration = getattr(asset, "expiration", None) + suffix = f":{expiration}" if expiration else "" + return f"{getattr(asset, 'asset_type', 'stock')}::{asset.symbol}{suffix}" + + +def _trim_open_cache(asset_key: str) -> None: + cache = OPEN_CORRECTION_CACHE.get(asset_key) + if not cache: + return + overflow = len(cache) - OPEN_CORRECTION_CACHE_LIMIT + if overflow <= 0: + return + for stale_date in sorted(cache.keys())[:overflow]: + cache.pop(stale_date, None) + + +def _localize_trade_midnight(trade_date: date, reference: Optional[datetime]) -> datetime: + base = datetime.combine(trade_date, datetime.min.time()) + tzinfo = getattr(reference, "tzinfo", None) + if tzinfo is None: + return pytz.UTC.localize(base) + if base.tzinfo is not None: + return base.astimezone(tzinfo) + return tzinfo.localize(base) + def _build_request_headers(base: Optional[Dict[str, str]] = None) -> Dict[str, str]: request_headers: Dict[str, str] = dict(base or {}) @@ -2426,52 +2461,138 @@ def combine_datetime(row): df = df.drop(columns=["bid_size", "bid_exchange", "bid", "bid_condition", "ask_size", "ask_exchange", "ask", "ask_condition"], errors='ignore') - # FIX: ThetaData's EOD endpoint returns incorrect open/high/low prices for STOCKS and OPTIONS - # that don't match Polygon/Yahoo. We fix this by using minute bar data. - # Solution: Fetch minute bars for each trading day and aggregate to get correct OHLC - # NOTE: Indexes don't need this fix since they are calculated values, not traded securities - if asset_type in ("stock", "option"): - logger.info(f"Fetching 9:30 AM minute bars to correct EOD open prices...") + # Fix incorrect stock/option opens by reusing cached 09:30 minute bars when necessary. + if asset_type in ("stock", "option") and "open" in df.columns: + df = _maybe_correct_eod_opens( + df=df, + asset=asset, + start_dt=start_dt, + end_dt=end_dt, + username=username, + password=password, + datastyle=datastyle, + ) + + return df + + +def _maybe_correct_eod_opens( + df: pd.DataFrame, + asset: Asset, + start_dt: datetime, + end_dt: datetime, + username: str, + password: str, + datastyle: str, +): + asset_key = _open_cache_key(asset) + asset_cache = OPEN_CORRECTION_CACHE.setdefault(asset_key, {}) + + if asset_cache: + for idx in df.index: + cached_value = asset_cache.get(idx.date()) + if cached_value is not None: + df.at[idx, "open"] = cached_value + + open_series = df.get("open") + if open_series is None: + return df + + trade_dates = sorted({idx.date() for idx in df.index}) + invalid_dates = { + idx.date() + for idx, value in zip(df.index, open_series) + if pd.isna(value) or value <= 0 + } + missing_cache_dates = [trade_date for trade_date in trade_dates if trade_date not in asset_cache] + dates_to_fetch = sorted(set(missing_cache_dates) | invalid_dates) + + if not dates_to_fetch: + if invalid_dates: + logger.debug( + "[THETA][EOD][OPEN] invalid opens for %s resolved from cache; skipping 09:30 correction", + asset.symbol, + ) + else: + logger.debug( + "[THETA][EOD][OPEN] opens already cached for %s; skipping 09:30 correction", + asset.symbol, + ) + for idx in df.index: + cached_value = asset_cache.get(idx.date()) + if cached_value is not None: + df.at[idx, "open"] = cached_value + return df + + logger.info( + "Fetching 9:30 AM minute bars to correct EOD open prices (asset=%s missing_days=%d)...", + asset.symbol, + len(dates_to_fetch), + ) - # Get minute data for the date range to extract 9:30 AM opens - minute_df = None - correction_window = ("09:30:00", "09:31:00") + correction_window = ("09:30:00", "09:31:00") + + def _store_opens(minute_df: Optional[pd.DataFrame]) -> int: + if minute_df is None or minute_df.empty: + return 0 + minute_df_copy = minute_df.copy() + minute_df_copy["date"] = minute_df_copy.index.date + grouped = minute_df_copy.groupby("date").first() + stored = 0 + for trade_date, row in grouped.iterrows(): + open_price = row.get("open") + if open_price is None: + continue + asset_cache[trade_date] = float(open_price) + stored += 1 + return stored + + spans: List[Tuple[date, date]] = [] + span_start = dates_to_fetch[0] + previous = span_start + for trade_date in dates_to_fetch[1:]: + if (trade_date - previous).days > 1: + spans.append((span_start, previous)) + span_start = trade_date + previous = trade_date + spans.append((span_start, previous)) + + for span_start, span_end in spans: + start_fetch = _localize_trade_midnight(span_start, start_dt) + end_fetch = _localize_trade_midnight(span_end + timedelta(days=1), end_dt) try: minute_df = get_historical_data( asset=asset, - start_dt=start_dt, - end_dt=end_dt, - ivl=60000, # 1 minute + start_dt=start_fetch, + end_dt=end_fetch, + ivl=60000, username=username, password=password, datastyle=datastyle, - include_after_hours=False, # RTH only + include_after_hours=False, session_time_override=correction_window, ) except ThetaRequestError as exc: body_text = (exc.body or "").lower() if "start must be before end" in body_text: logger.warning( - "ThetaData rejected 09:30 correction window for %s; skipping open fix this chunk (%s)", + "ThetaData rejected 09:30 correction window for %s; skipping open fix window %s-%s (%s)", asset.symbol, + span_start, + span_end, exc.body, ) - else: - raise + continue + raise + + _store_opens(minute_df) + + _trim_open_cache(asset_key) - if minute_df is not None and not minute_df.empty: - # Group by date and get the first bar's open for each day - minute_df_copy = minute_df.copy() - minute_df_copy['date'] = minute_df_copy.index.date - - # For each date in df, find the corresponding 9:30 AM open from minute data - for idx in df.index: - trade_date = idx.date() - day_minutes = minute_df_copy[minute_df_copy['date'] == trade_date] - if len(day_minutes) > 0: - # Use the first minute bar's open (9:30 AM opening auction) - correct_open = day_minutes.iloc[0]['open'] - df.loc[idx, 'open'] = correct_open + for idx in df.index: + cached_value = asset_cache.get(idx.date()) + if cached_value is not None: + df.at[idx, "open"] = cached_value return df diff --git a/scripts/compare_backtest_outputs.py b/scripts/compare_backtest_outputs.py new file mode 100644 index 000000000..99aaabd85 --- /dev/null +++ b/scripts/compare_backtest_outputs.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +""" +Compare two backtest output directories (typically dev vs feature) while ignoring +known volatile columns (UUID-based identifiers, run ids, etc.). The script exits +with a non-zero status code if any comparison fails so it can guard parity checks. +""" +from __future__ import annotations + +import argparse +import sys +from pathlib import Path + +import pandas as pd +from pandas.testing import assert_frame_equal + +VOLATILE_COLUMNS = {"identifier", "strategy_run_id", "order_id"} +TEXT_ONLY_SUFFIXES = ("logs.csv",) + + +def _locate_file(directory: Path, filename: str) -> Path: + candidate = directory / filename + if candidate.exists(): + return candidate + matches = sorted(directory.glob(f"*{filename}")) + if not matches: + raise FileNotFoundError(f"Missing file matching {filename!r} in {directory}") + if len(matches) > 1: + raise FileExistsError( + f"Multiple candidates for {filename!r} in {directory}: " + + ", ".join(str(match.name) for match in matches) + ) + return matches[0] + + +def _load_csv(path: Path) -> pd.DataFrame: + df = pd.read_csv(path) + drop_cols = [col for col in VOLATILE_COLUMNS if col in df.columns] + if drop_cols: + df = df.drop(columns=drop_cols) + df = df.reindex(sorted(df.columns), axis=1) + return df + + +def _compare_text(name: str, dev_file: Path, feature_file: Path) -> bool: + dev_text = dev_file.read_text() + feature_text = feature_file.read_text() + if dev_text == feature_text: + print(f"[OK] {name}") + return True + print(f"[DIFF] {name}: contents differ") + return False + + +def compare_file(name: str, dev_file: Path, feature_file: Path) -> bool: + if any(name.endswith(suffix) for suffix in TEXT_ONLY_SUFFIXES): + return _compare_text(name, dev_file, feature_file) + df_dev = _load_csv(dev_file) + df_feature = _load_csv(feature_file) + try: + assert_frame_equal( + df_dev, + df_feature, + check_dtype=False, + check_like=False, + atol=0, + rtol=0, + ) + print(f"[OK] {name}") + return True + except AssertionError as exc: + print(f"[DIFF] {name}: {exc}") + return False + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Compare two backtest artifact folders") + parser.add_argument("dev_dir", type=Path, help="Directory containing dev CSV outputs") + parser.add_argument("feature_dir", type=Path, help="Directory containing feature CSV outputs") + parser.add_argument( + "--files", + nargs="+", + default=("stats.csv", "trades.csv"), + help="Filenames to compare (default: %(default)s)", + ) + parser.add_argument( + "--include-logs", + action="store_true", + help="Include logs.csv comparison (fails if timestamps differ)", + ) + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + + file_list = list(args.files) + if args.include_logs and "logs.csv" not in file_list: + file_list.append("logs.csv") + + success = True + for filename in file_list: + dev_file = _locate_file(args.dev_dir, filename) + feature_file = _locate_file(args.feature_dir, filename) + success &= compare_file(filename, dev_file, feature_file) + + return 0 if success else 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/setup.py b/setup.py index d7dcdad07..72b811677 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ def _copy_theta_terminal(self): setuptools.setup( name="lumibot", - version="4.3.6", + version="4.3.8", author="Robert Grzesik", author_email="rob@lumiwealth.com", description="Backtesting and Trading Library, Made by Lumiwealth", diff --git a/tests/test_thetadata_backtesting_pandas.py b/tests/test_thetadata_backtesting_pandas.py new file mode 100644 index 000000000..4e6dc404e --- /dev/null +++ b/tests/test_thetadata_backtesting_pandas.py @@ -0,0 +1,98 @@ +import datetime +from unittest.mock import MagicMock, patch + +import pandas as pd +import pytz + +from lumibot.backtesting.thetadata_backtesting_pandas import ThetaDataBacktestingPandas +from lumibot.entities import Asset +from lumibot.tools import thetadata_helper + + +def _build_frame(start_ts: str, periods: int, freq: str) -> pd.DataFrame: + index = pd.date_range(start=start_ts, periods=periods, freq=freq, tz=pytz.UTC) + base = pd.DataFrame( + { + "open": [100 + i for i in range(periods)], + "high": [101 + i for i in range(periods)], + "low": [99 + i for i in range(periods)], + "close": [100.5 + i for i in range(periods)], + "volume": [1_000_000] * periods, + }, + index=index, + ) + return base + + +def _build_backtester(start: datetime.datetime, end: datetime.datetime) -> ThetaDataBacktestingPandas: + with patch.object(ThetaDataBacktestingPandas, "kill_processes_by_name", return_value=None), patch.object( + thetadata_helper, "reset_theta_terminal_tracking", return_value=None + ): + tester = ThetaDataBacktestingPandas( + datetime_start=start, + datetime_end=end, + pandas_data=[], + username="user", + password="pass", + ) + tester._use_quote_data = False + tester.get_datetime = MagicMock(return_value=end) + return tester + + +def test_update_pandas_data_reuses_cached_window(monkeypatch): + tz = pytz.UTC + start = tz.localize(datetime.datetime(2024, 1, 1)) + end = tz.localize(datetime.datetime(2024, 2, 1)) + + backtester = _build_backtester(start, end) + fetch_counts = {"day": 0, "minute": 0} + + def fake_price_data(username, password, asset_param, start_datetime, end_datetime, timespan, **kwargs): + fetch_counts[timespan] += 1 + if timespan == "day": + return _build_frame("2023-10-01 00:00:00+00:00", periods=180, freq="D") + return _build_frame("2024-01-01 09:30:00+00:00", periods=180, freq="T") + + monkeypatch.setattr(thetadata_helper, "get_price_data", fake_price_data) + + asset = Asset(asset_type="stock", symbol="MSFT") + backtester._update_pandas_data(asset, None, length=55, timestep="day", start_dt=end) + assert fetch_counts["day"] == 1 + + backtester._update_pandas_data(asset, None, length=55, timestep="day", start_dt=end) + assert fetch_counts["day"] == 1 + + backtester._update_pandas_data(asset, None, length=30, timestep="minute", start_dt=end) + assert fetch_counts["minute"] == 1 + + tuple_key = next(iter(backtester.pandas_data)) + day_meta = backtester._dataset_metadata.get(tuple_key) + assert day_meta is not None + + +def test_update_pandas_data_fetches_when_cache_starts_after_request(monkeypatch): + tz = pytz.UTC + start = tz.localize(datetime.datetime(2024, 1, 1)) + end = tz.localize(datetime.datetime(2024, 2, 1)) + + backtester = _build_backtester(start, end) + fetch_counts = {"day": 0} + + def fake_price_data(username, password, asset_param, start_datetime, end_datetime, timespan, **kwargs): + fetch_counts["day"] += 1 + return _build_frame("2023-12-20 00:00:00+00:00", periods=60, freq="D") + + monkeypatch.setattr(thetadata_helper, "get_price_data", fake_price_data) + asset = Asset(asset_type="stock", symbol="MSFT") + + backtester._update_pandas_data(asset, None, length=55, timestep="day", start_dt=end) + assert fetch_counts["day"] == 1 + + meta_key = next(iter(backtester._dataset_metadata)) + backtester._dataset_metadata[meta_key]["start"] = ( + backtester._dataset_metadata[meta_key]["start"] + datetime.timedelta(days=10) + ) + + backtester._update_pandas_data(asset, None, length=55, timestep="day", start_dt=end) + assert fetch_counts["day"] == 2 diff --git a/tests/test_thetadata_helper.py b/tests/test_thetadata_helper.py index d1fca7ceb..1cb232766 100644 --- a/tests/test_thetadata_helper.py +++ b/tests/test_thetadata_helper.py @@ -14,6 +14,7 @@ import subprocess import time from types import SimpleNamespace +from typing import Any, Dict from unittest.mock import patch, MagicMock from lumibot.constants import LUMIBOT_DEFAULT_PYTZ from lumibot.entities import Asset @@ -58,6 +59,51 @@ def _build_option_asset(): ) +def _build_sample_ohlc(start_ts: str, periods: int, freq: str = "1D") -> pd.DataFrame: + idx = pd.date_range(start=start_ts, periods=periods, freq=freq, tz=pytz.UTC) + return pd.DataFrame( + { + "open": np.linspace(10.0, 10.0 + periods, periods), + "high": np.linspace(11.0, 11.0 + periods, periods), + "low": np.linspace(9.5, 9.5 + periods, periods), + "close": np.linspace(10.5, 10.5 + periods, periods), + "volume": np.arange(1, periods + 1, dtype=float), + }, + index=idx, + ) + + +def _build_eod_payload(open_price: float = 10.0) -> Dict[str, Any]: + return { + "header": { + "format": [ + "date", + "open", + "high", + "low", + "close", + "volume", + "open_interest", + "count", + "created", + ] + }, + "response": [ + [ + "20241122", + open_price, + open_price + 1.0, + open_price - 0.5, + open_price + 0.25, + 1000, + 0, + 0, + "2024-11-22T16:00:00", + ] + ], + } + + @pytest.fixture(scope="function") def theta_terminal_cleanup(): """Ensure ThetaTerminal is stopped between process health tests.""" @@ -68,6 +114,13 @@ def theta_terminal_cleanup(): pass +@pytest.fixture(autouse=True) +def reset_open_cache(): + thetadata_helper.reset_open_correction_cache() + yield + thetadata_helper.reset_open_correction_cache() + + def test_finalize_history_dataframe_adds_last_trade_time_for_ohlc(): asset = Asset(asset_type="stock", symbol="CVNA") df = pd.DataFrame( @@ -293,15 +346,7 @@ def fake_get_request(url, headers, querystring, username, password): def test_get_historical_eod_data_skips_open_fix_on_invalid_window(monkeypatch, caplog): - eod_payload = { - "header": { - "format": ["date", "open", "high", "low", "close", "volume", "ms_of_day", "ms_of_day2", "created"], - "error_type": "null", - }, - "response": [ - ["20241122", 10.0, 11.0, 9.5, 10.5, 1000, 0, 0, "2024-11-22T16:00:00"] - ], - } + eod_payload = _build_eod_payload(open_price=0.0) monkeypatch.setattr(thetadata_helper, "get_request", lambda **_: copy.deepcopy(eod_payload)) def _failing_minute_fetch(**_): @@ -358,6 +403,181 @@ def fake_trading_dates(*_args, **_kwargs): assert "close" in df.columns +def test_get_historical_eod_data_skips_minute_fetch_when_opens_valid(monkeypatch): + eod_payload = _build_eod_payload(open_price=12.0) + monkeypatch.setattr(thetadata_helper, "get_request", lambda **_: copy.deepcopy(eod_payload)) + + def _unexpected_minute_fetch(**_): + raise AssertionError("Minute fetch should be skipped when opens are valid") + + monkeypatch.setattr(thetadata_helper, "get_historical_data", _unexpected_minute_fetch) + + asset = Asset(asset_type="stock", symbol="MSFT") + asset_key = thetadata_helper._open_cache_key(asset) + thetadata_helper.OPEN_CORRECTION_CACHE.setdefault(asset_key, {})[date(2024, 11, 22)] = 12.0 + tz = pytz.UTC + start = tz.localize(datetime.datetime(2024, 11, 21, 19, 0)) + end = tz.localize(datetime.datetime(2024, 11, 22, 19, 0)) + + df = thetadata_helper.get_historical_eod_data( + asset=asset, + start_dt=start, + end_dt=end, + username="user", + password="pass", + ) + + assert not df.empty + assert (df["open"] > 0).all() + + +def test_get_historical_eod_data_reuses_cached_minute_opens(monkeypatch): + eod_payload = _build_eod_payload(open_price=0.0) + monkeypatch.setattr(thetadata_helper, "get_request", lambda **_: copy.deepcopy(eod_payload)) + + minute_df = _build_sample_ohlc("2024-11-22 09:30:00+00:00", periods=2, freq="min") + fetch_count = {"calls": 0} + + def _minute_fetch(**_): + fetch_count["calls"] += 1 + return minute_df + + monkeypatch.setattr(thetadata_helper, "get_historical_data", _minute_fetch) + + asset = Asset(asset_type="stock", symbol="MSFT") + tz = pytz.UTC + start = tz.localize(datetime.datetime(2024, 11, 21, 19, 0)) + end = tz.localize(datetime.datetime(2024, 11, 22, 19, 0)) + + df = thetadata_helper.get_historical_eod_data( + asset=asset, + start_dt=start, + end_dt=end, + username="user", + password="pass", + ) + + assert (df["open"] > 0).all() + assert fetch_count["calls"] == 1 + + def _should_not_run(**_): + raise AssertionError("Cached opens should prevent a second minute fetch") + + monkeypatch.setattr(thetadata_helper, "get_historical_data", _should_not_run) + + df_cached = thetadata_helper.get_historical_eod_data( + asset=asset, + start_dt=start, + end_dt=end, + username="user", + password="pass", + ) + + assert (df_cached["open"] > 0).all() + + +def test_maybe_correct_eod_opens_caches_per_trade_date(monkeypatch): + thetadata_helper.reset_open_correction_cache() + asset = Asset(asset_type="stock", symbol="MSFT") + tz = pytz.UTC + start = tz.localize(datetime.datetime(2024, 11, 21, 19, 0)) + end = tz.localize(datetime.datetime(2024, 11, 23, 19, 0)) + idx = pd.date_range(start="2024-11-21 20:00:00+00:00", periods=2, freq="D", tz=tz) + eod_df = pd.DataFrame({"open": [0.0, 0.0], "close": [10.0, 10.5]}, index=idx) + + minute_index = pd.to_datetime( + ["2024-11-21 09:30:00+00:00", "2024-11-22 09:30:00+00:00"], + utc=True, + ) + minute_df = pd.DataFrame({"open": [12.0, 13.0]}, index=minute_index) + minute_df.index.name = "datetime" + fetch_count = {"calls": 0} + + def _minute_fetch(**_): + fetch_count["calls"] += 1 + return minute_df + + monkeypatch.setattr(thetadata_helper, "get_historical_data", _minute_fetch) + + corrected = thetadata_helper._maybe_correct_eod_opens( + df=eod_df.copy(), + asset=asset, + start_dt=start, + end_dt=end, + username="user", + password="pass", + datastyle="ohlc", + ) + assert fetch_count["calls"] == 1 + assert (corrected["open"] > 0).all() + + repeated = thetadata_helper._maybe_correct_eod_opens( + df=eod_df.copy(), + asset=asset, + start_dt=start, + end_dt=end, + username="user", + password="pass", + datastyle="ohlc", + ) + + assert fetch_count["calls"] == 1 + assert (repeated["open"] > 0).all() + + +def test_update_pandas_data_appends_incremental_rows(monkeypatch): + tz = pytz.UTC + start = tz.localize(datetime.datetime(2024, 1, 1)) + end = tz.localize(datetime.datetime(2024, 1, 31)) + + with patch.object(ThetaDataBacktestingPandas, "kill_processes_by_name", return_value=None), patch.object( + thetadata_helper, "reset_theta_terminal_tracking", return_value=None + ): + backtester = ThetaDataBacktestingPandas( + datetime_start=start, + datetime_end=end, + pandas_data=[], + username="user", + password="pass", + ) + + backtester._use_quote_data = False + dt_sequence = [ + tz.localize(datetime.datetime(2024, 1, 5, 16, 0)), + tz.localize(datetime.datetime(2024, 1, 7, 16, 0)), + ] + dt_calls = iter([dt_sequence[0], dt_sequence[0], dt_sequence[1], dt_sequence[1]]) + backtester.get_datetime = MagicMock(side_effect=dt_calls) + + fetch_ranges = [] + payloads = [ + _build_sample_ohlc("2024-01-05 09:30:00+00:00", periods=5, freq="min"), + _build_sample_ohlc("2024-01-05 09:35:00+00:00", periods=4, freq="min"), + ] + + def _price_data(username, password, asset_param, start_dt, end_dt, **kwargs): + fetch_ranges.append((start_dt, end_dt)) + return payloads[len(fetch_ranges) - 1] + + monkeypatch.setattr(thetadata_helper, "get_price_data", _price_data) + + asset = Asset(asset_type="stock", symbol="MSFT") + backtester._update_pandas_data(asset, None, length=5, timestep="minute", start_dt=dt_sequence[0]) + + tuple_key = next(iter(backtester.pandas_data)) + meta = backtester._dataset_metadata.get((tuple_key, "minute")) + assert meta is not None + assert meta.get("end") < dt_sequence[1] + + backtester._update_pandas_data(asset, None, length=5, timestep="minute", start_dt=dt_sequence[1]) + + assert len(fetch_ranges) == 2 + assert fetch_ranges[1][0] > fetch_ranges[0][0] + + stored_df = backtester.pandas_data[tuple_key].df + assert len(stored_df) == 9 # 5 initial rows + 4 new rows from incremental fetch + + def test_get_historical_data_parses_option_downloader_schema(monkeypatch): fixture = load_thetadata_fixture("option_history_ohlc.json")