Commit 8876133
Optimize BacktestEngine data loading with deferred sorting
- Add `_sorted` flag and `sort=False` parameter to defer sorting.
- Add `presorted` parameter to data iterator to skip redundant sorts.
- Optimize `BacktestNode` to sort once after loading all data configs.
- Add test coverage and documentation for deferred sorting.
1 parent aaa38e8 commit 8876133

File tree

6 files changed
+606 -19 lines changed

docs/concepts/backtesting.md

Lines changed: 103 additions & 0 deletions

@@ -39,6 +39,109 @@ An instantiated `BacktestEngine` can accept the following:

This approach offers detailed control over the backtesting process, allowing you to manually configure each component.

### Loading large datasets efficiently

When working with large amounts of data across multiple instruments, the way you load data
can significantly impact performance.

#### The performance consideration

By default, `BacktestEngine.add_data()` sorts the entire data stream (existing data + newly
added data) on each call when `sort=True` (the default). This means:

- First call with 1M bars: sorts 1M bars.
- Second call with 1M bars: sorts 2M bars.
- Third call with 1M bars: sorts 3M bars.
- And so on...

This repeated sorting of increasingly large datasets can become a bottleneck when loading
data for multiple instruments.

#### Optimization strategies

**Strategy 1: Defer sorting until the end (recommended for multiple instruments)**

```python
from nautilus_trader.backtest.engine import BacktestEngine

engine = BacktestEngine()

# Set up venue and instruments
engine.add_venue(...)
engine.add_instrument(instrument1)
engine.add_instrument(instrument2)
engine.add_instrument(instrument3)

# Load all data WITHOUT sorting on each call
engine.add_data(instrument1_bars, sort=False)
engine.add_data(instrument2_bars, sort=False)
engine.add_data(instrument3_bars, sort=False)

# Sort once at the end - much more efficient!
engine.sort_data()

# Now run your backtest
engine.add_strategy(strategy)
engine.run()
```

**Strategy 2: Collect and add in a single batch**

```python
# Collect all data first
all_bars = []
all_bars.extend(instrument1_bars)
all_bars.extend(instrument2_bars)
all_bars.extend(instrument3_bars)

# Add once with sorting
engine.add_data(all_bars, sort=True)
```

**Strategy 3: Use the streaming API for very large datasets**

For datasets that don't fit in memory, use the streaming API:

```python
def data_generator():
    # Yield chunks of pre-sorted data
    yield load_chunk_1()
    yield load_chunk_2()
    yield load_chunk_3()

engine.add_data_iterator(
    data_name="my_data_stream",
    generator=data_generator(),
)
```

:::tip Performance impact
For a backtest with 10 instruments, each with 1M bars:

- Sorting on each call: ~10 sorts of increasing size (1M, 2M, 3M, ... 10M bars).
- Sorting once at the end: 1 sort of 10M bars.

The deferred sorting approach can be **orders of magnitude faster** for large datasets.
:::

### Data loading contract

The `BacktestEngine` enforces important invariants to ensure data integrity:

**Requirements:**

- All data must be sorted and synced to the internal iterator before calling `run()`.
- When using `sort=False`, you **must** call `sort_data()` or add more data with `sort=True` before running.
- The engine validates this requirement and raises `RuntimeError` if violated.

**Safety guarantees:**

- Data lists are always copied internally to prevent external mutations from affecting engine state.
- You can safely clear or modify data lists after passing them to `add_data()`.
- Adding data with `sort=True` makes it immediately available for backtesting.

This design ensures data integrity while enabling performance optimizations for large datasets.

## High-level API

The high-level API centers around a `BacktestNode`, which orchestrates the management of multiple `BacktestEngine` instances,
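The complexity claim in the tip above is easy to sanity-check outside the engine. The sketch below uses plain Python lists of floats standing in for bar objects (none of it is NautilusTrader API); it mirrors the two loading patterns and confirms they produce the same final stream, while the deferred path performs only one full sort instead of one per batch.

```python
import random

def load_resorting_each_call(batches):
    """Mirror add_data(..., sort=True) on every call: re-sort the whole stream."""
    data = []
    for batch in batches:
        data.extend(batch)
        data.sort()  # sorts 1 batch's worth, then 2, then 3, ...
    return data

def load_sorting_once(batches):
    """Mirror add_data(..., sort=False) per call plus one final sort_data()."""
    data = []
    for batch in batches:
        data.extend(batch)
    data.sort()  # single sort over the full stream
    return data

batches = [[random.random() for _ in range(10_000)] for _ in range(10)]
# Both strategies yield an identical, fully sorted stream
assert load_resorting_each_call(batches) == load_sorting_once(batches)
```

Timsort is fast on mostly sorted input, so the per-call penalty is smaller than a worst-case analysis suggests, but it still scans the entire accumulated stream on every call, which is what the deferred approach avoids.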

nautilus_trader/backtest/engine.pxd

Lines changed: 2 additions & 1 deletion

@@ -114,6 +114,7 @@ cdef class BacktestEngine:
     cdef object _data_iterator
     cdef uint64_t _last_ns
     cdef uint64_t _end_ns
+    cdef bint _sorted
     cdef dict[str, RequestData] _data_requests
     cdef set[str] _backtest_subscription_names
     cdef dict[str, uint64_t] _last_subscription_ts
@@ -176,7 +177,7 @@ cdef class BacktestDataIterator:
     cdef dict[str, uint64_t] _stream_chunk_duration_ns

     cpdef void _reset_single_data(self)
-    cdef void _add_data(self, str data_name, list data_list, bint append_data=*)
+    cdef void _add_data(self, str data_name, list data_list, bint append_data=*, bint presorted=*)
     cpdef void remove_data(self, str data_name, bint complete_remove=*)
     cpdef void _activate_single_data(self)
     cpdef void _deactivate_single_data(self)
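The iterator declarations above keep one individually sorted list per named stream and replay them in global `ts_init` order, which is why `presorted` per-stream lists are sufficient. A minimal plain-Python sketch of that idea (a toy heap-based k-way merge over `(ts_init, payload)` tuples; this is an illustration of the concept, not the Cython implementation):

```python
import heapq

def merge_streams(streams):
    """Yield (ts_init, stream_name, payload) in global ts_init order.

    streams: dict mapping stream name -> list of (ts_init, payload) tuples,
    where each list is already sorted by ts_init.
    """
    iters = {name: iter(data) for name, data in streams.items()}
    heap = []
    # Seed the heap with the head of each stream
    for name, it in iters.items():
        head = next(it, None)
        if head is not None:
            heapq.heappush(heap, (head[0], name, head[1]))
    # Repeatedly emit the globally smallest timestamp, refilling from its stream
    while heap:
        ts, name, payload = heapq.heappop(heap)
        yield ts, name, payload
        nxt = next(iters[name], None)
        if nxt is not None:
            heapq.heappush(heap, (nxt[0], name, nxt[1]))

merged = list(merge_streams({
    "eurusd": [(1, "bar1"), (3, "bar3")],
    "gbpusd": [(2, "bar2"), (4, "bar4")],
}))
```

This is why skipping the redundant sort for `presorted` data is safe: the merge only requires each stream to be sorted on its own, not the concatenation of all streams.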

nautilus_trader/backtest/engine.pyx

Lines changed: 86 additions & 13 deletions

@@ -230,6 +230,7 @@ cdef class BacktestEngine:
         self._iteration: uint64_t = 0
         self._last_ns : uint64_t = 0
         self._end_ns : uint64_t = 0
+        self._sorted: bint = True

         # Timing
         self._run_started: pd.Timestamp | None = None
@@ -767,6 +768,30 @@ cdef class BacktestEngine:
         Caution if adding data without `sort` being True, as this could lead to running backtests
         on a stream which does not have monotonically increasing timestamps.

+        Notes
+        -----
+        For optimal performance when loading large datasets, consider using `sort=False` for all
+        calls to `add_data()`, then calling `sort_data()` once after all data has been added:
+
+        .. code-block:: python
+
+            # Add multiple data streams without sorting
+            engine.add_data(instrument1_bars, sort=False)
+            engine.add_data(instrument2_bars, sort=False)
+            engine.add_data(instrument3_bars, sort=False)
+
+            # Sort once at the end
+            engine.sort_data()
+
+        This approach avoids repeatedly sorting the entire data stream on each call,
+        significantly reducing load time for large datasets.
+
+        **Contract invariants:**
+
+        - When `sort=True`: data is immediately available for backtesting via `run()`.
+        - When `sort=False`: you **must** call `sort_data()` or add data with `sort=True` before `run()`.
+        - The provided `data` list is always copied internally to prevent external mutations from affecting the engine state.
+
         """
         Condition.not_empty(data, "data")
         Condition.list_type(data, Data, "data")
@@ -822,8 +847,10 @@ cdef class BacktestEngine:

         if sort:
             self._data = sorted(self._data, key=lambda x: x.ts_init)
-
-            self._data_iterator.add_data("backtest_data", self._data)
+            self._data_iterator.add_data("backtest_data", self._data, append_data=True, presorted=True)
+            self._sorted = True
+        else:
+            self._sorted = False

         for data_point in data:
             data_type = type(data_point)
@@ -1049,6 +1076,8 @@ cdef class BacktestEngine:
         """
         Condition.not_none(data, "data")
         self._data = pickle.loads(data)
+        self._data_iterator.add_data("backtest_data", self._data, append_data=True, presorted=True)
+        self._sorted = True

         self._log.info(
             f"Loaded {len(self._data):_} data "
@@ -1191,7 +1220,10 @@ cdef class BacktestEngine:
         # Reset timing
         self._iteration = 0
         self._data_iterator = BacktestDataIterator()
-        self._data_iterator.add_data("backtest_data", self._data)
+
+        if self._sorted:
+            self._data_iterator.add_data("backtest_data", self._data, append_data=True, presorted=True)
+
         self._run_started = None
         self._run_finished = None
         self._backtest_start = None
@@ -1204,7 +1236,9 @@ cdef class BacktestEngine:
         Sort the engine's internal data stream.

         """
-        self._data.sort()
+        self._data = sorted(self._data, key=lambda x: x.ts_init)
+        self._data_iterator.add_data("backtest_data", self._data, append_data=True, presorted=True)
+        self._sorted = True

     def clear_data(self) -> None:
         """
@@ -1218,6 +1252,7 @@ cdef class BacktestEngine:
         self._data.clear()
         self._data_len = 0
         self._data_iterator = BacktestDataIterator()
+        self._sorted = True

     def clear_actors(self) -> None:
         """
@@ -1294,6 +1329,16 @@ cdef class BacktestEngine:
             If no data has been added to the engine.
         ValueError
             If the `start` is >= the `end` datetime.
+        RuntimeError
+            If data has been added with `sort=False` but `sort_data()` has not been called.
+
+        Notes
+        -----
+        **Contract invariants:**
+
+        - All data added via `add_data()` must be sorted and synced to the internal iterator before calling `run()`.
+        - If any data was added with `sort=False`, you must call `sort_data()` or add data with `sort=True` before this method.
+        - The engine validates this requirement and will raise `RuntimeError` if unsorted data is detected.

         """
         self._run(start, end, run_config_id, streaming)
@@ -1393,6 +1438,13 @@ cdef class BacktestEngine:
         run_config_id: str | None = None,
         bint streaming = False,
     ):
+        # Validate data has been sorted and synced to the iterator
+        if self._data and not self._sorted:
+            raise RuntimeError(
+                "Data has been added but not sorted, "
+                "call `engine.sort_data()` or use `engine.add_data(..., sort=True)` before running"
+            )
+
         # Validate data
         cdef:
             SimulatedExchange exchange
@@ -1962,7 +2014,13 @@ cdef class BacktestDataIterator:
         self._single_data_index = 0
         self._is_single_data = False

-    def add_data(self, data_name, list data, bint append_data=True):
+    def add_data(
+        self,
+        str data_name,
+        list data,
+        bint append_data = True,
+        bint presorted = False,
+    ) -> None:
         """
         Add (or replace) a named, pre-sorted data list for static data loading.

@@ -1979,6 +2037,9 @@ cdef class BacktestDataIterator:
             Controls stream priority for timestamp ties:
             ``True`` – lower priority (appended).
             ``False`` – higher priority (prepended).
+        presorted : bool, default ``False``
+            If the data is guaranteed to be pre-sorted by `ts_init`.
+            When ``True``, skips internal sorting for better performance.

         Raises
         ------
@@ -1991,13 +2052,14 @@ cdef class BacktestDataIterator:
         if not data:
             return

-        def data_generator():
-            yield data
-            # Generator ends after yielding once
-
-        self.init_data(data_name, data_generator(), append_data)
+        self._add_data(data_name, data, append_data, presorted)

-    def init_data(self, str data_name, data_generator, bint append_data=True):
+    def init_data(
+        self,
+        str data_name,
+        data_generator,
+        bint append_data = True,
+    ) -> None:
         """
         Add (or replace) a named data generator for streaming large datasets.

@@ -2042,7 +2104,13 @@ cdef class BacktestDataIterator:
             # Generator is already exhausted, nothing to add
             pass

-    cdef void _add_data(self, str data_name, list data_list, bint append_data=True):
+    cdef void _add_data(
+        self,
+        str data_name,
+        list data_list,
+        bint append_data = True,
+        bint presorted = False,
+    ):
         if len(data_list) == 0:
             return

@@ -2062,7 +2130,12 @@ cdef class BacktestDataIterator:
         if self._is_single_data:
             self._deactivate_single_data()

-        self._data[data_priority] = sorted(data_list, key=lambda data: data.ts_init)
+        # Copy (and sort if needed) to avoid aliasing the caller's list
+        if presorted:
+            self._data[data_priority] = list(data_list)
+        else:
+            self._data[data_priority] = sorted(data_list, key=lambda data: data.ts_init)
+
         self._data_name[data_priority] = data_name
         self._data_priority[data_name] = data_priority
         self._data_len[data_priority] = len(data_list)
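The `_sorted` flag threaded through the hunks above is a classic dirty-flag contract: `sort=True` keeps the flag clean, `sort=False` dirties it, and `run()` refuses a dirty stream. A condensed plain-Python sketch of that state machine (a toy `MiniEngine` with tuples whose first element plays the role of `ts_init`; not the real class):

```python
class MiniEngine:
    def __init__(self):
        self._data = []
        self._sorted = True  # an empty stream is trivially sorted

    def add_data(self, data, sort=True):
        self._data.extend(data)  # extend copies elements in, never aliases the caller's list
        if sort:
            self._data.sort(key=lambda x: x[0])
            self._sorted = True
        else:
            self._sorted = False  # stream is now dirty until sort_data()

    def sort_data(self):
        self._data.sort(key=lambda x: x[0])
        self._sorted = True

    def run(self):
        # Mirror of the guard added in BacktestEngine._run
        if self._data and not self._sorted:
            raise RuntimeError("call sort_data() or add_data(..., sort=True) before running")
        return [ts for ts, _ in self._data]

engine = MiniEngine()
engine.add_data([(3, "c"), (1, "a")], sort=False)
engine.add_data([(2, "b")], sort=False)
try:
    engine.run()
except RuntimeError:
    pass  # dirty stream rejected
engine.sort_data()
assert engine.run() == [1, 2, 3]
```

Note that, as in the real engine, a later `add_data(..., sort=True)` call would also clean the flag, because it re-sorts the whole accumulated stream.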

nautilus_trader/backtest/node.py

Lines changed: 11 additions & 5 deletions

@@ -592,7 +592,7 @@ def _run_oneshot(
         start: str | int | None = None,
         end: str | int | None = None,
     ) -> None:
-        # Load data
+        # Load data - defer sorting until all data is loaded for better performance
         for config in data_configs:
             t0 = pd.Timestamp.now()
             used_instrument_ids = get_instrument_ids(config)
@@ -616,11 +616,12 @@ def _run_oneshot(
                 f"Read {len(result.data):,} events from parquet in {pd.Timedelta(t1 - t0)}s",
             )

-            self._load_engine_data(engine=engine, result=result)
+            self._load_engine_data(engine=engine, result=result, sort=False)  # sort before run

             t2 = pd.Timestamp.now()
             engine.logger.info(f"Engine load took {pd.Timedelta(t2 - t1)}s")

+        engine.sort_data()
         engine.run(start=start, end=end, run_config_id=run_config_id)

     @classmethod
@@ -669,11 +670,16 @@ def load_catalog(cls, config: BacktestDataConfig) -> ParquetDataCatalog:
             fs_rust_storage_options=config.catalog_fs_rust_storage_options,
         )

-    def _load_engine_data(self, engine: BacktestEngine, result: CatalogDataResult) -> None:
+    def _load_engine_data(
+        self,
+        engine: BacktestEngine,
+        result: CatalogDataResult,
+        sort: bool = True,
+    ) -> None:
         if is_nautilus_class(result.data_cls):
             engine.add_data(
                 data=result.data,
-                sort=True,  # Already sorted from backend
+                sort=sort,
             )
         else:
             if not result.client_id:
@@ -684,7 +690,7 @@ def _load_engine_data(self, engine: BacktestEngine, result: CatalogDataResult) -
             engine.add_data(
                 data=result.data,
                 client_id=result.client_id,
-                sort=True,  # Already sorted from backend
+                sort=sort,
             )

     def log_backtest_exception(self, e: Exception, config: BacktestRunConfig) -> None:
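Both storage branches of the `_add_data` change in engine.pyx leave the engine holding its own copy of the caller's list: `sorted()` always allocates a new list, and the `presorted` fast path copies explicitly via `list()`. A minimal illustration of that safety guarantee (a hypothetical `store` helper written for this page, not engine code):

```python
def store(data_list, presorted=False):
    """Return a list the engine can own outright.

    Both branches allocate a new list, so later mutation of the
    caller's list cannot corrupt what was stored.
    """
    if presorted:
        return list(data_list)  # explicit copy on the fast path
    return sorted(data_list, key=lambda x: x[0])  # sorted() copies as a side effect

source = [(1, "a"), (2, "b")]
stored = store(source, presorted=True)
source.clear()  # safe: the stored copy is unaffected
assert stored == [(1, "a"), (2, "b")]
```

This is the mechanism behind the docs' safety guarantee that "you can safely clear or modify data lists after passing them to `add_data()`".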
