Skip to content

Commit 92dc103

Browse files
authored
Refactor reading of feather files in catalog (#3114)
1 parent 4233cc1 commit 92dc103

File tree

3 files changed: +166 −118 lines changed

nautilus_trader/persistence/catalog/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,9 @@ def list_live_runs(self) -> list[str]:
205205
raise NotImplementedError
206206

207207
@abstractmethod
208-
def read_live_run(self, instance_id: str, **kwargs: Any) -> list[str]:
208+
def read_live_run(self, instance_id: str, **kwargs: Any) -> list[str] | dict[str, list[Data]]:
209209
raise NotImplementedError
210210

211211
@abstractmethod
212-
def read_backtest(self, instance_id: str, **kwargs: Any) -> list[str]:
212+
def read_backtest(self, instance_id: str, **kwargs: Any) -> list[str] | dict[str, list[Data]]:
213213
raise NotImplementedError

nautilus_trader/persistence/catalog/parquet.py

Lines changed: 163 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1962,45 +1962,6 @@ def _query_files(
19621962

19631963
return file_paths
19641964

1965-
@staticmethod
1966-
def _handle_table_nautilus(
1967-
table: pa.Table | pd.DataFrame,
1968-
data_cls: type,
1969-
convert_bar_type_to_external: bool = False,
1970-
) -> list[Data]:
1971-
if isinstance(table, pd.DataFrame):
1972-
table = pa.Table.from_pandas(table)
1973-
1974-
# Convert metadata from INTERNAL to EXTERNAL if requested
1975-
if convert_bar_type_to_external and table.schema.metadata:
1976-
metadata = dict(table.schema.metadata)
1977-
1978-
# Convert bar_type metadata (for Bar data)
1979-
if b"bar_type" in metadata:
1980-
bar_type_str = metadata[b"bar_type"].decode()
1981-
1982-
if bar_type_str.endswith("-INTERNAL"):
1983-
metadata[b"bar_type"] = bar_type_str.replace("-INTERNAL", "-EXTERNAL").encode()
1984-
1985-
# Replace schema with updated metadata (shallow copy)
1986-
table = table.replace_schema_metadata(metadata)
1987-
1988-
data = ArrowSerializer.deserialize(data_cls=data_cls, batch=table)
1989-
module = data[0].__class__.__module__
1990-
1991-
if "nautilus_pyo3" in module:
1992-
cython_cls = {
1993-
"OrderBookDelta": OrderBookDelta,
1994-
"OrderBookDeltas": OrderBookDelta,
1995-
"OrderBookDepth10": OrderBookDepth10,
1996-
"QuoteTick": QuoteTick,
1997-
"TradeTick": TradeTick,
1998-
"Bar": Bar,
1999-
}.get(data_cls.__name__, data_cls.__name__)
2000-
data = cython_cls.from_pyo3_list(data)
2001-
2002-
return data
2003-
20041965
def query_last_timestamp(
20051966
self,
20061967
data_cls: type,
@@ -2134,11 +2095,6 @@ def _make_path(
21342095

21352096
# -- OVERLOADED BASE METHODS ------------------------------------------------------------------
21362097

2137-
def _list_directory_stems(self, subdirectory: str) -> list[str]:
2138-
glob_path = f"{self.path}/{subdirectory}/*"
2139-
2140-
return [Path(p).stem for p in self.fs.glob(glob_path)]
2141-
21422098
def list_data_types(self) -> list[str]:
21432099
"""
21442100
List all data types available in the catalog.
@@ -2175,7 +2131,7 @@ def list_live_runs(self) -> list[str]:
21752131
"""
21762132
return self._list_directory_stems("live")
21772133

2178-
def read_live_run(self, instance_id: str, **kwargs: Any) -> list[Data]:
2134+
def read_live_run(self, instance_id: str, **kwargs: Any) -> list[Data] | dict[str, list[Data]]:
21792135
"""
21802136
Read data from a live run.
21812137
@@ -2197,7 +2153,7 @@ def read_live_run(self, instance_id: str, **kwargs: Any) -> list[Data]:
21972153
"""
21982154
return self._read_feather(kind="live", instance_id=instance_id, **kwargs)
21992155

2200-
def read_backtest(self, instance_id: str, **kwargs: Any) -> list[Data]:
2156+
def read_backtest(self, instance_id: str, **kwargs: Any) -> list[Data] | dict[str, list[Data]]:
22012157
"""
22022158
Read data from a backtest run.
22032159
@@ -2224,11 +2180,14 @@ def _read_feather(
22242180
kind: str,
22252181
instance_id: str,
22262182
raise_on_failed_deserialize: bool = False,
2227-
) -> list[Data]:
2183+
data_cls: type | None = None,
2184+
identifiers: list[str] | None = None,
2185+
return_as_dict: bool = False,
2186+
) -> list[Data] | dict[str, list[Data]]:
22282187
class_mapping: dict[str, type] = {class_to_filename(cls): cls for cls in list_schemas()}
22292188
data = defaultdict(list)
22302189

2231-
for feather_file in self._list_feather_files(kind=kind, instance_id=instance_id):
2190+
for feather_file in self._list_feather_files(kind, instance_id, data_cls, identifiers):
22322191
path = feather_file.path
22332192
cls_name = feather_file.class_name
22342193
table: pa.Table = self._read_feather_file(path=path)
@@ -2242,53 +2201,18 @@ def _read_feather(
22422201

22432202
try:
22442203
data_cls = class_mapping[cls_name]
2245-
objs = self._handle_table_nautilus(table=table, data_cls=data_cls)
2246-
data[cls_name].extend(objs)
2204+
data_objects = self._handle_table_nautilus(table=table, data_cls=data_cls)
2205+
data[cls_name].extend(data_objects)
22472206
except Exception as e:
22482207
if raise_on_failed_deserialize:
22492208
raise
22502209

22512210
print(f"Failed to deserialize {cls_name}: {e}")
22522211

2253-
return sorted(itertools.chain.from_iterable(data.values()), key=lambda x: x.ts_init)
2254-
2255-
def _list_feather_files(
2256-
self,
2257-
kind: str,
2258-
instance_id: str,
2259-
) -> Generator[FeatherFile, None, None]:
2260-
prefix = f"{self.path}/{kind}/{urisafe_identifier(instance_id)}"
2261-
2262-
# Non-instrument feather files
2263-
for path_str in self.fs.glob(f"{prefix}/*.feather"):
2264-
if not self.fs.isfile(path_str):
2265-
continue
2266-
2267-
file_name = path_str.replace(prefix + "/", "").replace(".feather", "")
2268-
cls_name = "_".join(file_name.split("_")[:-1])
2269-
2270-
if not cls_name:
2271-
raise ValueError(f"`cls_name` was empty when a value was expected: {path_str}")
2272-
2273-
yield FeatherFile(path=path_str, class_name=cls_name)
2274-
2275-
# Per-instrument feather files (organized in subdirectories)
2276-
for path_str in self.fs.glob(f"{prefix}/**/*.feather"):
2277-
if not self.fs.isfile(path_str):
2278-
continue
2279-
2280-
file_name = path_str.replace(prefix + "/", "").replace(".feather", "")
2281-
path_parts = Path(file_name).parts
2282-
2283-
if len(path_parts) >= 2:
2284-
cls_name = path_parts[0] # cls_name is the first directory
2285-
else:
2286-
continue
2287-
2288-
if not cls_name:
2289-
continue
2212+
if return_as_dict:
2213+
return data
22902214

2291-
yield FeatherFile(path=path_str, class_name=cls_name)
2215+
return sorted(itertools.chain.from_iterable(data.values()), key=lambda x: x.ts_init)
22922216

22932217
def convert_stream_to_data(
22942218
self,
@@ -2319,35 +2243,15 @@ def convert_stream_to_data(
23192243
Filter to only include data containing these identifiers in their instrument_ids or bar_types.
23202244
23212245
"""
2322-
feather_dir = Path(self.path) / subdirectory / instance_id
2323-
data_name = class_to_filename(data_cls)
2324-
data_dir = feather_dir / data_name
2325-
2326-
if self.fs.isdir(str(data_dir)):
2327-
sub_dirs = [d for d in self.fs.glob(str(data_dir / "*")) if self.fs.isdir(d)]
2328-
feather_files = []
2329-
2330-
if not identifiers:
2331-
for sub_dir in sub_dirs:
2332-
feather_files.extend(sorted(self.fs.glob(str(Path(sub_dir) / "*.feather"))))
2333-
else:
2334-
for sub_dir in sub_dirs:
2335-
sub_dir_name = Path(sub_dir).name
2336-
2337-
for identifier in identifiers:
2338-
if identifier in sub_dir_name:
2339-
feather_files.extend(
2340-
sorted(self.fs.glob(str(Path(sub_dir) / "*.feather"))),
2341-
)
2342-
else:
2343-
# Data is in flat files (old format or non-per-instrument data)
2344-
feather_files = sorted(self.fs.glob(f"{feather_dir}/{data_name}_*.feather"))
2345-
23462246
used_catalog = self if other_catalog is None else other_catalog
23472247

2348-
for feather_file in feather_files:
2349-
feather_table = self._read_feather_file(str(feather_file))
2350-
2248+
for feather_file in self._list_feather_data_files(
2249+
kind=subdirectory,
2250+
instance_id=instance_id,
2251+
data_cls=data_cls,
2252+
identifiers=identifiers,
2253+
):
2254+
feather_table = self._read_feather_file(feather_file.path)
23512255
if feather_table is None:
23522256
continue
23532257

@@ -2364,13 +2268,157 @@ def _read_feather_file(
23642268
) -> pa.Table | None:
23652269
if not self.fs.exists(path):
23662270
return None
2271+
23672272
try:
23682273
with self.fs.open(path) as f:
23692274
reader = pa.ipc.open_stream(f)
23702275
return reader.read_all()
23712276
except (pa.ArrowInvalid, OSError):
23722277
return None
23732278

2279+
@staticmethod
2280+
def _handle_table_nautilus(
2281+
table: pa.Table | pd.DataFrame,
2282+
data_cls: type,
2283+
convert_bar_type_to_external: bool = False,
2284+
) -> list[Data]:
2285+
if isinstance(table, pd.DataFrame):
2286+
table = pa.Table.from_pandas(table)
2287+
2288+
# Convert metadata from INTERNAL to EXTERNAL if requested
2289+
if convert_bar_type_to_external and table.schema.metadata:
2290+
metadata = dict(table.schema.metadata)
2291+
2292+
# Convert bar_type metadata (for Bar data)
2293+
if b"bar_type" in metadata:
2294+
bar_type_str = metadata[b"bar_type"].decode()
2295+
2296+
if bar_type_str.endswith("-INTERNAL"):
2297+
metadata[b"bar_type"] = bar_type_str.replace("-INTERNAL", "-EXTERNAL").encode()
2298+
2299+
# Replace schema with updated metadata (shallow copy)
2300+
table = table.replace_schema_metadata(metadata)
2301+
2302+
data = ArrowSerializer.deserialize(data_cls=data_cls, batch=table)
2303+
module = data[0].__class__.__module__
2304+
2305+
if "nautilus_pyo3" in module:
2306+
cython_cls = {
2307+
"OrderBookDelta": OrderBookDelta,
2308+
"OrderBookDeltas": OrderBookDelta,
2309+
"OrderBookDepth10": OrderBookDepth10,
2310+
"QuoteTick": QuoteTick,
2311+
"TradeTick": TradeTick,
2312+
"Bar": Bar,
2313+
}.get(data_cls.__name__, data_cls.__name__)
2314+
data = cython_cls.from_pyo3_list(data)
2315+
2316+
return data
2317+
2318+
def _list_feather_files(
2319+
self,
2320+
kind: str,
2321+
instance_id: str,
2322+
data_cls: type | None = None,
2323+
identifiers: list[str] | None = None,
2324+
) -> Generator[FeatherFile, None, None]:
2325+
"""
2326+
List feather files for a given instance.
2327+
2328+
Parameters
2329+
----------
2330+
kind : str
2331+
The kind of data (e.g., "backtest", "live").
2332+
instance_id : str
2333+
The instance ID.
2334+
data_cls : type, optional
2335+
If provided, only list files for this specific data class.
2336+
identifiers : list[str], optional
2337+
If provided (and data_cls is also provided), only include files
2338+
from subdirectories containing these identifiers.
2339+
2340+
Yields
2341+
------
2342+
FeatherFile
2343+
Named tuple containing path and class_name.
2344+
2345+
"""
2346+
if data_cls is not None:
2347+
yield from self._list_feather_data_files(kind, instance_id, data_cls, identifiers)
2348+
else:
2349+
base_dir = Path(self.path) / kind / urisafe_identifier(instance_id)
2350+
discovered_classes: set[str] = set()
2351+
2352+
# Discover data classes from flat files
2353+
for path_str in self.fs.glob(str(base_dir / "*.feather")):
2354+
if not self.fs.isfile(path_str):
2355+
continue
2356+
2357+
# Extract class name by removing .feather extension and timestamp suffix
2358+
path = Path(path_str)
2359+
cls_name = "_".join(path.stem.split("_")[:-1])
2360+
2361+
if not cls_name:
2362+
raise ValueError(f"`cls_name` was empty when a value was expected: {path_str}")
2363+
2364+
discovered_classes.add(cls_name)
2365+
2366+
# Discover data classes from per-instrument subdirectories
2367+
# Use _list_directory_stems to get subdirectory names (data class names)
2368+
subdirs = self._list_directory_stems(f"{kind}/{urisafe_identifier(instance_id)}")
2369+
discovered_classes.update(subdirs)
2370+
2371+
# Use _list_feather_data_files for each discovered class
2372+
class_mapping: dict[str, type] = {class_to_filename(cls): cls for cls in list_schemas()}
2373+
2374+
for cls_name in discovered_classes:
2375+
if cls_name in class_mapping:
2376+
yield from self._list_feather_data_files(
2377+
kind,
2378+
instance_id,
2379+
class_mapping[cls_name],
2380+
identifiers,
2381+
)
2382+
2383+
def _list_directory_stems(self, subdirectory: str) -> list[str]:
2384+
glob_path = f"{self.path}/{subdirectory}/*"
2385+
2386+
return [Path(p).stem for p in self.fs.glob(glob_path)]
2387+
2388+
def _list_feather_data_files(
2389+
self,
2390+
kind: str,
2391+
instance_id: str,
2392+
data_cls: type,
2393+
identifiers: list[str] | None = None,
2394+
) -> Generator[FeatherFile, None, None]:
2395+
"""
2396+
List feather files for a specific data class.
2397+
"""
2398+
base_dir = Path(self.path) / kind / instance_id
2399+
data_name = class_to_filename(data_cls)
2400+
data_dir = base_dir / data_name
2401+
2402+
if self.fs.isdir(str(data_dir)):
2403+
# Per-instrument feather files organized in subdirectories
2404+
sub_dirs = [d for d in self.fs.glob(str(data_dir / "*")) if self.fs.isdir(d)]
2405+
2406+
for sub_dir in sub_dirs:
2407+
# Apply identifier filter if provided
2408+
if identifiers:
2409+
sub_dir_name = Path(sub_dir).name
2410+
2411+
if not any(identifier in sub_dir_name for identifier in identifiers):
2412+
continue
2413+
2414+
# Yield all feather files in this subdirectory
2415+
for path_str in sorted(self.fs.glob(str(Path(sub_dir) / "*.feather"))):
2416+
yield FeatherFile(path=path_str, class_name=data_name)
2417+
else:
2418+
# Data is in flat files (old format or non-per-instrument data)
2419+
for path_str in sorted(self.fs.glob(str(base_dir / f"{data_name}_*.feather"))):
2420+
yield FeatherFile(path=path_str, class_name=data_name)
2421+
23742422

23752423
def _timestamps_to_filename(timestamp_1: int, timestamp_2: int) -> str:
23762424
datetime_1 = _iso_timestamp_to_file_timestamp(unix_nanos_to_iso8601(timestamp_1))

tests/unit_tests/persistence/test_streaming.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def test_feather_writer(self, catalog_betfair: ParquetDataCatalog) -> None:
7474
instance_id=instance_id,
7575
raise_on_failed_deserialize=True,
7676
)
77-
result = dict(Counter([r.__class__.__name__ for r in result])) # type: ignore [assignment]
77+
result = dict(Counter([r.__class__.__name__ for r in result])) # type: ignore [arg-type]
7878

7979
# TODO: Backtest needs to be reconfigured to use either deltas or trades
8080
expected = {

0 commit comments

Comments (0)