Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,19 @@
# Changelog

## 4.3.8 - 2024-11-16

- ThetaData panda fetches now short-circuit once coverage metadata says the requested start/end are already in cache, and incremental requests append only the missing tail of bars instead of reprocessing the full frame.
- Daily strategies skip the expensive minute-quote merge/forward-fill path entirely, eliminating unnecessary `pd.concat`/transpose work for slow runs such as GoogleMomentum.
- The 09:30 open correction routine memoizes per (symbol, date) and groups contiguous days so the Theta minute downloader is hit exactly once per uncovered window.
- `_combine_duplicate_columns` now deduplicates columns in a single pass (no repeated transpose/drop cycles) and `pd.to_datetime` is skipped whenever frames already carry a `DatetimeIndex`, cutting GoogleMomentum’s Theta run time from ~109 s (dev) to ~75 s on feature.
- Added regression coverage for both the fast-path/append flow and the memoized open-correction behavior.

## 4.3.7 - 2024-11-16

- ThetaData backtests now keep an incremental, processed-frame cache so repeated `get_historical_prices` calls append only the missing bars instead of reloading and re-normalizing the full dataset on every iteration.
- The EOD 09:30 open correction flow reuses cached minute bars per asset/date, fetching only uncovered days and logging clearer diagnostics when Theta rejects the override window.
- Added regression coverage for the caching/retry code plus a downloader smoke script (`scripts/check_eod_chunking.py`) to validate prod chunking.

## 4.3.6 - 2024-11-16

- Fixed ThetaData EOD corrections by fetching a real 09:30–09:31 minute window for each trading day, preventing zero-length requests and the resulting terminal hangs.
Expand Down
179 changes: 150 additions & 29 deletions lumibot/tools/thetadata_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,41 @@ def _coerce_skip_flag(raw: Optional[str], base_url: str) -> bool:
False: ("09:30:00", "16:00:00"), # regular session only
}

OPEN_CORRECTION_CACHE: Dict[str, Dict[date, float]] = {}
OPEN_CORRECTION_CACHE_LIMIT = int(os.environ.get("THETADATA_OPEN_CACHE_LIMIT", "512"))


def reset_open_correction_cache() -> None:
"""Utility hook for tests to clear cached 09:30 open prices."""
OPEN_CORRECTION_CACHE.clear()


def _open_cache_key(asset: Asset) -> str:
expiration = getattr(asset, "expiration", None)
suffix = f":{expiration}" if expiration else ""
return f"{getattr(asset, 'asset_type', 'stock')}::{asset.symbol}{suffix}"


def _trim_open_cache(asset_key: str) -> None:
cache = OPEN_CORRECTION_CACHE.get(asset_key)
if not cache:
return
overflow = len(cache) - OPEN_CORRECTION_CACHE_LIMIT
if overflow <= 0:
return
for stale_date in sorted(cache.keys())[:overflow]:
cache.pop(stale_date, None)


def _localize_trade_midnight(trade_date: date, reference: Optional[datetime]) -> datetime:
base = datetime.combine(trade_date, datetime.min.time())
tzinfo = getattr(reference, "tzinfo", None)
if tzinfo is None:
return pytz.UTC.localize(base)
if base.tzinfo is not None:
return base.astimezone(tzinfo)
return tzinfo.localize(base)


def _build_request_headers(base: Optional[Dict[str, str]] = None) -> Dict[str, str]:
request_headers: Dict[str, str] = dict(base or {})
Expand Down Expand Up @@ -2426,52 +2461,138 @@ def combine_datetime(row):
df = df.drop(columns=["bid_size", "bid_exchange", "bid", "bid_condition",
"ask_size", "ask_exchange", "ask", "ask_condition"], errors='ignore')

# FIX: ThetaData's EOD endpoint returns incorrect open/high/low prices for STOCKS and OPTIONS
# that don't match Polygon/Yahoo. We fix this by using minute bar data.
# Solution: Fetch minute bars for each trading day and aggregate to get correct OHLC
# NOTE: Indexes don't need this fix since they are calculated values, not traded securities
if asset_type in ("stock", "option"):
logger.info(f"Fetching 9:30 AM minute bars to correct EOD open prices...")
# Fix incorrect stock/option opens by reusing cached 09:30 minute bars when necessary.
if asset_type in ("stock", "option") and "open" in df.columns:
df = _maybe_correct_eod_opens(
df=df,
asset=asset,
start_dt=start_dt,
end_dt=end_dt,
username=username,
password=password,
datastyle=datastyle,
)

return df


def _maybe_correct_eod_opens(
df: pd.DataFrame,
asset: Asset,
start_dt: datetime,
end_dt: datetime,
username: str,
password: str,
datastyle: str,
):
asset_key = _open_cache_key(asset)
asset_cache = OPEN_CORRECTION_CACHE.setdefault(asset_key, {})

if asset_cache:
for idx in df.index:
cached_value = asset_cache.get(idx.date())
if cached_value is not None:
df.at[idx, "open"] = cached_value

open_series = df.get("open")
if open_series is None:
return df

trade_dates = sorted({idx.date() for idx in df.index})
invalid_dates = {
idx.date()
for idx, value in zip(df.index, open_series)
if pd.isna(value) or value <= 0
}
missing_cache_dates = [trade_date for trade_date in trade_dates if trade_date not in asset_cache]
dates_to_fetch = sorted(set(missing_cache_dates) | invalid_dates)

if not dates_to_fetch:
if invalid_dates:
logger.debug(
"[THETA][EOD][OPEN] invalid opens for %s resolved from cache; skipping 09:30 correction",
asset.symbol,
)
else:
logger.debug(
"[THETA][EOD][OPEN] opens already cached for %s; skipping 09:30 correction",
asset.symbol,
)
for idx in df.index:
cached_value = asset_cache.get(idx.date())
if cached_value is not None:
df.at[idx, "open"] = cached_value
return df

logger.info(
"Fetching 9:30 AM minute bars to correct EOD open prices (asset=%s missing_days=%d)...",
asset.symbol,
len(dates_to_fetch),
)

# Get minute data for the date range to extract 9:30 AM opens
minute_df = None
correction_window = ("09:30:00", "09:31:00")
correction_window = ("09:30:00", "09:31:00")

def _store_opens(minute_df: Optional[pd.DataFrame]) -> int:
if minute_df is None or minute_df.empty:
return 0
minute_df_copy = minute_df.copy()
minute_df_copy["date"] = minute_df_copy.index.date
grouped = minute_df_copy.groupby("date").first()
stored = 0
for trade_date, row in grouped.iterrows():
open_price = row.get("open")
if open_price is None:
continue
asset_cache[trade_date] = float(open_price)
stored += 1
return stored

spans: List[Tuple[date, date]] = []
span_start = dates_to_fetch[0]
previous = span_start
for trade_date in dates_to_fetch[1:]:
if (trade_date - previous).days > 1:
spans.append((span_start, previous))
span_start = trade_date
previous = trade_date
spans.append((span_start, previous))

for span_start, span_end in spans:
start_fetch = _localize_trade_midnight(span_start, start_dt)
end_fetch = _localize_trade_midnight(span_end + timedelta(days=1), end_dt)
try:
minute_df = get_historical_data(
asset=asset,
start_dt=start_dt,
end_dt=end_dt,
ivl=60000, # 1 minute
start_dt=start_fetch,
end_dt=end_fetch,
ivl=60000,
username=username,
password=password,
datastyle=datastyle,
include_after_hours=False, # RTH only
include_after_hours=False,
session_time_override=correction_window,
)
except ThetaRequestError as exc:
body_text = (exc.body or "").lower()
if "start must be before end" in body_text:
logger.warning(
"ThetaData rejected 09:30 correction window for %s; skipping open fix this chunk (%s)",
"ThetaData rejected 09:30 correction window for %s; skipping open fix window %s-%s (%s)",
asset.symbol,
span_start,
span_end,
exc.body,
)
else:
raise
continue
raise

_store_opens(minute_df)

_trim_open_cache(asset_key)

if minute_df is not None and not minute_df.empty:
# Group by date and get the first bar's open for each day
minute_df_copy = minute_df.copy()
minute_df_copy['date'] = minute_df_copy.index.date

# For each date in df, find the corresponding 9:30 AM open from minute data
for idx in df.index:
trade_date = idx.date()
day_minutes = minute_df_copy[minute_df_copy['date'] == trade_date]
if len(day_minutes) > 0:
# Use the first minute bar's open (9:30 AM opening auction)
correct_open = day_minutes.iloc[0]['open']
df.loc[idx, 'open'] = correct_open
for idx in df.index:
cached_value = asset_cache.get(idx.date())
if cached_value is not None:
df.at[idx, "open"] = cached_value

return df

Expand Down
110 changes: 110 additions & 0 deletions scripts/compare_backtest_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""
Compare two backtest output directories (typically dev vs feature) while ignoring
known volatile columns (UUID-based identifiers, run ids, etc.). The script exits
with a non-zero status code if any comparison fails so it can guard parity checks.
"""
from __future__ import annotations

import argparse
import sys
from pathlib import Path

import pandas as pd
from pandas.testing import assert_frame_equal

VOLATILE_COLUMNS = {"identifier", "strategy_run_id", "order_id"}
TEXT_ONLY_SUFFIXES = ("logs.csv",)


def _locate_file(directory: Path, filename: str) -> Path:
candidate = directory / filename
if candidate.exists():
return candidate
matches = sorted(directory.glob(f"*{filename}"))
if not matches:
raise FileNotFoundError(f"Missing file matching {filename!r} in {directory}")
if len(matches) > 1:
raise FileExistsError(
f"Multiple candidates for {filename!r} in {directory}: "
+ ", ".join(str(match.name) for match in matches)
)
return matches[0]


def _load_csv(path: Path) -> pd.DataFrame:
df = pd.read_csv(path)
drop_cols = [col for col in VOLATILE_COLUMNS if col in df.columns]
if drop_cols:
df = df.drop(columns=drop_cols)
df = df.reindex(sorted(df.columns), axis=1)
return df


def _compare_text(name: str, dev_file: Path, feature_file: Path) -> bool:
dev_text = dev_file.read_text()
feature_text = feature_file.read_text()
if dev_text == feature_text:
print(f"[OK] {name}")
return True
print(f"[DIFF] {name}: contents differ")
return False


def compare_file(name: str, dev_file: Path, feature_file: Path) -> bool:
if any(name.endswith(suffix) for suffix in TEXT_ONLY_SUFFIXES):
return _compare_text(name, dev_file, feature_file)
df_dev = _load_csv(dev_file)
df_feature = _load_csv(feature_file)
try:
assert_frame_equal(
df_dev,
df_feature,
check_dtype=False,
check_like=False,
atol=0,
rtol=0,
)
print(f"[OK] {name}")
return True
except AssertionError as exc:
print(f"[DIFF] {name}: {exc}")
return False


def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Compare two backtest artifact folders")
parser.add_argument("dev_dir", type=Path, help="Directory containing dev CSV outputs")
parser.add_argument("feature_dir", type=Path, help="Directory containing feature CSV outputs")
parser.add_argument(
"--files",
nargs="+",
default=("stats.csv", "trades.csv"),
help="Filenames to compare (default: %(default)s)",
)
parser.add_argument(
"--include-logs",
action="store_true",
help="Include logs.csv comparison (fails if timestamps differ)",
)
return parser.parse_args()


def main() -> int:
args = _parse_args()

file_list = list(args.files)
if args.include_logs and "logs.csv" not in file_list:
file_list.append("logs.csv")

success = True
for filename in file_list:
dev_file = _locate_file(args.dev_dir, filename)
feature_file = _locate_file(args.feature_dir, filename)
success &= compare_file(filename, dev_file, feature_file)

return 0 if success else 1


if __name__ == "__main__":
sys.exit(main())
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def _copy_theta_terminal(self):

setuptools.setup(
name="lumibot",
version="4.3.6",
version="4.3.8",
author="Robert Grzesik",
author_email="[email protected]",
description="Backtesting and Trading Library, Made by Lumiwealth",
Expand Down
Loading
Loading