diff --git a/setup.py b/setup.py
index 41c8a53e367..f6844d461a4 100644
--- a/setup.py
+++ b/setup.py
@@ -187,6 +187,7 @@
     "Pillow>=9.4.0",  # When PIL.Image.ExifTags was introduced
     "torchcodec>=0.7.0; python_version < '3.14'",  # minium version to get windows support, torchcodec doesn't have wheels for 3.14 yet
     "nibabel>=5.3.1",
+    "tsfile>=2.0.0",
 ]

 NUMPY2_INCOMPATIBLE_LIBRARIES = [
@@ -210,6 +211,8 @@
 NIBABEL_REQUIRE = ["nibabel>=5.3.2", "ipyniivue==2.4.2"]

+TSFILE_REQUIRE = ["tsfile>=2.0.0"]
+
 EXTRAS_REQUIRE = {
     "audio": AUDIO_REQUIRE,
     "vision": VISION_REQUIRE,
@@ -228,6 +231,7 @@
     "docs": DOCS_REQUIRE,
     "pdfs": PDFS_REQUIRE,
     "nibabel": NIBABEL_REQUIRE,
+    "tsfile": TSFILE_REQUIRE,
 }

 setup(
diff --git a/src/datasets/packaged_modules/__init__.py b/src/datasets/packaged_modules/__init__.py
index c9a32ff71f0..fd93c98282c 100644
--- a/src/datasets/packaged_modules/__init__.py
+++ b/src/datasets/packaged_modules/__init__.py
@@ -18,6 +18,7 @@
 from .pdffolder import pdffolder
 from .sql import sql
 from .text import text
+from .tsfile import tsfile
 from .videofolder import videofolder
 from .webdataset import webdataset
 from .xml import xml
@@ -52,6 +53,7 @@ def _hash_python_lines(lines: list[str]) -> str:
     "webdataset": (webdataset.__name__, _hash_python_lines(inspect.getsource(webdataset).splitlines())),
     "xml": (xml.__name__, _hash_python_lines(inspect.getsource(xml).splitlines())),
     "hdf5": (hdf5.__name__, _hash_python_lines(inspect.getsource(hdf5).splitlines())),
+    "tsfile": (tsfile.__name__, _hash_python_lines(inspect.getsource(tsfile).splitlines())),
     "eval": (eval.__name__, _hash_python_lines(inspect.getsource(eval).splitlines())),
 }
@@ -84,6 +86,7 @@ def _hash_python_lines(lines: list[str]) -> str:
     ".xml": ("xml", {}),
     ".hdf5": ("hdf5", {}),
     ".h5": ("hdf5", {}),
+    ".tsfile": ("tsfile", {}),
     ".eval": ("eval", {}),
 }
 _EXTENSION_TO_MODULE.update({ext: ("imagefolder", {}) for ext in imagefolder.ImageFolder.EXTENSIONS})
diff --git 
a/src/datasets/packaged_modules/tsfile/__init__.py b/src/datasets/packaged_modules/tsfile/__init__.py new file mode 100644 index 00000000000..6a9840cd306 --- /dev/null +++ b/src/datasets/packaged_modules/tsfile/__init__.py @@ -0,0 +1 @@ +# empty module initializer diff --git a/src/datasets/packaged_modules/tsfile/tsfile.py b/src/datasets/packaged_modules/tsfile/tsfile.py new file mode 100644 index 00000000000..ecdc5d1f0d7 --- /dev/null +++ b/src/datasets/packaged_modules/tsfile/tsfile.py @@ -0,0 +1,233 @@ +# Copyright 2025 The HuggingFace Datasets Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Apache TsFile dataset builder for time-series data.""" + +import itertools +from dataclasses import dataclass +from typing import Optional + +import pyarrow as pa + +import datasets +from datasets.builder import Key + + +logger = datasets.utils.logging.get_logger(__name__) + +EXTENSIONS = [".tsfile"] + + +@dataclass +class TsFileConfig(datasets.BuilderConfig): + """BuilderConfig for Apache TsFile. + + Args: + batch_size (`int`, *optional*, defaults to 10000): + Number of rows per batch when reading from TsFile. + features (`Features`, *optional*): + Cast the data to `features`. + table_name (`str`, *optional*): + Name of the table to query in table-model TsFiles. + If None, the first table found will be used. + columns (`list[str]`, *optional*): + List of columns to load. If None, all columns are loaded. 
+ start_time (`int`, *optional*): + Start timestamp for time-range filtering (inclusive). + If None, reads from the beginning. + end_time (`int`, *optional*): + End timestamp for time-range filtering (inclusive). + If None, reads until the end. + + Example: + + Load a TsFile dataset: + + ```python + >>> ds = load_dataset("tsfile", data_files=["data.tsfile"]) + ``` + + Load with time-range filtering: + + ```python + >>> ds = load_dataset("tsfile", data_files=["data.tsfile"], start_time=1609459200000, end_time=1640995200000) + ``` + + Load specific columns: + + ```python + >>> ds = load_dataset("tsfile", data_files=["data.tsfile"], columns=["temperature", "humidity"]) + ``` + """ + + batch_size: int = 10000 + features: Optional[datasets.Features] = None + table_name: Optional[str] = None + columns: Optional[list[str]] = None + start_time: Optional[int] = None + end_time: Optional[int] = None + + def __post_init__(self): + super().__post_init__() + + +class TsFile(datasets.ArrowBasedBuilder): + """ArrowBasedBuilder for Apache TsFile time-series data format. + + TsFile is a columnar storage file format designed for time-series data, + providing efficient compression and high query performance. 
+ """ + + BUILDER_CONFIG_CLASS = TsFileConfig + + def _info(self): + return datasets.DatasetInfo(features=self.config.features) + + def _split_generators(self, dl_manager): + """Handle string, list and dicts in datafiles.""" + + if not self.config.data_files: + raise ValueError(f"At least one data file must be specified, but got data_files={self.config.data_files}") + + dl_manager.download_config.extract_on_the_fly = True + data_files = dl_manager.download_and_extract(self.config.data_files) + splits = [] + + for split_name, files in data_files.items(): + if isinstance(files, str): + files = [files] + + files = [dl_manager.iter_files(file) for file in files] + + # Infer features from first file if not provided + if self.info.features is None: + for first_file in itertools.chain.from_iterable(files): + try: + self.info.features = self._infer_features_from_file(first_file) + break + except Exception as e: + logger.warning(f"Failed to infer features from '{first_file}': {e}") + + if self.info.features is None: + raise ValueError( + "Could not infer features from data files. " + "Please specify features explicitly or ensure data files are valid TsFiles." + ) + + splits.append(datasets.SplitGenerator(name=split_name, gen_kwargs={"files": files})) + + return splits + + def _infer_features_from_file(self, file_path: str) -> datasets.Features: + """Infer features from a TsFile's schema.""" + from tsfile import TSDataType, TsFileReader + + # Map TsFile data type values (integers) to HuggingFace datasets Value types + # TSDataType enum values: BOOLEAN=0, INT32=1, INT64=2, FLOAT=3, DOUBLE=4, TEXT=5, etc. 
+ dtype_mapping = { + TSDataType.BOOLEAN.value: "bool", + TSDataType.INT32.value: "int32", + TSDataType.INT64.value: "int64", + TSDataType.FLOAT.value: "float32", + TSDataType.DOUBLE.value: "float64", + TSDataType.TEXT.value: "string", + TSDataType.STRING.value: "string", + TSDataType.TIMESTAMP.value: "int64", + TSDataType.DATE.value: "int32", + TSDataType.BLOB.value: "binary", + } + + features_dict = {} + + with TsFileReader(file_path) as reader: + # Try table-model first + table_schemas = reader.get_all_table_schemas() + if table_schemas: + # Use specified table or first available + table_name = self.config.table_name + if table_name is None: + table_name = list(table_schemas.keys())[0] + + if table_name not in table_schemas: + raise ValueError(f"Table '{table_name}' not found. Available tables: {list(table_schemas.keys())}") + + schema = table_schemas[table_name] + for column in schema.columns: + col_name = column.column_name + if self.config.columns is None or col_name in self.config.columns: + # data_type is an integer matching TSDataType enum values + hf_dtype = dtype_mapping.get(column.data_type, "string") + features_dict[col_name] = datasets.Value(hf_dtype) + + # Add time column + features_dict["time"] = datasets.Value("int64") + + else: + # Fall back to tree-model (timeseries) + timeseries_schemas = reader.get_all_timeseries_schemas() + for ts in timeseries_schemas: + column_name = f"{ts.device_id}.{ts.measurement_id}" + if self.config.columns is None or column_name in self.config.columns: + hf_dtype = dtype_mapping.get(ts.data_type, "string") + features_dict[column_name] = datasets.Value(hf_dtype) + + # Add time column + features_dict["time"] = datasets.Value("int64") + + if not features_dict: + raise ValueError(f"No features could be inferred from '{file_path}'") + + return datasets.Features(features_dict) + + def _generate_tables(self, files): + """Generate PyArrow tables from TsFile data.""" + from tsfile import to_dataframe + + batch_size = 
self.config.batch_size or 10000 + + for file_idx, file in enumerate(itertools.chain.from_iterable(files)): + try: + # Use to_dataframe with iterator for memory efficiency + df_iterator = to_dataframe( + file_path=file, + table_name=self.config.table_name, + column_names=self.config.columns, + start_time=self.config.start_time, + end_time=self.config.end_time, + max_row_num=batch_size, + as_iterator=True, + ) + + for batch_idx, df in enumerate(df_iterator): + if df.empty: + continue + + # Convert pandas DataFrame to PyArrow Table + pa_table = pa.Table.from_pandas(df, preserve_index=False) + + # Cast to features if specified + if self.info.features is not None: + # Filter columns to match expected features + available_cols = set(pa_table.column_names) + expected_cols = set(self.info.features.keys()) + cols_to_keep = available_cols & expected_cols + + if cols_to_keep: + pa_table = pa_table.select(list(cols_to_keep)) + + yield Key(file_idx, batch_idx), pa_table + + except Exception as e: + logger.error(f"Failed to read file '{file}' with error {type(e).__name__}: {e}") + raise diff --git a/tests/packaged_modules/test_tsfile.py b/tests/packaged_modules/test_tsfile.py new file mode 100644 index 00000000000..c823982b40c --- /dev/null +++ b/tests/packaged_modules/test_tsfile.py @@ -0,0 +1,199 @@ +"""Tests for the TsFile packaged module.""" + +import pytest + +from datasets import Features, Value, load_dataset +from datasets.builder import InvalidConfigName +from datasets.data_files import DataFilesList +from datasets.packaged_modules.tsfile.tsfile import TsFile, TsFileConfig + + +# ============================================================================ +# Fixtures for creating test TsFile files +# ============================================================================ + + +@pytest.fixture +def tsfile_table_model(tmp_path): + """Create a TsFile with table-model data.""" + from tsfile import ColumnCategory, ColumnSchema, TableSchema, Tablet, TSDataType, 
TsFileTableWriter + + filename = tmp_path / "table_model.tsfile" + n_rows = 10 + + # Define schema + table_schema = TableSchema( + table_name="sensor_data", + columns=[ + ColumnSchema("temperature", TSDataType.FLOAT, ColumnCategory.FIELD), + ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("status", TSDataType.BOOLEAN, ColumnCategory.FIELD), + ], + ) + + with TsFileTableWriter(str(filename), table_schema) as writer: + # Create a tablet with data + tablet = Tablet( + column_name_list=["temperature", "humidity", "status"], + type_list=[TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.BOOLEAN], + max_row_num=n_rows, + ) + tablet.set_table_name("sensor_data") + + for i in range(n_rows): + tablet.add_timestamp(i, i * 1000) # timestamps in milliseconds + tablet.add_value_by_name("temperature", i, 20.0 + i * 0.5) + tablet.add_value_by_name("humidity", i, 45.0 + i * 1.0) + tablet.add_value_by_name("status", i, i % 2 == 0) + + writer.write_table(tablet) + + return str(filename) + + +@pytest.fixture +def simple_tsfile(tmp_path): + """Create a simple TsFile for basic tests.""" + from tsfile import ColumnCategory, ColumnSchema, TableSchema, Tablet, TSDataType, TsFileTableWriter + + filename = tmp_path / "simple.tsfile" + n_rows = 5 + + table_schema = TableSchema( + table_name="test_table", + columns=[ + ColumnSchema("value_int", TSDataType.INT32, ColumnCategory.FIELD), + ColumnSchema("value_float", TSDataType.FLOAT, ColumnCategory.FIELD), + ], + ) + + with TsFileTableWriter(str(filename), table_schema) as writer: + tablet = Tablet( + column_name_list=["value_int", "value_float"], + type_list=[TSDataType.INT32, TSDataType.FLOAT], + max_row_num=n_rows, + ) + tablet.set_table_name("test_table") + + for i in range(n_rows): + tablet.add_timestamp(i, i * 1000) + tablet.add_value_by_name("value_int", i, i * 10) + tablet.add_value_by_name("value_float", i, float(i) * 1.5) + + writer.write_table(tablet) + + return str(filename) + + +# 
# ============================================================================
# Configuration Tests
# ============================================================================


def test_config_raises_when_invalid_name():
    """A config name containing forbidden characters must be rejected."""
    with pytest.raises(InvalidConfigName, match="Bad characters"):
        _ = TsFileConfig(name="name-with-*-invalid-character")


@pytest.mark.parametrize("data_files", ["str_path", ["str_path"], DataFilesList(["str_path"], [()])])
def test_config_raises_when_invalid_data_files(data_files):
    """data_files that are not a DataFilesDict must be rejected."""
    with pytest.raises(ValueError, match="Expected a DataFilesDict"):
        _ = TsFileConfig(name="name", data_files=data_files)


def test_config_default_values():
    """A bare TsFileConfig exposes the documented defaults."""
    config = TsFileConfig()
    assert config.batch_size == 10000
    for attr in ("features", "table_name", "columns", "start_time", "end_time"):
        assert getattr(config, attr) is None


# ============================================================================
# Basic Functionality Tests
# ============================================================================


def test_tsfile_load_simple(simple_tsfile):
    """Loading a simple TsFile yields the expected rows and columns."""
    dataset = load_dataset("tsfile", data_files=[simple_tsfile], split="train")

    assert len(dataset) == 5
    assert {"value_int", "value_float", "time"} <= set(dataset.column_names)


def test_tsfile_load_table_model(tsfile_table_model):
    """Loading a table-model TsFile yields the expected rows and columns."""
    dataset = load_dataset("tsfile", data_files=[tsfile_table_model], split="train")

    assert len(dataset) == 10
    assert {"temperature", "humidity", "status", "time"} <= set(dataset.column_names)

def test_tsfile_data_values(simple_tsfile):
    """Test that data values are correctly loaded from TsFile."""
    dataset = load_dataset("tsfile", data_files=[simple_tsfile], split="train")

    # Check timestamps
    assert dataset["time"] == [0, 1000, 2000, 3000, 4000]

    # Check int values
    assert dataset["value_int"] == [0, 10, 20, 30, 40]

    # Check float values (with tolerance)
    import numpy as np

    np.testing.assert_allclose(dataset["value_float"], [0.0, 1.5, 3.0, 4.5, 6.0], rtol=1e-5)


# ============================================================================
# Feature Tests
# ============================================================================


def test_tsfile_feature_inference(simple_tsfile):
    """Test automatic feature inference from TsFile schema."""
    dataset = load_dataset("tsfile", data_files=[simple_tsfile], split="train")

    assert dataset.features is not None
    assert "value_int" in dataset.features
    assert "value_float" in dataset.features
    assert "time" in dataset.features

    # Check inferred types
    assert dataset.features["value_int"].dtype == "int32"
    assert dataset.features["value_float"].dtype == "float32"
    assert dataset.features["time"].dtype == "int64"


def test_tsfile_custom_features(simple_tsfile):
    """Test loading with explicit feature specification."""
    features = Features({"value_int": Value("int64"), "value_float": Value("float64"), "time": Value("int64")})
    dataset = load_dataset("tsfile", data_files=[simple_tsfile], split="train", features=features)

    # The previous assertion (`dataset.features is not None`) could never
    # fail; verify the requested schema was actually applied to the data.
    assert dataset.features["value_int"].dtype == "int64"
    assert dataset.features["value_float"].dtype == "float64"
    assert dataset.features["time"].dtype == "int64"


# ============================================================================
# Error Handling Tests
# ============================================================================


def test_tsfile_no_data_files_error():
    """Test that missing data_files raises an error."""
    config = TsFileConfig(name="test", data_files=None)
    tsfile_builder = TsFile()
    tsfile_builder.config = config

    with pytest.raises(ValueError, match="At least one data file must be specified"):
        tsfile_builder._split_generators(None)