diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py index 95126664561..9c49378471a 100644 --- a/src/datasets/arrow_dataset.py +++ b/src/datasets/arrow_dataset.py @@ -4729,13 +4729,15 @@ def to_csv( path_or_buf: Union[PathLike, BinaryIO], batch_size: Optional[int] = None, num_proc: Optional[int] = None, + storage_options: Optional[dict] = None, **to_csv_kwargs, ) -> int: """Exports the dataset to csv Args: path_or_buf (`PathLike` or `FileOrBuffer`): - Either a path to a file or a BinaryIO. + Either a path to a file (e.g. `file.csv`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.csv`), + or a BinaryIO, where the dataset will be saved to in the specified format. batch_size (`int`, *optional*): Size of the batch to load in memory and write at once. Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`. @@ -4744,6 +4746,10 @@ def to_csv( use multiprocessing. `batch_size` in this case defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE` but feel free to make it 5x or 10x of the default value if you have sufficient compute power. + storage_options (`dict`, *optional*): + Key/value pairs to be passed on to the file-system backend, if any. + + **to_csv_kwargs (additional keyword arguments): Parameters to pass to pandas's [`pandas.DataFrame.to_csv`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html). @@ -4768,7 +4774,14 @@ def to_csv( # Dynamic import to avoid circular dependency from .io.csv import CsvDatasetWriter - return CsvDatasetWriter(self, path_or_buf, batch_size=batch_size, num_proc=num_proc, **to_csv_kwargs).write() + return CsvDatasetWriter( + self, + path_or_buf, + batch_size=batch_size, + num_proc=num_proc, + storage_options=storage_options, + **to_csv_kwargs, + ).write() def to_dict(self, batch_size: Optional[int] = None, batched="deprecated") -> Union[dict, Iterator[dict]]: """Returns the dataset as a Python dict. Can also return a generator for large datasets. 
@@ -4844,13 +4857,15 @@ def to_json( path_or_buf: Union[PathLike, BinaryIO], batch_size: Optional[int] = None, num_proc: Optional[int] = None, + storage_options: Optional[dict] = None, **to_json_kwargs, ) -> int: """Export the dataset to JSON Lines or JSON. Args: path_or_buf (`PathLike` or `FileOrBuffer`): - Either a path to a file or a BinaryIO. + Either a path to a file (e.g. `file.json`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.json`), + or a BinaryIO, where the dataset will be saved to in the specified format. batch_size (`int`, *optional*): Size of the batch to load in memory and write at once. Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`. @@ -4859,6 +4874,10 @@ def to_json( use multiprocessing. `batch_size` in this case defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE` but feel free to make it 5x or 10x of the default value if you have sufficient compute power. + storage_options (`dict`, *optional*): + Key/value pairs to be passed on to the file-system backend, if any. + + **to_json_kwargs (additional keyword arguments): Parameters to pass to pandas's [`pandas.DataFrame.to_json`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html). 
@@ -4882,7 +4901,14 @@ def to_json( # Dynamic import to avoid circular dependency from .io.json import JsonDatasetWriter - return JsonDatasetWriter(self, path_or_buf, batch_size=batch_size, num_proc=num_proc, **to_json_kwargs).write() + return JsonDatasetWriter( + self, + path_or_buf, + batch_size=batch_size, + num_proc=num_proc, + storage_options=storage_options, + **to_json_kwargs, + ).write() def to_pandas( self, batch_size: Optional[int] = None, batched: bool = False @@ -4927,16 +4953,22 @@ def to_parquet( self, path_or_buf: Union[PathLike, BinaryIO], batch_size: Optional[int] = None, + storage_options: Optional[dict] = None, **parquet_writer_kwargs, ) -> int: """Exports the dataset to parquet Args: path_or_buf (`PathLike` or `FileOrBuffer`): - Either a path to a file or a BinaryIO. + Either a path to a file (e.g. `file.parquet`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.parquet`), + or a BinaryIO, where the dataset will be saved to in the specified format. batch_size (`int`, *optional*): Size of the batch to load in memory and write at once. Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`. + storage_options (`dict`, *optional*): + Key/value pairs to be passed on to the file-system backend, if any. + + **parquet_writer_kwargs (additional keyword arguments): Parameters to pass to PyArrow's `pyarrow.parquet.ParquetWriter`. 
@@ -4952,7 +4984,9 @@ def to_parquet( # Dynamic import to avoid circular dependency from .io.parquet import ParquetDatasetWriter - return ParquetDatasetWriter(self, path_or_buf, batch_size=batch_size, **parquet_writer_kwargs).write() + return ParquetDatasetWriter( + self, path_or_buf, batch_size=batch_size, storage_options=storage_options, **parquet_writer_kwargs + ).write() def to_sql( self, diff --git a/src/datasets/io/csv.py b/src/datasets/io/csv.py index f5091e1352e..3aa11303e91 100644 --- a/src/datasets/io/csv.py +++ b/src/datasets/io/csv.py @@ -2,6 +2,8 @@ import os from typing import BinaryIO, Optional, Union +import fsspec + from .. import Dataset, Features, NamedSplit, config from ..formatting import query_table from ..packaged_modules.csv.csv import Csv @@ -72,6 +74,7 @@ def __init__( path_or_buf: Union[PathLike, BinaryIO], batch_size: Optional[int] = None, num_proc: Optional[int] = None, + storage_options: Optional[dict] = None, **to_csv_kwargs, ): if num_proc is not None and num_proc <= 0: @@ -82,6 +85,7 @@ def __init__( self.batch_size = batch_size if batch_size else config.DEFAULT_MAX_BATCH_SIZE self.num_proc = num_proc self.encoding = "utf-8" + self.storage_options = storage_options or {} self.to_csv_kwargs = to_csv_kwargs def write(self) -> int: @@ -90,7 +94,7 @@ def write(self) -> int: index = self.to_csv_kwargs.pop("index", False) if isinstance(self.path_or_buf, (str, bytes, os.PathLike)): - with open(self.path_or_buf, "wb+") as buffer: + with fsspec.open(self.path_or_buf, "wb", **(self.storage_options or {})) as buffer: written = self._write(file_obj=buffer, header=header, index=index, **self.to_csv_kwargs) else: written = self._write(file_obj=self.path_or_buf, header=header, index=index, **self.to_csv_kwargs) diff --git a/src/datasets/io/json.py b/src/datasets/io/json.py index 2d4698df966..c1d89b0fd04 100644 --- a/src/datasets/io/json.py +++ b/src/datasets/io/json.py @@ -77,6 +77,7 @@ def __init__( path_or_buf: Union[PathLike, BinaryIO], 
batch_size: Optional[int] = None, num_proc: Optional[int] = None, + storage_options: Optional[dict] = None, **to_json_kwargs, ): if num_proc is not None and num_proc <= 0: @@ -87,6 +88,7 @@ def __init__( self.batch_size = batch_size if batch_size else config.DEFAULT_MAX_BATCH_SIZE self.num_proc = num_proc self.encoding = "utf-8" + self.storage_options = storage_options or {} self.to_json_kwargs = to_json_kwargs def write(self) -> int: @@ -104,7 +106,9 @@ def write(self) -> int: raise NotImplementedError(f"`datasets` currently does not support {compression} compression") if isinstance(self.path_or_buf, (str, bytes, os.PathLike)): - with fsspec.open(self.path_or_buf, "wb", compression=compression) as buffer: + with fsspec.open( + self.path_or_buf, "wb", compression=compression, **(self.storage_options or {}) + ) as buffer: written = self._write(file_obj=buffer, orient=orient, lines=lines, **self.to_json_kwargs) else: if compression: diff --git a/src/datasets/io/parquet.py b/src/datasets/io/parquet.py index 97245a36204..2a04285259f 100644 --- a/src/datasets/io/parquet.py +++ b/src/datasets/io/parquet.py @@ -1,6 +1,7 @@ import os from typing import BinaryIO, Optional, Union +import fsspec import numpy as np import pyarrow.parquet as pq @@ -112,18 +113,20 @@ def __init__( dataset: Dataset, path_or_buf: Union[PathLike, BinaryIO], batch_size: Optional[int] = None, + storage_options: Optional[dict] = None, **parquet_writer_kwargs, ): self.dataset = dataset self.path_or_buf = path_or_buf self.batch_size = batch_size or get_writer_batch_size(dataset.features) + self.storage_options = storage_options or {} self.parquet_writer_kwargs = parquet_writer_kwargs def write(self) -> int: batch_size = self.batch_size if self.batch_size else config.DEFAULT_MAX_BATCH_SIZE if isinstance(self.path_or_buf, (str, bytes, os.PathLike)): - with open(self.path_or_buf, "wb+") as buffer: + with fsspec.open(self.path_or_buf, "wb", **(self.storage_options or {})) as buffer: written = 
self._write(file_obj=buffer, batch_size=batch_size, **self.parquet_writer_kwargs) else: written = self._write(file_obj=self.path_or_buf, batch_size=batch_size, **self.parquet_writer_kwargs) diff --git a/tests/io/test_csv.py b/tests/io/test_csv.py index 5f75fec0fa3..f8bd68ee4f6 100644 --- a/tests/io/test_csv.py +++ b/tests/io/test_csv.py @@ -1,6 +1,7 @@ import csv import os +import fsspec import pytest from datasets import Dataset, DatasetDict, Features, NamedSplit, Value @@ -162,3 +163,13 @@ def test_dataset_to_csv_invalidproc(csv_path, tmp_path): dataset = CsvDatasetReader({"train": csv_path}, cache_dir=cache_dir).read() with pytest.raises(ValueError): CsvDatasetWriter(dataset["train"], output_csv, num_proc=0) + + +def test_dataset_to_csv_fsspec(dataset, mockfs): + dataset_path = "mock://my_dataset.csv" + writer = CsvDatasetWriter(dataset, dataset_path, storage_options=mockfs.storage_options) + assert writer.write() > 0 + assert mockfs.isfile(dataset_path) + + with fsspec.open(dataset_path, "rb", **mockfs.storage_options) as f: + assert f.read() diff --git a/tests/io/test_json.py b/tests/io/test_json.py index fd71e510dcb..671bbcb0cdc 100644 --- a/tests/io/test_json.py +++ b/tests/io/test_json.py @@ -268,3 +268,12 @@ def test_dataset_to_json_compression(self, shared_datadir, tmp_path_factory, ext with fsspec.open(original_path, "rb", compression="infer") as f: original_content = f.read() assert exported_content == original_content + + def test_dataset_to_json_fsspec(self, dataset, mockfs): + dataset_path = "mock://my_dataset.json" + writer = JsonDatasetWriter(dataset, dataset_path, storage_options=mockfs.storage_options) + assert writer.write() > 0 + assert mockfs.isfile(dataset_path) + + with fsspec.open(dataset_path, "rb", **mockfs.storage_options) as f: + assert f.read() diff --git a/tests/io/test_parquet.py b/tests/io/test_parquet.py index 3e5ddee113e..5466e633f04 100644 --- a/tests/io/test_parquet.py +++ b/tests/io/test_parquet.py @@ -1,3 +1,4 @@ +import 
fsspec import pyarrow.parquet as pq import pytest @@ -213,3 +214,13 @@ def test_dataset_to_parquet_keeps_features(shared_datadir, tmp_path): ) def test_get_writer_batch_size(feature, expected): assert get_writer_batch_size(feature) == expected + + +def test_dataset_to_parquet_fsspec(dataset, mockfs): + dataset_path = "mock://my_dataset.parquet" + writer = ParquetDatasetWriter(dataset, dataset_path, storage_options=mockfs.storage_options) + assert writer.write() > 0 + assert mockfs.isfile(dataset_path) + + with fsspec.open(dataset_path, "rb", **mockfs.storage_options) as f: + assert f.read()