Skip to content
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
6d89cfb
use fsspec for caching
lhoestq Jul 20, 2022
606a48f
add parquet writer
lhoestq Jul 20, 2022
cdf8dcd
add file_format argument
lhoestq Jul 20, 2022
ad91270
style
lhoestq Jul 20, 2022
742d2a9
use "gs" instead of "gcs" for apache beam + use is_remote_filesystem
lhoestq Jul 20, 2022
aed8ce6
typo
lhoestq Jul 20, 2022
93d5660
fix test
lhoestq Jul 20, 2022
65c2037
test ArrowWriter with filesystem
lhoestq Jul 20, 2022
84d8397
test parquet writer
lhoestq Jul 21, 2022
4c46349
more tests
lhoestq Jul 21, 2022
ee7e3f5
Merge branch 'main' into dl-and-pp-as-parquet
lhoestq Jul 21, 2022
033a3b8
more tests
lhoestq Jul 21, 2022
ce8d7f9
fix nullcontext on 3.6
lhoestq Jul 21, 2022
15dccf9
parquet_writer.write_batch is not available in pyarrow 6
lhoestq Jul 21, 2022
3a3d784
remove reference to open file
lhoestq Jul 21, 2022
3eef46d
fix test
lhoestq Jul 22, 2022
b480549
docs
lhoestq Jul 22, 2022
713f83c
Merge branch 'main' into dl-and-pp-as-parquet
lhoestq Jul 25, 2022
4757930
shard parquet in download_and_prepare
lhoestq Jul 26, 2022
32f5bf8
typing, docs, docstrings
lhoestq Jul 27, 2022
1db12b9
docs: dask from parquet files
lhoestq Jul 27, 2022
874b2a0
Apply suggestions from code review
lhoestq Jul 27, 2022
f6ecb64
use contextlib.nullcontext
lhoestq Jul 27, 2022
b0e4222
Merge branch 'main' into dl-and-pp-as-parquet
lhoestq Jul 27, 2022
e7f3ac4
fix missing import
lhoestq Jul 27, 2022
df0343a
Use unstrip_protocol to merge protocol and path
mariosasko Jul 29, 2022
a0f84f4
remove bad "raise" and add TODOs
lhoestq Jul 29, 2022
56f68bd
Merge branch 'dl-and-pp-as-parquet' into shard-parquet
lhoestq Jul 29, 2022
509ff3f
Merge branch 'main' into dl-and-pp-as-parquet
lhoestq Aug 25, 2022
1b02b66
add output_dir arg to download_and_prepare
lhoestq Aug 25, 2022
2e85216
update tests
lhoestq Aug 25, 2022
ba167db
update docs
lhoestq Aug 25, 2022
fbcde93
Merge branch 'dl-and-pp-as-parquet' into shard-parquet
lhoestq Aug 25, 2022
c9b1ca9
docs
lhoestq Aug 25, 2022
a9379f8
fix tests
lhoestq Aug 25, 2022
6881268
Merge branch 'dl-and-pp-as-parquet' into shard-parquet
lhoestq Aug 25, 2022
ec94a4b
fix tests
lhoestq Aug 25, 2022
67a5b5f
Merge branch 'dl-and-pp-as-parquet' into shard-parquet
lhoestq Aug 25, 2022
f47871a
fix output parent dir creattion
lhoestq Aug 26, 2022
debd8fc
Merge branch 'dl-and-pp-as-parquet' into shard-parquet
lhoestq Aug 26, 2022
460e1a6
Apply suggestions from code review
lhoestq Aug 26, 2022
88daa8a
revert changes for remote cache_dir
lhoestq Aug 26, 2022
fdf7252
fix wording in the docs: load -> download and prepare
lhoestq Aug 26, 2022
22aaf7b
style
lhoestq Aug 26, 2022
c051b31
fix
lhoestq Aug 26, 2022
e0a7742
simplify incomplete_dir
lhoestq Aug 26, 2022
53d46cc
fix tests
lhoestq Aug 29, 2022
606951f
albert's comments
lhoestq Sep 5, 2022
81a34ce
Merge branch 'dl-and-pp-as-parquet' into shard-parquet
lhoestq Sep 5, 2022
bd94afb
set arrow to default
lhoestq Sep 5, 2022
e66ce41
Merge branch 'main' into shard-parquet
lhoestq Sep 5, 2022
0ea79c1
style
lhoestq Sep 5, 2022
0a950fd
add config.MAX_SHARD_SIZE
lhoestq Sep 6, 2022
fbc8fe1
nit
lhoestq Sep 6, 2022
c88a797
style
lhoestq Sep 6, 2022
d59b68f
fix for relative output_dir
lhoestq Sep 6, 2022
211b38b
typo
lhoestq Sep 6, 2022
361e32a
fix test
lhoestq Sep 6, 2022
3e21213
Merge branch 'main' into shard-parquet
lhoestq Sep 9, 2022
aa146ea
fix test
lhoestq Sep 9, 2022
11bd133
fix win tests
lhoestq Sep 12, 2022
a9f3fb8
Merge branch 'main' into shard-parquet
lhoestq Sep 13, 2022
6e480e0
Update src/datasets/builder.py
lhoestq Sep 13, 2022
d335742
Update src/datasets/builder.py
lhoestq Sep 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/filesystems.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ Use your own data files (see [how to load local and remote files](./loading#loca
It is highly recommended to save the files as compressed Parquet files to optimize I/O by specifying `file_format="parquet"`.
Otherwise the dataset is saved as an uncompressed Arrow file.

You can also specify the size of the Parquet shard using `max_shard_size` (default is 500MB):

```py
>>> builder.download_and_prepare(output_dir, storage_options=storage_options, file_format="parquet", max_shard_size="1GB")
```

#### Dask

Dask is a parallel computing library and it has a pandas-like API for working with larger than memory Parquet datasets in parallel.
Expand Down
7 changes: 3 additions & 4 deletions src/datasets/arrow_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4052,7 +4052,7 @@ def _push_parquet_shards_to_hub(
private: Optional[bool] = False,
token: Optional[str] = None,
branch: Optional[str] = None,
max_shard_size: Union[int, str] = "500MB",
max_shard_size: Optional[Union[int, str]] = None,
embed_external_files: bool = True,
) -> Tuple[str, str, int, int]:
"""Pushes the dataset to the hub.
Expand Down Expand Up @@ -4098,8 +4098,7 @@ def _push_parquet_shards_to_hub(
>>> dataset.push_to_hub("<organization>/<dataset_id>", split="evaluation")
```
"""

max_shard_size = convert_file_size_to_int(max_shard_size)
max_shard_size = convert_file_size_to_int(max_shard_size or config.MAX_SHARD_SIZE)

api = HfApi(endpoint=config.HF_ENDPOINT)
token = token if token is not None else HfFolder.get_token()
Expand Down Expand Up @@ -4270,7 +4269,7 @@ def push_to_hub(
private: Optional[bool] = False,
token: Optional[str] = None,
branch: Optional[str] = None,
max_shard_size: Union[int, str] = "500MB",
max_shard_size: Optional[Union[int, str]] = None,
shard_size: Optional[int] = "deprecated",
embed_external_files: bool = True,
):
Expand Down
266 changes: 215 additions & 51 deletions src/datasets/builder.py

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/datasets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@
# For big tables, we write them on disk instead
MAX_TABLE_NBYTES_FOR_PICKLING = 4 << 30

# Max shard size in bytes (e.g. to shard parquet datasets in push_to_hub or download_and_prepare)
MAX_SHARD_SIZE = "500MB"

# Offline mode
HF_DATASETS_OFFLINE = os.environ.get("HF_DATASETS_OFFLINE", "AUTO").upper() in ENV_VARS_TRUE_VALUES

Expand Down
2 changes: 1 addition & 1 deletion src/datasets/dataset_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ def push_to_hub(
private: Optional[bool] = False,
token: Optional[str] = None,
branch: Optional[None] = None,
max_shard_size: Union[int, str] = "500MB",
max_shard_size: Optional[Union[int, str]] = None,
shard_size: Optional[int] = "deprecated",
embed_external_files: bool = True,
):
Expand Down
2 changes: 1 addition & 1 deletion src/datasets/utils/py_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def convert_file_size_to_int(size: Union[int, str]) -> int:
if size.upper().endswith("KB"):
int_size = int(size[:-2]) * (10**3)
return int_size // 8 if size.endswith("b") else int_size
raise ValueError("`size` is not in a valid format. Use an integer followed by the unit, e.g., '5GB'.")
raise ValueError(f"`size={size}` is not in a valid format. Use an integer followed by the unit, e.g., '5GB'.")


def string_to_dict(string: str, pattern: str) -> Dict[str, str]:
Expand Down
111 changes: 110 additions & 1 deletion tests/test_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@
from datasets.streaming import xjoin
from datasets.utils.file_utils import is_local_path

from .utils import assert_arrow_memory_doesnt_increase, assert_arrow_memory_increases, require_beam, require_faiss
from .utils import (
assert_arrow_memory_doesnt_increase,
assert_arrow_memory_increases,
require_beam,
require_faiss,
set_current_working_directory_to_temp_dir,
)


class DummyBuilder(DatasetBuilder):
Expand Down Expand Up @@ -757,6 +763,16 @@ def test_beam_based_download_and_prepare(tmp_path):
assert os.path.exists(os.path.join(tmp_path, builder.name, "default", "0.0.0", "dataset_info.json"))


@require_beam
def test_beam_based_as_dataset(tmp_path):
    """A Beam-based builder, once prepared, can be reloaded as a non-empty Dataset."""
    beam_builder = DummyBeamBasedBuilder(cache_dir=tmp_path, beam_runner="DirectRunner")
    beam_builder.download_and_prepare()
    prepared = beam_builder.as_dataset()
    assert prepared
    train_split = prepared["train"]
    assert isinstance(train_split, Dataset)
    assert len(train_split) > 0


@pytest.mark.parametrize(
"split, expected_dataset_class, expected_dataset_length",
[
Expand Down Expand Up @@ -915,6 +931,27 @@ def test_builder_config_version(builder_class, kwargs, tmp_path):
assert builder.config.version == "2.0.0"


def test_builder_download_and_prepare_with_absolute_output_dir(tmp_path):
    """With an absolute output_dir, download_and_prepare writes the dataset files
    directly there and removes the temporary ".incomplete" staging directory."""
    builder = DummyGeneratorBasedBuilder()
    output_dir = str(tmp_path)
    builder.download_and_prepare(output_dir)
    # The builder stores a resolved POSIX path internally
    assert builder._output_dir.startswith(tmp_path.resolve().as_posix())
    assert os.path.exists(os.path.join(output_dir, "dataset_info.json"))
    assert os.path.exists(os.path.join(output_dir, f"{builder.name}-train.arrow"))
    # Fix: os.path.join() with a single argument was a no-op wrapper
    assert not os.path.exists(output_dir + ".incomplete")


def test_builder_download_and_prepare_with_relative_output_dir():
    """With a relative output_dir, download_and_prepare resolves it against the
    current working directory and removes the ".incomplete" staging directory."""
    with set_current_working_directory_to_temp_dir():
        builder = DummyGeneratorBasedBuilder()
        output_dir = "test-out"
        builder.download_and_prepare(output_dir)
        # Compare resolved POSIX forms so the check is OS-independent
        assert Path(builder._output_dir).resolve().as_posix().startswith(Path(output_dir).resolve().as_posix())
        assert os.path.exists(os.path.join(output_dir, "dataset_info.json"))
        assert os.path.exists(os.path.join(output_dir, f"{builder.name}-train.arrow"))
        # Fix: os.path.join() with a single argument was a no-op wrapper
        assert not os.path.exists(output_dir + ".incomplete")


def test_builder_with_filesystem_download_and_prepare(tmp_path, mockfs):
builder = DummyGeneratorBasedBuilder(cache_dir=tmp_path)
builder.download_and_prepare("mock://my_dataset", storage_options=mockfs.storage_options)
Expand Down Expand Up @@ -948,6 +985,43 @@ def test_generator_based_builder_download_and_prepare_as_parquet(tmp_path):
assert pq.ParquetFile(parquet_path) is not None


def test_generator_based_builder_download_and_prepare_as_sharded_parquet(tmp_path):
    """Preparing as parquet with a tiny config.MAX_SHARD_SIZE yields one shard per
    writer batch, with row counts summing to the full dataset (100 examples)."""
    writer_batch_size = 25
    builder = DummyGeneratorBasedBuilder(cache_dir=tmp_path, writer_batch_size=writer_batch_size)
    with patch("datasets.config.MAX_SHARD_SIZE", 1):  # one batch per shard
        builder.download_and_prepare(file_format="parquet")
    expected_num_shards = 100 // writer_batch_size
    # Fix: `assert x, 100` used 100 as the assertion *message*, only checking truthiness
    assert builder.info.splits["train"].num_examples == 100
    parquet_path = os.path.join(
        tmp_path, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-{expected_num_shards:05d}.parquet"
    )
    assert os.path.exists(parquet_path)
    parquet_files = [
        pq.ParquetFile(parquet_path)
        for parquet_path in Path(tmp_path).rglob(f"{builder.name}-train-*-of-{expected_num_shards:05d}.parquet")
    ]
    assert len(parquet_files) == expected_num_shards
    assert sum(parquet_file.metadata.num_rows for parquet_file in parquet_files) == 100


def test_generator_based_builder_download_and_prepare_as_sharded_parquet_with_max_shard_size(tmp_path):
    """Passing max_shard_size explicitly (instead of patching config.MAX_SHARD_SIZE)
    also yields one parquet shard per writer batch covering all 100 examples."""
    writer_batch_size = 25
    builder = DummyGeneratorBasedBuilder(cache_dir=tmp_path, writer_batch_size=writer_batch_size)
    builder.download_and_prepare(file_format="parquet", max_shard_size=1)  # one batch per shard
    expected_num_shards = 100 // writer_batch_size
    # Fix: `assert x, 100` used 100 as the assertion *message*, only checking truthiness
    assert builder.info.splits["train"].num_examples == 100
    parquet_path = os.path.join(
        tmp_path, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-{expected_num_shards:05d}.parquet"
    )
    assert os.path.exists(parquet_path)
    parquet_files = [
        pq.ParquetFile(parquet_path)
        for parquet_path in Path(tmp_path).rglob(f"{builder.name}-train-*-of-{expected_num_shards:05d}.parquet")
    ]
    assert len(parquet_files) == expected_num_shards
    assert sum(parquet_file.metadata.num_rows for parquet_file in parquet_files) == 100


def test_arrow_based_builder_download_and_prepare_as_parquet(tmp_path):
builder = DummyArrowBasedBuilder(cache_dir=tmp_path)
builder.download_and_prepare(file_format="parquet")
Expand All @@ -959,6 +1033,41 @@ def test_arrow_based_builder_download_and_prepare_as_parquet(tmp_path):
assert pq.ParquetFile(parquet_path) is not None


def test_arrow_based_builder_download_and_prepare_as_sharded_parquet(tmp_path):
    """An arrow-based builder prepared as parquet with a tiny config.MAX_SHARD_SIZE
    produces one shard per table (10 shards) totalling 100 rows."""
    builder = DummyArrowBasedBuilder(cache_dir=tmp_path)
    with patch("datasets.config.MAX_SHARD_SIZE", 1):  # one batch per shard
        builder.download_and_prepare(file_format="parquet")
    expected_num_shards = 10
    # Fix: `assert x, 100` used 100 as the assertion *message*, only checking truthiness
    assert builder.info.splits["train"].num_examples == 100
    parquet_path = os.path.join(
        tmp_path, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-{expected_num_shards:05d}.parquet"
    )
    assert os.path.exists(parquet_path)
    parquet_files = [
        pq.ParquetFile(parquet_path)
        for parquet_path in Path(tmp_path).rglob(f"{builder.name}-train-*-of-{expected_num_shards:05d}.parquet")
    ]
    assert len(parquet_files) == expected_num_shards
    assert sum(parquet_file.metadata.num_rows for parquet_file in parquet_files) == 100


def test_arrow_based_builder_download_and_prepare_as_sharded_parquet_with_max_shard_size(tmp_path):
    """Passing max_shard_size explicitly to an arrow-based builder yields one
    parquet shard per table (10 shards) totalling 100 rows."""
    builder = DummyArrowBasedBuilder(cache_dir=tmp_path)
    builder.download_and_prepare(file_format="parquet", max_shard_size=1)  # one table per shard
    expected_num_shards = 10
    # Fix: `assert x, 100` used 100 as the assertion *message*, only checking truthiness
    assert builder.info.splits["train"].num_examples == 100
    parquet_path = os.path.join(
        tmp_path, builder.name, "default", "0.0.0", f"{builder.name}-train-00000-of-{expected_num_shards:05d}.parquet"
    )
    assert os.path.exists(parquet_path)
    parquet_files = [
        pq.ParquetFile(parquet_path)
        for parquet_path in Path(tmp_path).rglob(f"{builder.name}-train-*-of-{expected_num_shards:05d}.parquet")
    ]
    assert len(parquet_files) == expected_num_shards
    assert sum(parquet_file.metadata.num_rows for parquet_file in parquet_files) == 100


def test_beam_based_builder_download_and_prepare_as_parquet(tmp_path):
builder = DummyBeamBasedBuilder(cache_dir=tmp_path, beam_runner="DirectRunner")
builder.download_and_prepare(file_format="parquet")
Expand Down
29 changes: 29 additions & 0 deletions tests/test_upstream_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,35 @@ def test_push_dataset_dict_to_hub_multiple_files(self, temporary_repo):

local_ds = DatasetDict({"train": ds})

with temporary_repo(f"{CI_HUB_USER}/test-{int(time.time() * 10e3)}") as ds_name:
with patch("datasets.config.MAX_SHARD_SIZE", "16KB"):
local_ds.push_to_hub(ds_name, token=self._token)
hub_ds = load_dataset(ds_name, download_mode="force_redownload")

assert local_ds.column_names == hub_ds.column_names
assert list(local_ds["train"].features.keys()) == list(hub_ds["train"].features.keys())
assert local_ds["train"].features == hub_ds["train"].features

# Ensure that there are two files on the repository that have the correct name
files = sorted(self._api.list_repo_files(ds_name, repo_type="dataset", token=self._token))
assert all(
fnmatch.fnmatch(file, expected_file)
for file, expected_file in zip(
files,
[
".gitattributes",
"data/train-00000-of-00002-*.parquet",
"data/train-00001-of-00002-*.parquet",
"dataset_infos.json",
],
)
)

def test_push_dataset_dict_to_hub_multiple_files_with_max_shard_size(self, temporary_repo):
ds = Dataset.from_dict({"x": list(range(1000)), "y": list(range(1000))})

local_ds = DatasetDict({"train": ds})

with temporary_repo(f"{CI_HUB_USER}/test-{int(time.time() * 10e3)}") as ds_name:
local_ds.push_to_hub(ds_name, token=self._token, max_shard_size="16KB")
hub_ds = load_dataset(ds_name, download_mode="force_redownload")
Expand Down