diff --git a/docs/source/package_reference/main_classes.mdx b/docs/source/package_reference/main_classes.mdx
index 3601cfe23d1..32d09325d7f 100644
--- a/docs/source/package_reference/main_classes.mdx
+++ b/docs/source/package_reference/main_classes.mdx
@@ -175,7 +175,12 @@ The base class [`IterableDataset`] implements an iterable Dataset backed by pyth
     - take
     - shard
     - repeat
+    - to_csv
+    - to_pandas
+    - to_dict
+    - to_json
     - to_parquet
+    - to_sql
     - push_to_hub
     - load_state_dict
     - state_dict
diff --git a/docs/source/process.mdx b/docs/source/process.mdx
index 2a2ea3c0cba..e41767a295f 100644
--- a/docs/source/process.mdx
+++ b/docs/source/process.mdx
@@ -808,17 +808,28 @@ The example below uses the [`pydub`](http://pydub.com/) package to open an audio
 
 ## Save
 
-Once you are done processing your dataset, you can save and reuse it later with [`~Dataset.save_to_disk`].
+Once your dataset is ready, you can save it as a Hugging Face Dataset in Parquet format and reuse it later with [`load_dataset`].
 
-Save your dataset by providing the path to the directory you wish to save it to:
+Save your dataset by providing [`~Dataset.push_to_hub`] with the name of the dataset repository on Hugging Face you wish to save it to:
 
-```py
->>> encoded_dataset.save_to_disk("path/of/my/dataset/directory")
+```python
+encoded_dataset.push_to_hub("username/my_dataset")
 ```
 
-Use the [`load_from_disk`] function to reload the dataset:
+Use the [`load_dataset`] function to reload the dataset (in streaming mode or not):
+
+```python
+from datasets import load_dataset
+reloaded_dataset = load_dataset("username/my_dataset", streaming=True)
+```
+
+Alternatively, you can save it locally on disk in Arrow format. Compared to Parquet, Arrow is uncompressed, which makes it much faster to reload; this is great for local use and ephemeral caching. But because the files are larger and carry less metadata, Arrow is slower to upload, download, and query than Parquet, and less suited for long-term storage.
+
+Use the [`~Dataset.save_to_disk`] and [`load_from_disk`] functions to save the dataset and reload it from your disk:
 
 ```py
+>>> encoded_dataset.save_to_disk("path/of/my/dataset/directory")
+>>> # later
 >>> from datasets import load_from_disk
 >>> reloaded_dataset = load_from_disk("path/of/my/dataset/directory")
 ```
diff --git a/docs/source/stream.mdx b/docs/source/stream.mdx
index 13450a4d971..4b6cfeebdcb 100644
--- a/docs/source/stream.mdx
+++ b/docs/source/stream.mdx
@@ -51,6 +51,17 @@ You can find more details in the [Dataset vs. IterableDataset guide](./about_map
 
 
 
+
+## Column indexing
+
+Sometimes it is convenient to iterate over values of a specific column. Fortunately, an [`IterableDataset`] supports column indexing:
+```python
+>>> from datasets import load_dataset
+>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
+>>> print(next(iter(dataset["text"])))
+Beginners BBQ Class Taking Place in Missoula!...
+```
+
 ## Convert from a Dataset
 
 If you have an existing [`Dataset`] object, you can convert it to an [`IterableDataset`] with the [`~Dataset.to_iterable_dataset`] function. This is actually faster than setting the `streaming=True` argument in [`load_dataset`] because the data is streamed from local files.
@@ -495,12 +506,47 @@ Resuming returns exactly where the checkpoint was saved except if `.shuffle()` i
 
 
 
-## Column indexing
-Sometimes it is convenient to iterate over values of a specific column. Fortunately, an [`IterableDataset`] supports column indexing:
+## Save
+
+Once your iterable dataset is ready, you can save it as a Hugging Face Dataset in Parquet format and reuse it later with [`load_dataset`].
+
+Save your dataset by providing [`~IterableDataset.push_to_hub`] with the name of the dataset repository on Hugging Face you wish to save it to. This iterates over the dataset and progressively uploads the data to Hugging Face:
+
 ```python
->>> from datasets import load_dataset
->>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
->>> print(next(iter(dataset["text"])))
-Beginners BBQ Class Taking Place in Missoula!...
-```
\ No newline at end of file
+dataset.push_to_hub("username/my_dataset")
+```
+
+Use the [`load_dataset`] function to reload the dataset:
+
+```python
+from datasets import load_dataset
+reloaded_dataset = load_dataset("username/my_dataset")
+```
+
+## Export
+
+🤗 Datasets supports exporting as well, so you can work with your dataset in other applications. The following table shows currently supported file formats you can export to:
+
+| File type               | Export method                                                                                |
+|-------------------------|----------------------------------------------------------------------------------------------|
+| CSV                     | [`IterableDataset.to_csv`]                                                                   |
+| JSON                    | [`IterableDataset.to_json`]                                                                  |
+| Parquet                 | [`IterableDataset.to_parquet`]                                                               |
+| SQL                     | [`IterableDataset.to_sql`]                                                                   |
+| In-memory Python object | [`IterableDataset.to_pandas`], [`IterableDataset.to_polars`] or [`IterableDataset.to_dict`] |
+
+For example, export your dataset to a CSV file like this:
+
+```py
+>>> dataset.to_csv("path/of/my/dataset.csv")
+```
+
+If you have a large dataset, you can save one file per shard, e.g.:
+
+```py
+>>> num_shards = dataset.num_shards
+>>> for index in range(num_shards):
+...     shard = dataset.shard(num_shards=num_shards, index=index)
+...     shard.to_parquet(f"path/of/my/dataset/data-{index:05d}.parquet")
+```
diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py
index 25ec3a77727..b1d4f0c9230 100644
--- a/src/datasets/arrow_dataset.py
+++ b/src/datasets/arrow_dataset.py
@@ -4937,12 +4937,15 @@ def to_csv(
             **to_csv_kwargs,
         ).write()
 
-    def to_dict(self, batch_size: Optional[int] = None) -> Union[dict, Iterator[dict]]:
+    def to_dict(self, batch_size: Optional[int] = None, batched: bool = False) -> Union[dict, Iterator[dict]]:
         """Returns the dataset as a Python dict. Can also return a generator for large datasets.
 
         Args:
             batch_size (`int`, *optional*): The size (number of rows) of the batches if `batched` is `True`.
                 Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            batched (`bool`):
+                Set to `True` to return a generator that yields the dataset as batches
+                of `batch_size` rows. Defaults to `False` (returns the whole dataset at once).
 
         Returns:
             `dict` or `Iterator[dict]`
@@ -5045,12 +5048,12 @@ def to_pandas(
         """Returns the dataset as a `pandas.DataFrame`. Can also return a generator for large datasets.
 
         Args:
-            batched (`bool`):
-                Set to `True` to return a generator that yields the dataset as batches
-                of `batch_size` rows. Defaults to `False` (returns the whole datasets once).
             batch_size (`int`, *optional*):
                 The size (number of rows) of the batches if `batched` is `True`.
                 Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            batched (`bool`):
+                Set to `True` to return a generator that yields the dataset as batches
+                of `batch_size` rows. Defaults to `False` (returns the whole dataset at once).
 
         Returns:
             `pandas.DataFrame` or `Iterator[pandas.DataFrame]`
@@ -5088,12 +5091,12 @@ def to_polars(
         """Returns the dataset as a `polars.DataFrame`. Can also return a generator for large datasets.
 
         Args:
-            batched (`bool`):
-                Set to `True` to return a generator that yields the dataset as batches
-                of `batch_size` rows. Defaults to `False` (returns the whole datasets once).
             batch_size (`int`, *optional*):
                 The size (number of rows) of the batches if `batched` is `True`.
                 Defaults to `genomicsml.datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            batched (`bool`):
+                Set to `True` to return a generator that yields the dataset as batches
+                of `batch_size` rows. Defaults to `False` (returns the whole dataset at once).
             schema_overrides (`dict`, *optional*):
                 Support type specification or override of one or more columns; note that
                 any dtypes inferred from the schema param will be overridden.
diff --git a/src/datasets/iterable_dataset.py b/src/datasets/iterable_dataset.py
index cd855ee4e52..17dfdcb88e0 100644
--- a/src/datasets/iterable_dataset.py
+++ b/src/datasets/iterable_dataset.py
@@ -60,6 +60,10 @@
 
 
 if TYPE_CHECKING:
+    import sqlite3
+
+    import polars as pl
+    import sqlalchemy
     import torch
 
 logger = get_logger(__name__)
@@ -3408,6 +3412,249 @@ def batch_fn(unbatched):
             batch_fn, batched=True, batch_size=batch_size, drop_last_batch=drop_last_batch, features=features
         )
 
+    def to_dict(self, batch_size: Optional[int] = None, batched: bool = False) -> Union[dict, Iterator[dict]]:
+        """Returns the dataset as a Python dict. Can also return a generator for large datasets.
+
+        Args:
+            batch_size (`int`, *optional*): The size (number of rows) of the batches if `batched` is `True`.
+                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+
+        Returns:
+            `dict` or `Iterator[dict]`
+
+        Example:
+
+        ```py
+        >>> ds.to_dict()
+        ```
+        """
+        if batched:
+            for table in self.with_format("arrow").iter(batch_size=batch_size):
+                yield Dataset(table, fingerprint="unset").to_dict()
+        else:
+            table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+            return Dataset(table, fingerprint="unset").to_dict()
+
+    def to_list(self) -> list:
+        """Returns the dataset as a Python list.
+
+        Returns:
+            `list`
+
+        Example:
+
+        ```py
+        >>> ds.to_list()
+        ```
+        """
+        table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+        return Dataset(table, fingerprint="unset").to_list()
+
+    def to_pandas(
+        self, batch_size: Optional[int] = None, batched: bool = False
+    ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
+        """Returns the dataset as a `pandas.DataFrame`. Can also return a generator for large datasets.
+
+        Args:
+            batch_size (`int`, *optional*):
+                The size (number of rows) of the batches if `batched` is `True`.
+                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            batched (`bool`):
+                Set to `True` to return a generator that yields the dataset as batches
+                of `batch_size` rows. Defaults to `False` (returns the whole dataset at once).
+
+        Returns:
+            `pandas.DataFrame` or `Iterator[pandas.DataFrame]`
+
+        Example:
+
+        ```py
+        >>> ds.to_pandas()
+        ```
+        """
+        if batched:
+            for table in self.with_format("arrow").iter(batch_size=batch_size):
+                yield Dataset(table, fingerprint="unset").to_pandas()
+        else:
+            table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+            return Dataset(table, fingerprint="unset").to_pandas()
+
+    def to_polars(
+        self,
+        batch_size: Optional[int] = None,
+        batched: bool = False,
+        schema_overrides: Optional[dict] = None,
+        rechunk: bool = True,
+    ) -> Union["pl.DataFrame", Iterator["pl.DataFrame"]]:
+        """Returns the dataset as a `polars.DataFrame`. Can also return a generator for large datasets.
+
+        Args:
+            batch_size (`int`, *optional*):
+                The size (number of rows) of the batches if `batched` is `True`.
+                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            batched (`bool`):
+                Set to `True` to return a generator that yields the dataset as batches
+                of `batch_size` rows. Defaults to `False` (returns the whole dataset at once).
+            schema_overrides (`dict`, *optional*):
+                Support type specification or override of one or more columns; note that
+                any dtypes inferred from the schema param will be overridden.
+            rechunk (`bool`):
+                Make sure that all data is in contiguous memory. Defaults to `True`.
+        Returns:
+            `polars.DataFrame` or `Iterator[polars.DataFrame]`
+
+        Example:
+
+        ```py
+        >>> ds.to_polars()
+        ```
+        """
+        if batched:
+            for table in self.with_format("arrow").iter(batch_size=batch_size):
+                yield Dataset(table, fingerprint="unset").to_polars(schema_overrides=schema_overrides, rechunk=rechunk)
+        else:
+            table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+            return Dataset(table, fingerprint="unset").to_polars(schema_overrides=schema_overrides, rechunk=rechunk)
+
+    def to_csv(
+        self,
+        path_or_buf: Union[PathLike, BinaryIO],
+        batch_size: Optional[int] = None,
+        storage_options: Optional[dict] = None,
+        **to_csv_kwargs,
+    ) -> int:
+        """Exports the dataset to csv.
+
+        This iterates over the dataset and loads it completely in memory before writing it.
+
+        Args:
+            path_or_buf (`PathLike` or `FileOrBuffer`):
+                Either a path to a file (e.g. `file.csv`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.csv`),
+                or a BinaryIO, where the dataset will be saved to in the specified format.
+            batch_size (`int`, *optional*):
+                Size of the batch to load in memory and write at once.
+                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            storage_options (`dict`, *optional*):
+                Key/value pairs to be passed on to the file-system backend, if any.
+            **to_csv_kwargs (additional keyword arguments):
+                Parameters to pass to pandas's [`pandas.DataFrame.to_csv`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html).
+                The parameter `index` defaults to `False` if not specified.
+                If you would like to write the index, pass `index=True` and also set a name for the index column by
+                passing `index_label`.
+
+        Returns:
+            `int`: The number of characters or bytes written.
+
+        Example:
+
+        ```py
+        >>> ds.to_csv("path/to/dataset/directory")
+        ```
+        """
+        table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+        return Dataset(table, fingerprint="unset").to_csv(
+            path_or_buf,
+            batch_size=batch_size,
+            storage_options=storage_options,
+            **to_csv_kwargs,
+        )
+
+    def to_json(
+        self,
+        path_or_buf: Union[PathLike, BinaryIO],
+        batch_size: Optional[int] = None,
+        storage_options: Optional[dict] = None,
+        **to_json_kwargs,
+    ) -> int:
+        """Export the dataset to JSON Lines or JSON.
+
+        This iterates over the dataset and loads it completely in memory before writing it.
+
+        The default output format is [JSON Lines](https://jsonlines.org/).
+        To export to [JSON](https://www.json.org), pass `lines=False` argument and the desired `orient`.
+
+        Args:
+            path_or_buf (`PathLike` or `FileOrBuffer`):
+                Either a path to a file (e.g. `file.json`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.json`),
+                or a BinaryIO, where the dataset will be saved to in the specified format.
+            batch_size (`int`, *optional*):
+                Size of the batch to load in memory and write at once.
+                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            storage_options (`dict`, *optional*):
+                Key/value pairs to be passed on to the file-system backend, if any.
+            **to_json_kwargs (additional keyword arguments):
+                Parameters to pass to pandas's [`pandas.DataFrame.to_json`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html).
+                Default arguments are `lines=True` and `orient="records"`.
+                The parameter `index` defaults to `False` if `orient` is `"split"` or `"table"`.
+                If you would like to write the index, pass `index=True`.
+
+        Returns:
+            `int`: The number of characters or bytes written.
+
+        Example:
+
+        ```py
+        >>> ds.to_json("path/to/dataset/directory/filename.jsonl")
+        ```
+
+        ```py
+        >>> num_shards = dataset.num_shards
+        >>> for index in range(num_shards):
+        ...     shard = dataset.shard(num_shards=num_shards, index=index)
+        ...     shard.to_json(f"path/of/my/dataset/data-{index:05d}.jsonl")
+        ```
+
+        """
+        table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+        return Dataset(table, fingerprint="unset").to_json(
+            path_or_buf,
+            batch_size=batch_size,
+            storage_options=storage_options,
+            **to_json_kwargs,
+        )
+
+    def to_sql(
+        self,
+        name: str,
+        con: Union[str, "sqlalchemy.engine.Connection", "sqlalchemy.engine.Engine", "sqlite3.Connection"],
+        batch_size: Optional[int] = None,
+        **sql_writer_kwargs,
+    ) -> int:
+        """Exports the dataset to a SQL database.
+
+        Args:
+            name (`str`):
+                Name of SQL table.
+            con (`str` or `sqlite3.Connection` or `sqlalchemy.engine.Connection` or `sqlalchemy.engine.Engine`):
+                A [URI string](https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls) or a SQLite3/SQLAlchemy connection object used to write to a database.
+            batch_size (`int`, *optional*):
+                Size of the batch to load in memory and write at once.
+                Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
+            **sql_writer_kwargs (additional keyword arguments):
+                Parameters to pass to pandas's [`pandas.DataFrame.to_sql`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html).
+                The parameter `index` defaults to `False` if not specified.
+                If you would like to write the index, pass `index=True` and also set a name for the index column by
+                passing `index_label`.
+
+
+        Returns:
+            `int`: The number of records written.
+
+        Example:
+
+        ```py
+        >>> # con provided as a connection URI string
+        >>> ds.to_sql("data", "sqlite:///my_own_db.sql")
+        >>> # con provided as a sqlite3 connection object
+        >>> import sqlite3
+        >>> con = sqlite3.connect("my_own_db.sql")
+        >>> with con:
+        ...     ds.to_sql("data", con)
+        ```
+        """
+        table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=1000)))
+        return Dataset(table, fingerprint="unset").to_sql(name, con, batch_size=batch_size, **sql_writer_kwargs)
+
     def to_parquet(
         self,
         path_or_buf: Union[PathLike, BinaryIO],
@@ -3439,16 +3686,22 @@ def to_parquet(
         ```py
         >>> ds.to_parquet("path/to/dataset/directory")
         ```
+
+        ```py
+        >>> num_shards = dataset.num_shards
+        >>> for index in range(num_shards):
+        ...     shard = dataset.shard(num_shards=num_shards, index=index)
+        ...     shard.to_parquet(f"path/of/my/dataset/data-{index:05d}.parquet")
+        ```
+
         """
-        # Dynamic import to avoid circular dependency
-        from .io.parquet import ParquetDatasetWriter, get_writer_batch_size
+        from .io.parquet import get_writer_batch_size
 
         batch_size = get_writer_batch_size(self.features)
        table = pa.concat_tables(list(self.with_format("arrow").iter(batch_size=batch_size)))
-        dataset = Dataset(table, fingerprint="unset")
-        return ParquetDatasetWriter(
-            dataset, path_or_buf, batch_size=batch_size, storage_options=storage_options, **parquet_writer_kwargs
-        ).write()
+        return Dataset(table, fingerprint="unset").to_parquet(
+            path_or_buf, batch_size=batch_size, storage_options=storage_options, **parquet_writer_kwargs
+        )
 
     def _push_parquet_shards_to_hub(
         self,
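For readers trying this patch out, here is a minimal usage sketch (not part of the diff above) that exercises the new `IterableDataset` export and upload paths described in the updated `stream.mdx`. It assumes the patch is applied; the repository name `username/my_dataset` and the output file names are placeholders.

```python
# Usage sketch only; assumes the IterableDataset export methods from this patch exist.
from datasets import load_dataset

# Stream a public dataset instead of downloading it in full.
dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")

# Keep a small slice so the exports below stay cheap.
sample = dataset.take(1_000)

# New in this patch: export an IterableDataset directly,
# without materializing it as a map-style Dataset first.
sample.to_csv("c4-sample.csv")
sample.to_json("c4-sample.jsonl")

# Larger datasets can be written shard by shard instead.
num_shards = dataset.num_shards
for index in range(num_shards):
    shard = dataset.shard(num_shards=num_shards, index=index)
    shard.to_parquet(f"data-{index:05d}.parquet")
    break  # only the first shard here, to keep the sketch short

# Upload to the Hub (requires authentication) and reload later, optionally in streaming mode.
# sample.push_to_hub("username/my_dataset")
# reloaded = load_dataset("username/my_dataset", streaming=True)
```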