Refactor streaming writer to support per-bar-type persistence #3078
Pull Request
NautilusTrader prioritizes correctness and reliability; please follow existing patterns for validation and testing.
CONTRIBUTING.md and followed the established practices
Summary
Refactor streaming writer to support per-bar-type persistence and improve catalog conversion
Overview
This PR refactors the streaming writer infrastructure to properly handle per-bar-type file organization and enhances the catalog's ability to convert streamed data back to parquet format. The changes enable better organization of streamed data and support for converting internal bars to external bars during catalog conversion.
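The two ideas in this overview can be sketched in a few lines of standalone Python. This is only an illustration and not the code in this PR: the helper names `identifier_subdir` and `to_external`, the `stream/bar/...` directory layout, and the replacement of `/` in identifiers are all assumptions made for the example. It also assumes bar types are written as strings whose last component is the aggregation source (`INTERNAL` or `EXTERNAL`).

```python
from pathlib import PurePosixPath


def identifier_subdir(root: str, data_cls: str, identifier: str) -> PurePosixPath:
    """Hypothetical layout: one subdirectory per data class and identifier."""
    # Replace path separators so an identifier such as "AUD/USD.SIM"
    # cannot create nested directories (assumed sanitization rule).
    safe = identifier.replace("/", "_")
    return PurePosixPath(root) / data_cls / safe


def to_external(bar_type: str) -> str:
    """Rewrite a trailing -INTERNAL aggregation source to -EXTERNAL."""
    suffix = "-INTERNAL"
    if bar_type.endswith(suffix):
        return bar_type[: -len(suffix)] + "-EXTERNAL"
    # Already EXTERNAL (or not a bar type string): leave unchanged.
    return bar_type


# Illustrative bar type strings, not taken from the PR.
bar_types = [
    "AUD/USD.SIM-1-MINUTE-BID-INTERNAL",
    "AUD/USD.SIM-5-MINUTE-BID-EXTERNAL",
]

# Each bar type gets its own subdirectory, mirroring per-instrument files.
paths = [identifier_subdir("stream", "bar", bt) for bt in bar_types]

# During conversion to the catalog, INTERNAL bar types become EXTERNAL.
converted = [to_external(bt) for bt in bar_types]
```

Keeping one directory per bar type means two bar types for the same instrument never share a file, which is what makes filtering by identifier during conversion straightforward.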
Key changes
Streaming writer improvements:
- Refactored `StreamingFeatherWriter` to support per-bar-type file organization for `Bar` data, similar to the existing per-instrument organization for ticks and order book data.
- Added a `_rotate_identifier_file()` method that handles both per-instrument and per-bar-type cases.
- Renamed `_create_instrument_writer()` to `_create_identifier_writer()` to better reflect its dual purpose (instrument_id and bar_type).
- Changed `self.logger` to `self.log` for consistency with codebase conventions.
Catalog conversion enhancements:
- Added an `identifiers` parameter to `convert_stream_to_data()` to allow filtering by specific instrument_ids or bar_types.
- Added a `convert_bar_type_to_external` parameter to `_handle_table_nautilus()` to convert INTERNAL bar types to EXTERNAL during deserialization.
- Updated `convert_stream_to_data()` to handle both subdirectory-organized files (per-instrument/per-bar-type) and flat files.
Data engine fix:
- Fixed the `bars()` catalog query to use the `bar_types` parameter (list) instead of `bar_type` (string).
Example updates:
- Updated the `databento_option_greeks.py` example to demonstrate streaming both `GreeksData` and `Bar` data.
Minor improvements:
- `_extract_sql_safe_filename()` implementation.
Testing
New tests added:
- `test_feather_writer_per_bar_type`: Tests per-bar-type file organization with separate subdirectories for different bar types.
- `test_convert_stream_to_data_with_identifiers`: Tests filtering by identifiers when converting stream data to the catalog.
- `test_convert_stream_to_data_internal_to_external`: Tests conversion of INTERNAL bar types to EXTERNAL during catalog conversion.
Test results:
- Tests in `test_streaming.py` pass successfully.
Related Issues/PRs
Type of change
Breaking change details (if applicable)
Documentation
docs/developer_guide/docs.md)
Release notes
RELEASES.md that follows the existing conventions (when applicable)
Testing
Ensure new or changed logic is covered by tests.
Tested all functionalities in databento_option_greeks.py (updated)