
Conversation

@xandie985 commented Mar 14, 2025

This pull request introduces experimental updates to the D1 and D2 layers of the time series data pipeline, focusing on handling large CSV files and improving data processing efficiency. The main changes are in:

  1. D1 layer
  2. D2 layer
  3. Unit test cases

@xandie985 (Author)

Summary of Changes

This PR introduces experimental updates to handle large CSV files in the D1 and D2 layers of the time series data pipeline. Key changes include:

  1. New Implementations:

    • Added MultiFileTSDataset (D1 Layer) to handle multiple CSV files with chunked processing, global label encoding, and group-specific data handling.
    • Added MultiFileZarrTSDataset (D1 Layer) for handling large datasets split across multiple Zarr stores, with support for categorical encoding and chunked processing.
    • Added TSDataProcessor (D2 Layer) to process D1 output into sliding windows for model consumption, with support for past and future steps.
  2. Key Features:

    • Chunked Processing: Both D1 implementations process data in chunks to handle large files efficiently (see the sketch after this list).
    • Global Label Encoding: Ensures consistent encoding of categorical variables across all files.
    • Group Handling: Supports grouped time series data, with metadata tracking for each group.
    • Sliding Windows: D2 layer creates input/output windows for training and inference.
  3. Code Structure:

    • Added new files:
      • MultiFileTSDataset.py: Handles CSV files with chunked processing.
      • MultiFileZarrTSDataset.py: Handles Zarr stores with chunked processing.
      • time_series_dataset.py: Combines D1 and D2 layers with support for training, validation, and testing splits.
    • Updated test notebooks (TEST.ipynb, TEST_v2.ipynb, etc.) to validate the new implementations.
  4. Improvements:

    • Scalability: Designed to handle datasets that don’t fit into memory by processing data in chunks.
    • Flexibility: Supports both CSV and Zarr formats, with configurable chunk sizes and group handling.
    • Validation: Ensures data integrity by validating time intervals and handling missing values.
  5. Future Work:

    • Integration with D1D2 baseline modeling.
    • Implementation of normalization for large datasets.
    • Exploration of PyTorch Lightning for data preparation.
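
A minimal sketch of the chunked-scan plus global-encoding idea from items 1 and 2 above (illustrative names, not the PR's actual code): a first pass streams every CSV in chunks to collect one global vocabulary per categorical column, and a second pass applies that single mapping to any chunk of any file, which is what keeps the encoding consistent across files.

```python
import pandas as pd

def build_global_encoders(files, cat_cols, chunk_size=100_000):
    """First pass: gather every category value across all files."""
    vocab = {col: set() for col in cat_cols}
    for path in files:
        for chunk in pd.read_csv(path, chunksize=chunk_size, usecols=cat_cols):
            for col in cat_cols:
                vocab[col].update(chunk[col].dropna().unique())
    # Sort for a deterministic mapping that doesn't depend on file order.
    return {col: {val: i for i, val in enumerate(sorted(vals))}
            for col, vals in vocab.items()}

def encode_chunk(chunk, encoders):
    """Second pass: apply the same global mapping to a chunk of any file."""
    for col, mapping in encoders.items():
        chunk[col] = chunk[col].map(mapping)
    return chunk
```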

@xandie985 (Author)

time_series_dataset.py output logs look like:


Test Logs

Starting test_dataset...

Initializing D1 Dataset...
Processing files to build metadata...

Processing file 1/2: file1.csv
Processing file 2/2: file2.csv

Found 4 unique file-group combinations
Representing 3 unique group identifiers

D1 Dataset Info:
  Total Groups: 4
  Group IDs: [(0, 'A'), (0, 'B'), (1, 'B'), (1, 'C')]
  Group lengths: {(0, 'A'): 50, (0, 'B'): 50, (1, 'B'): 50, (1, 'C'): 50}
  Feature columns: ['category']
  Total data points: 200

Loading sample group data (group 'A')...
  Group ID: A
  Time points: 50
  Features shape: torch.Size([50, 1])
  Target shape: torch.Size([50, 1])
--------------------------------------------------------------------------------

Initializing D2 Processor...

D2 Processor Info:
  Total samples: 188

Attempting to get first sample from D2 Processor...

Sample structure:
  past_time: Array of shape (3,)
  past_target: Tensor of shape torch.Size([3, 1])
  past_features: Tensor of shape torch.Size([3, 1])
  past_weights: Tensor of shape torch.Size([3])
  future_time: Array of shape (1,)
  future_target: Tensor of shape torch.Size([1, 1])
  future_features: Tensor of shape torch.Size([1, 1])
  future_weights: Tensor of shape torch.Size([1])
  group: A
  static: {}

Sample output:
{
  'past_time': Array of shape (3,) with values: ['2025-01-01', '2025-01-02', '2025-01-03']
  'past_target': Tensor of shape torch.Size([3, 1]) with values: tensor([[-1.1611], [-1.7427], [-0.0379]])
  'past_features': Tensor of shape torch.Size([3, 1]) with values: tensor([[0.], [1.], [1.]])
  'past_weights': Tensor of shape torch.Size([3]) with values: tensor([1., 1., 1.])
  'future_time': Array of shape (1,) with values: ['2025-01-04']
  'future_target': Tensor of shape torch.Size([1, 1]) with values: tensor([[0.6696]])
  'future_features': Tensor of shape torch.Size([1, 1]) with values: tensor([[0.]])
  'future_weights': Tensor of shape torch.Size([1]) with values: tensor([1.])
  'group': A
  'static': {}
}

Test completed successfully!

Key Observations

  1. D1 Dataset:

    • Successfully processed 2 files (file1.csv and file2.csv).
    • Identified 4 unique file-group combinations and 3 unique group identifiers.
    • Group A has 50 time points, with features and targets correctly loaded as tensors.
  2. D2 Processor:

    • Created 188 valid samples from the dataset: each of the 4 file-group combinations has 50 time points, giving 50 − 3 − 1 + 1 = 47 windows per group, and 4 × 47 = 188.
    • First sample output shows correct sliding window behavior (see the sketch after this list):
      • Past window: 3 time steps with features, targets, and weights.
      • Future window: 1 time step with features, targets, and weights.
    • Data consistency validated across time, features, and targets.
  3. Test Status:

    • All tests completed successfully, confirming the functionality of the D1 and D2 layers.
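
A minimal sketch of the window slicing these observations describe, assuming contiguous per-group arrays from the D1 layer (helper names are illustrative). With past_steps=3 and future_steps=1, each group of length 50 yields 47 windows, matching both the 188 samples here and the 470 windows across 10 groups in the next log.

```python
def n_windows(group_len, past_steps=3, future_steps=1):
    """Number of valid sliding windows in one group."""
    return max(0, group_len - past_steps - future_steps + 1)

def window_sample(t, x, y, start, past_steps=3, future_steps=1):
    """Slice one (past, future) sample starting at index `start`."""
    split = start + past_steps
    end = split + future_steps
    return {
        "past_time": t[start:split],      # e.g. shape (3,)
        "past_features": x[start:split],  # e.g. shape (3, n_features)
        "past_target": y[start:split],    # e.g. shape (3, n_targets)
        "future_time": t[split:end],      # e.g. shape (1,)
        "future_features": x[split:end],
        "future_target": y[split:end],
    }
```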

@xandie985 (Author)

Logs:

=== Time Series Dataset Test ===
Creating example files with multiple groups...
  Creating file1.csv with groups A and B
  Creating file2.csv with groups B and C
  Creating file3.csv with groups C, D and E
  Creating file4.csv with groups F, G and H
  Example files created successfully

=== Testing with memory_efficient=False (preload all data) ===
Processing files to build metadata...

Processing file 1/4: file1.csv

Processing file 2/4: file2.csv

Processing file 3/4: file3.csv

Processing file 4/4: file4.csv

Found 10 unique file-group combinations
Representing 8 unique group identifiers
Preloading data into memory...
Loading file 1/4: file1.csv
Loading file 2/4: file2.csv
Loading file 3/4: file3.csv
Loading file 4/4: file4.csv
Data preloading complete.

D1 Dataset Stats:
  Number of groups: 10
  Feature columns: ['x1', 'x2']
  Target columns: ['y1']
  Static columns: ['static1', 'static2']
  Memory mode: Preloaded

Sample from D1 dataset (first group):
  x: Tensor of shape torch.Size([50, 2]), dtype=torch.float32
  y: Tensor of shape torch.Size([50, 1]), dtype=torch.float32
  t: ['2025-01-01' '2025-01-02' '2025-01-03' '2025-01-04' '2025-01-05'
 '2025-01-06' '2025-01-07' '2025-01-08' '2025-01-09' '2025-01-10'
 '2025-01-11' '2025-01-12' '2025-01-13' '2025-01-14' '2025-01-15'
 '2025-01-16' '2025-01-17' '2025-01-18' '2025-01-19' '2025-01-20'
 '2025-01-21' '2025-01-22' '2025-01-23' '2025-01-24' '2025-01-25'
 '2025-01-26' '2025-01-27' '2025-01-28' '2025-01-29' '2025-01-30'
 '2025-01-31' '2025-02-01' '2025-02-02' '2025-02-03' '2025-02-04'
 '2025-02-05' '2025-02-06' '2025-02-07' '2025-02-08' '2025-02-09'
 '2025-02-10' '2025-02-11' '2025-02-12' '2025-02-13' '2025-02-14'
 '2025-02-15' '2025-02-16' '2025-02-17' '2025-02-18' '2025-02-19']
  w: Tensor of shape torch.Size([50]), dtype=torch.float32
  v: Tensor of shape torch.Size([50]), dtype=torch.bool
  group_id: A
  st: {'static1': 's2', 'static2': np.int64(9)}

=== Testing D2 Module with Percentage Splits ===
Precomputing valid indices and mappings...
Found 470 valid windows across 10 groups
Percentage-based split - Train: 320, Val: 70, Test: 80 samples
Split statistics: Train: 320, Validation: 70, Test: 80
D2 Module Stats (Percentage Split):
  Total samples: 470
  Past steps: 3
  Future steps: 1
  Train samples: 320
  Validation samples: 70
  Test samples: 80

Sample from D2 train split (index 27):
  Past time shape: (3,)
  Past features shape: torch.Size([3, 2])
  Future target shape: torch.Size([1, 1])

=== Testing D2 Module with Group-Based Splits ===
Precomputing valid indices and mappings...
Found 470 valid windows across 10 groups
Group-based split - Train: 282 (4 groups), Val: 47 (1 groups), Test: 141 (3 groups) samples
Split statistics: Train: 282, Validation: 47, Test: 141
D2 Module Stats (Group Split):
  Total samples: 470
  Train groups: ['A', 'B', 'C', 'D']
  Train samples: 282
  Validation groups: ['E']
  Validation samples: 47
  Test groups: ['F', 'G', 'H']
  Test samples: 141

=== Testing with memory_efficient=True (chunked processing) ===
Processing files to build metadata...

Processing file 1/4: file1.csv

Processing file 2/4: file2.csv

Processing file 3/4: file3.csv

Processing file 4/4: file4.csv

Found 10 unique file-group combinations
Representing 8 unique group identifiers

=== Testing PyTorch Lightning Integration ===
Train DataLoader: 40 batches of size 8
Validation DataLoader: 9 batches of size 8
Test DataLoader: 10 batches of size 8

Batch 1 structure:
  past_time: [array(['2025-07-26', '2025-07-27', '2025-07-28'], dtype=object), array(['2025-07-20', '2025-07-21', '2025-07-22'], dtype=object), array(['2025-01-25', '2025-01-26', '2025-01-27'], dtype=object), array(['2025-03-28', '2025-03-29', '2025-03-30'], dtype=object), array(['2025-01-14', '2025-01-15', '2025-01-16'], dtype=object), array(['2025-04-21', '2025-04-22', '2025-04-23'], dtype=object), array(['2025-03-12', '2025-03-13', '2025-03-14'], dtype=object), array(['2025-04-26', '2025-04-27', '2025-04-28'], dtype=object)]
  past_target: Tensor of shape torch.Size([8, 3, 1])
  past_features: Tensor of shape torch.Size([8, 3, 2])
  past_weights: Tensor of shape torch.Size([8, 3])
  future_time: [array(['2025-07-29'], dtype=object), array(['2025-07-23'], dtype=object), array(['2025-01-28'], dtype=object), array(['2025-03-31'], dtype=object), array(['2025-01-17'], dtype=object), array(['2025-04-24'], dtype=object), array(['2025-03-15'], dtype=object), array(['2025-04-29'], dtype=object)]
  future_target: Tensor of shape torch.Size([8, 1, 1])
  future_features: Tensor of shape torch.Size([8, 1, 2])
  future_weights: Tensor of shape torch.Size([8, 1])
  group: ['H', 'E', 'A', 'B', 'A', 'C', 'C', 'D']
  static: [{'static1': 's7', 'static2': np.int64(2)}, {'static1': 's4', 'static2': np.int64(7)}, {'static1': 's2', 'static2': np.int64(9)}, {'static1': 's2', 'static2': np.int64(5)}, {'static1': 's2', 'static2': np.int64(9)}, {'static1': 's3', 'static2': np.int64(6)}, {'static1': 's4', 'static2': np.int64(8)}, {'static1': 's4', 'static2': np.int64(3)}]

Test completed successfully!
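
The collate function itself isn't shown in this PR, but a sketch along these lines would reproduce the batch layout above: tensor fields are stacked to (batch, steps, dim), while time arrays, group ids, and static dicts stay as plain Python lists.

```python
import torch

def ts_collate(samples):
    """Stack tensor fields; keep non-tensor fields (time, group, static) as lists."""
    batch = {}
    for key in samples[0]:
        values = [sample[key] for sample in samples]
        if torch.is_tensor(values[0]):
            batch[key] = torch.stack(values)  # e.g. past_target -> (8, 3, 1)
        else:
            batch[key] = values               # e.g. past_time -> list of arrays
    return batch

# Hypothetical usage:
# DataLoader(train_split, batch_size=8, collate_fn=ts_collate)
```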

@xandie985 (Author)

Current state of the code:

  1. Consolidation of D2 Layer:

    • The TSDataProcessor class was removed, and its functionality was merged into the TSDataModule class.
    • This simplifies the architecture by reducing the number of classes and grouping related functionality together.
    • The TSDataModule now directly handles both data processing and PyTorch Lightning integration.
  2. Precompute Handling:

    • Added an _initialize() method in TSDataModule to handle precomputation of valid indices and mappings.
    • This method is called either during initialization (when precompute=True) or lazily during setup, ensuring efficient handling of both precomputed and on-demand modes.
  3. Weight Handling Improvements:

    • Removed the logic that added a default weight column (_default_weight) to the DataFrame in _load_group_data.
    • Weight handling is now moved to the __getitem__ method, where a tensor of ones is created if no weight column is specified or if the column is missing (see the sketch after this list).
    • This prevents unexpected modifications to the user's DataFrame and ensures cleaner data handling.
  4. Caching Optimization:

    • Improved caching logic in TSDataModule.__getitem__ to cache loaded group data regardless of the precompute setting.
    • This avoids reloading the same group multiple times, improving performance even in memory-efficient mode.
  5. Splitting Logic Optimization:

    • Optimized the _create_splits method in TSDataModule to build splits directly from the mapping without repeatedly calling self.mapping.index().
    • This reduces redundant lookups and iterations, making the splitting process more efficient.
  6. Memory Management:

    • Enhanced memory management by ensuring that data is only loaded when needed, and caching is used effectively to avoid redundant data loading.
    • Both memory-efficient and preloaded modes are supported, providing flexibility based on the user's memory constraints.
  7. Code Cleanup and Organization:

    • The code has been reorganized to group related functionality together, making it easier to understand and maintain.
    • The interface remains consistent, ensuring that existing functionality is preserved while improving the internal implementation.
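
A minimal sketch of the weight handling in item 3, with hypothetical argument names: the fallback tensor of ones is created inside __getitem__, so the user's DataFrame is never modified.

```python
import torch

def _get_weights(df, weight_col, length):
    """Return per-step weights; default to ones without touching the DataFrame."""
    if weight_col is not None and weight_col in df.columns:
        return torch.as_tensor(df[weight_col].to_numpy(), dtype=torch.float32)
    return torch.ones(length, dtype=torch.float32)  # uniform default weights
```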

Benefits of the Changes:

  • Simplified API: Users only need to interact with two main classes (MultiSourceTSDataSet and TSDataModule), reducing complexity.
  • Improved Performance: Optimizations in caching and splitting logic reduce computational overhead.
  • Cleaner Data Handling: Weight handling is now more predictable and does not modify the user's DataFrame.
  • Better Memory Usage: Enhanced caching and memory management provide a good balance between memory efficiency and performance.
  • Maintained Functionality: All existing functionality is preserved, ensuring backward compatibility.

@xandie985 (Author)

Key changes:

  • Set max_cached_groups to 50 by default
  • Implemented initial data sorting for both memory modes:
    • Full sorting in non-memory-efficient mode
    • Chunk sorting in memory-efficient mode
  • Maintained categorical feature handling with label encoder
  • Simplified caching:
    • Implemented FIFO strategy (sketched below)
    • Removed access counting
    • Added load order tracking
  • Removed unnecessary parameters:
    • cache_dir from MultiSourceTSDataSet
    • nan_handling from TSDataModule
  • Enhanced metadata handling:
    • Added _add_max_classes_to_metadata()
    • Added known/unknown columns support
  • Optimized _compute_valid_indices:
    • Improved skipping mechanism
    • Removed GPU usage
    • Enhanced early termination
  • Updated examples with realistic data including NaNs

These changes improve:
1. Performance through better caching and sorting
2. Maintainability by removing unused code
3. Reliability with consistent cross-environment behavior
4. Usability with better examples
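
A minimal sketch of the FIFO strategy described above (not the PR's actual class): an OrderedDict provides both the load-order tracking and O(1) eviction of the oldest-loaded group, with no access counting.

```python
from collections import OrderedDict

class FIFOGroupCache:
    """Evicts in load order once max_cached_groups is reached (FIFO, not LRU)."""

    def __init__(self, max_cached_groups=50):
        self.max_cached_groups = max_cached_groups
        self._cache = OrderedDict()  # key -> group data, kept in load order

    def get(self, key):
        return self._cache.get(key)  # no reordering on access

    def put(self, key, data):
        if key in self._cache:
            return
        if len(self._cache) >= self.max_cached_groups:
            self._cache.popitem(last=False)  # drop the oldest-loaded group
        self._cache[key] = data
```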

…series regularization using extend_time_df() | enhancing metadata with detailed column categorization and some optimizations
@xandie985 (Author)

Key Changes

TSDataModule Updates

  1. Caching Removed: Caching mechanism and related variables removed from TSDataModule._get_group_data(). Data is now fetched directly from the D1 dataset.

  2. Improved Sorting:

    • Memory Mode: _preload_data() now sorts by group columns, then time column.
    • On-the-fly Mode: _load_group_data() maintains time-only sorting (already filtered by group). Comments added to explain strategy.
  3. Time Series Regularization:

    • extend_time_df integrated into _load_group_data() (see the sketch after this list).
    • Automatic frequency detection added (for both datetime and numeric time columns).
    • Regularization applied per group.
    • Error handling added for regularization failures.
  4. Enhanced Metadata:

    • D1 metadata updated to include known_cat_cols, known_num_cols, unknown_cat_cols, and unknown_num_cols.
    • D2 layer now correctly maintains these categories.
  5. Categorical Max Classes:

    • Max classes calculation moved to D1 layer's _prepare_metadata().
    • D2 layer's _add_max_classes_to_metadata() now references D1 metadata.
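
A minimal sketch of the per-group regularization described in item 3; the extend_time_df body here is illustrative, not the PR's implementation. The frequency is inferred as the most common consecutive gap, which works for both datetime and numeric time columns, and the group is then reindexed onto a regular grid so gaps become NaN rows.

```python
import numpy as np
import pandas as pd

def infer_freq(times):
    """Most common consecutive gap: a Timedelta for datetimes, a number otherwise."""
    return pd.Series(times).diff().dropna().mode().iloc[0]

def extend_time_df(df, time_col):
    """Reindex one group's frame onto a regular time grid, filling gaps with NaN."""
    df = df.sort_values(time_col)
    freq = infer_freq(df[time_col])
    start, stop = df[time_col].iloc[0], df[time_col].iloc[-1]
    if pd.api.types.is_datetime64_any_dtype(df[time_col]):
        grid = pd.date_range(start, stop, freq=freq)
    else:
        grid = np.arange(start, stop + freq, freq)
    return df.set_index(time_col).reindex(grid).rename_axis(time_col).reset_index()
```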

Unit Tests (in progress)

  • TestExtendTimeDF: Tests time series regularization (numeric/datetime, group handling, gap filling).
  • TestMultiSourceTSDataSet: Tests memory modes, data loading, known/unknown column specification, metadata, and categorical feature handling.
  • TestTSDataModule: Tests data splits, window creation, overriding known/unknown columns (D2), and metadata propagation.
