Skip to content
46 changes: 40 additions & 6 deletions src/datasets/arrow_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4729,13 +4729,15 @@ def to_csv(
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
num_proc: Optional[int] = None,
storage_options: Optional[dict] = None,
**to_csv_kwargs,
) -> int:
"""Exports the dataset to csv

Args:
path_or_buf (`PathLike` or `FileOrBuffer`):
Either a path to a file or a BinaryIO.
Either a path to a file (e.g. `file.csv`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.csv`),
or a BinaryIO, where the dataset will be saved to in the specified format.
batch_size (`int`, *optional*):
Size of the batch to load in memory and write at once.
Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
Expand All @@ -4744,6 +4746,10 @@ def to_csv(
use multiprocessing. `batch_size` in this case defaults to
`datasets.config.DEFAULT_MAX_BATCH_SIZE` but feel free to make it 5x or 10x of the default
value if you have sufficient compute power.
storage_options (`dict`, *optional*):
Key/value pairs to be passed on to the file-system backend, if any.

<Added version="2.19.0"/>
**to_csv_kwargs (additional keyword arguments):
Parameters to pass to pandas's [`pandas.DataFrame.to_csv`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html).

Expand All @@ -4768,7 +4774,14 @@ def to_csv(
# Dynamic import to avoid circular dependency
from .io.csv import CsvDatasetWriter

return CsvDatasetWriter(self, path_or_buf, batch_size=batch_size, num_proc=num_proc, **to_csv_kwargs).write()
return CsvDatasetWriter(
self,
path_or_buf,
batch_size=batch_size,
num_proc=num_proc,
storage_options=storage_options,
**to_csv_kwargs,
).write()

def to_dict(self, batch_size: Optional[int] = None, batched="deprecated") -> Union[dict, Iterator[dict]]:
"""Returns the dataset as a Python dict. Can also return a generator for large datasets.
Expand Down Expand Up @@ -4844,13 +4857,15 @@ def to_json(
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
num_proc: Optional[int] = None,
storage_options: Optional[dict] = None,
**to_json_kwargs,
) -> int:
"""Export the dataset to JSON Lines or JSON.

Args:
path_or_buf (`PathLike` or `FileOrBuffer`):
Either a path to a file or a BinaryIO.
Either a path to a file (e.g. `file.json`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.json`),
or a BinaryIO, where the dataset will be saved to in the specified format.
batch_size (`int`, *optional*):
Size of the batch to load in memory and write at once.
Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
Expand All @@ -4859,6 +4874,10 @@ def to_json(
use multiprocessing. `batch_size` in this case defaults to
`datasets.config.DEFAULT_MAX_BATCH_SIZE` but feel free to make it 5x or 10x of the default
value if you have sufficient compute power.
storage_options (`dict`, *optional*):
Key/value pairs to be passed on to the file-system backend, if any.

<Added version="2.19.0"/>
**to_json_kwargs (additional keyword arguments):
Parameters to pass to pandas's [`pandas.DataFrame.to_json`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_json.html).

Expand All @@ -4882,7 +4901,14 @@ def to_json(
# Dynamic import to avoid circular dependency
from .io.json import JsonDatasetWriter

return JsonDatasetWriter(self, path_or_buf, batch_size=batch_size, num_proc=num_proc, **to_json_kwargs).write()
return JsonDatasetWriter(
self,
path_or_buf,
batch_size=batch_size,
num_proc=num_proc,
storage_options=storage_options,
**to_json_kwargs,
).write()

def to_pandas(
self, batch_size: Optional[int] = None, batched: bool = False
Expand Down Expand Up @@ -4927,16 +4953,22 @@ def to_parquet(
self,
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
storage_options: Optional[dict] = None,
**parquet_writer_kwargs,
) -> int:
"""Exports the dataset to parquet

Args:
path_or_buf (`PathLike` or `FileOrBuffer`):
Either a path to a file or a BinaryIO.
Either a path to a file (e.g. `file.parquet`), a remote URI (e.g. `hf://datasets/username/my_dataset_name/data.parquet`),
or a BinaryIO, where the dataset will be saved to in the specified format.
batch_size (`int`, *optional*):
Size of the batch to load in memory and write at once.
Defaults to `datasets.config.DEFAULT_MAX_BATCH_SIZE`.
storage_options (`dict`, *optional*):
Key/value pairs to be passed on to the file-system backend, if any.

<Added version="2.19.0"/>
**parquet_writer_kwargs (additional keyword arguments):
Parameters to pass to PyArrow's `pyarrow.parquet.ParquetWriter`.

Expand All @@ -4952,7 +4984,9 @@ def to_parquet(
# Dynamic import to avoid circular dependency
from .io.parquet import ParquetDatasetWriter

return ParquetDatasetWriter(self, path_or_buf, batch_size=batch_size, **parquet_writer_kwargs).write()
return ParquetDatasetWriter(
self, path_or_buf, batch_size=batch_size, storage_options=storage_options, **parquet_writer_kwargs
).write()

def to_sql(
self,
Expand Down
6 changes: 5 additions & 1 deletion src/datasets/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import os
from typing import BinaryIO, Optional, Union

import fsspec

from .. import Dataset, Features, NamedSplit, config
from ..formatting import query_table
from ..packaged_modules.csv.csv import Csv
Expand Down Expand Up @@ -72,6 +74,7 @@ def __init__(
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
num_proc: Optional[int] = None,
storage_options: Optional[dict] = None,
**to_csv_kwargs,
):
if num_proc is not None and num_proc <= 0:
Expand All @@ -82,6 +85,7 @@ def __init__(
self.batch_size = batch_size if batch_size else config.DEFAULT_MAX_BATCH_SIZE
self.num_proc = num_proc
self.encoding = "utf-8"
self.storage_options = storage_options or {}
self.to_csv_kwargs = to_csv_kwargs

def write(self) -> int:
Expand All @@ -90,7 +94,7 @@ def write(self) -> int:
index = self.to_csv_kwargs.pop("index", False)

if isinstance(self.path_or_buf, (str, bytes, os.PathLike)):
with open(self.path_or_buf, "wb+") as buffer:
with fsspec.open(self.path_or_buf, "wb", **(self.storage_options or {})) as buffer:
written = self._write(file_obj=buffer, header=header, index=index, **self.to_csv_kwargs)
else:
written = self._write(file_obj=self.path_or_buf, header=header, index=index, **self.to_csv_kwargs)
Expand Down
6 changes: 5 additions & 1 deletion src/datasets/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
num_proc: Optional[int] = None,
storage_options: Optional[dict] = None,
**to_json_kwargs,
):
if num_proc is not None and num_proc <= 0:
Expand All @@ -87,6 +88,7 @@ def __init__(
self.batch_size = batch_size if batch_size else config.DEFAULT_MAX_BATCH_SIZE
self.num_proc = num_proc
self.encoding = "utf-8"
self.storage_options = storage_options or {}
self.to_json_kwargs = to_json_kwargs

def write(self) -> int:
Expand All @@ -104,7 +106,9 @@ def write(self) -> int:
raise NotImplementedError(f"`datasets` currently does not support {compression} compression")

if isinstance(self.path_or_buf, (str, bytes, os.PathLike)):
with fsspec.open(self.path_or_buf, "wb", compression=compression) as buffer:
with fsspec.open(
self.path_or_buf, "wb", compression=compression, **(self.storage_options or {})
) as buffer:
written = self._write(file_obj=buffer, orient=orient, lines=lines, **self.to_json_kwargs)
else:
if compression:
Expand Down
5 changes: 4 additions & 1 deletion src/datasets/io/parquet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
from typing import BinaryIO, Optional, Union

import fsspec
import numpy as np
import pyarrow.parquet as pq

Expand Down Expand Up @@ -112,18 +113,20 @@ def __init__(
dataset: Dataset,
path_or_buf: Union[PathLike, BinaryIO],
batch_size: Optional[int] = None,
storage_options: Optional[dict] = None,
**parquet_writer_kwargs,
):
self.dataset = dataset
self.path_or_buf = path_or_buf
self.batch_size = batch_size or get_writer_batch_size(dataset.features)
self.storage_options = storage_options or {}
self.parquet_writer_kwargs = parquet_writer_kwargs

def write(self) -> int:
batch_size = self.batch_size if self.batch_size else config.DEFAULT_MAX_BATCH_SIZE

if isinstance(self.path_or_buf, (str, bytes, os.PathLike)):
with open(self.path_or_buf, "wb+") as buffer:
with fsspec.open(self.path_or_buf, "wb", **(self.storage_options or {})) as buffer:
written = self._write(file_obj=buffer, batch_size=batch_size, **self.parquet_writer_kwargs)
else:
written = self._write(file_obj=self.path_or_buf, batch_size=batch_size, **self.parquet_writer_kwargs)
Expand Down
11 changes: 11 additions & 0 deletions tests/io/test_csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import csv
import os

import fsspec
import pytest

from datasets import Dataset, DatasetDict, Features, NamedSplit, Value
Expand Down Expand Up @@ -162,3 +163,13 @@ def test_dataset_to_csv_invalidproc(csv_path, tmp_path):
dataset = CsvDatasetReader({"train": csv_path}, cache_dir=cache_dir).read()
with pytest.raises(ValueError):
CsvDatasetWriter(dataset["train"], output_csv, num_proc=0)


def test_dataset_to_csv_fsspec(dataset, mockfs):
    """CsvDatasetWriter writes a non-empty CSV file to an fsspec URI when given storage_options."""
    uri = "mock://my_dataset.csv"
    bytes_written = CsvDatasetWriter(dataset, uri, storage_options=mockfs.storage_options).write()
    # A successful export reports a positive byte count and the file exists on the mock filesystem.
    assert bytes_written > 0
    assert mockfs.isfile(uri)

    # Round-trip through fsspec to confirm the written object is readable and non-empty.
    with fsspec.open(uri, "rb", **mockfs.storage_options) as stream:
        assert stream.read()
9 changes: 9 additions & 0 deletions tests/io/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,12 @@ def test_dataset_to_json_compression(self, shared_datadir, tmp_path_factory, ext
with fsspec.open(original_path, "rb", compression="infer") as f:
original_content = f.read()
assert exported_content == original_content

def test_dataset_to_json_fsspec(self, dataset, mockfs):
dataset_path = "mock://my_dataset.json"
writer = JsonDatasetWriter(dataset, dataset_path, storage_options=mockfs.storage_options)
assert writer.write() > 0
assert mockfs.isfile(dataset_path)

with fsspec.open(dataset_path, "rb", **mockfs.storage_options) as f:
assert f.read()
11 changes: 11 additions & 0 deletions tests/io/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import fsspec
import pyarrow.parquet as pq
import pytest

Expand Down Expand Up @@ -213,3 +214,13 @@ def test_dataset_to_parquet_keeps_features(shared_datadir, tmp_path):
)
def test_get_writer_batch_size(feature, expected):
assert get_writer_batch_size(feature) == expected


def test_dataset_to_parquet_fsspec(dataset, mockfs):
    """ParquetDatasetWriter writes a non-empty Parquet file to an fsspec URI when given storage_options.

    Fix: the target URI previously ended in ``.csv`` (copy-paste from the CSV
    test); a Parquet export should use the ``.parquet`` extension so the test
    exercises and documents the intended file naming.
    """
    dataset_path = "mock://my_dataset.parquet"
    writer = ParquetDatasetWriter(dataset, dataset_path, storage_options=mockfs.storage_options)
    # A successful export reports a positive byte count and the file exists on the mock filesystem.
    assert writer.write() > 0
    assert mockfs.isfile(dataset_path)

    # Round-trip through fsspec to confirm the written object is readable and non-empty.
    with fsspec.open(dataset_path, "rb", **mockfs.storage_options) as f:
        assert f.read()