From a5d853ef2ecafbdb3af078c1dfe5e89c45e486d8 Mon Sep 17 00:00:00 2001
From: Quentin Lhoest
Date: Tue, 10 Jun 2025 16:34:49 +0200
Subject: [PATCH 1/3] parallel push_to_hub

---
 src/datasets/arrow_dataset.py | 168 +++++++++++++++++++++++-----------
 1 file changed, 116 insertions(+), 52 deletions(-)

diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py
index b1d4f0c9230..29eeb5ee322 100644
--- a/src/datasets/arrow_dataset.py
+++ b/src/datasets/arrow_dataset.py
@@ -5397,17 +5397,76 @@ def to_iterable_dataset(self, num_shards: Optional[int] = 1) -> "IterableDataset
         ds = ds.with_format(self._format_type)
         return ds
 
+    def _push_parquet_shards_to_hub_single(
+        self,
+        job_id: int,
+        num_jobs: int,
+        repo_id: str,
+        data_dir: str,
+        split: str,
+        token: Optional[str],
+        revision: Optional[str],
+        create_pr: Optional[bool],
+        num_shards: int,
+        embed_external_files: bool,
+    ):
+        div = num_shards // num_jobs
+        mod = num_shards % num_jobs
+        start = div * job_id + min(job_id, mod)
+        end = start + div + (1 if job_id < mod else 0)
+
+        index_shards = (
+            (start + i, self.shard(num_shards=end - start, index=i, contiguous=True)) for i in range(end - start)
+        )
+
+        api = HfApi(endpoint=config.HF_ENDPOINT, token=token)
+
+        uploaded_size = 0
+        additions: list[CommitOperationAdd] = []
+        for index, shard in index_shards:
+            if embed_external_files:
+                from .io.parquet import get_writer_batch_size
+
+                format = shard.format
+                shard = shard.with_format("arrow")
+                shard = shard.map(
+                    embed_table_storage,
+                    batched=True,
+                    batch_size=get_writer_batch_size(shard.features),
+                    keep_in_memory=True,
+                )
+                shard = shard.with_format(**format)
+            shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
+            buffer = BytesIO()
+            shard.to_parquet(buffer)
+            parquet_content = buffer.getvalue()
+            uploaded_size += len(parquet_content)
+            del buffer
+            shard_addition = CommitOperationAdd(path_in_repo=shard_path_in_repo, path_or_fileobj=parquet_content)
+            api.preupload_lfs_files(
+                repo_id=repo_id,
+                additions=[shard_addition],
+                repo_type="dataset",
+                revision=revision,
+                create_pr=create_pr,
+            )
+            additions.append(shard_addition)
+            yield job_id, False, 1
+
+        yield job_id, True, additions
+
     def _push_parquet_shards_to_hub(
         self,
         repo_id: str,
-        data_dir: str = "data",
-        split: Optional[str] = None,
-        token: Optional[str] = None,
-        revision: Optional[str] = None,
-        create_pr: Optional[bool] = False,
-        max_shard_size: Optional[Union[int, str]] = None,
-        num_shards: Optional[int] = None,
-        embed_external_files: bool = True,
+        data_dir: str,
+        split: str,
+        token: Optional[str],
+        revision: Optional[str],
+        create_pr: Optional[bool],
+        max_shard_size: Optional[Union[int, str]],
+        num_shards: Optional[int],
+        embed_external_files: bool,
+        num_proc: Optional[int],
     ) -> tuple[list[CommitOperationAdd], int, int]:
         """Pushes the dataset shards as Parquet files to the hub.
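The `div`/`mod` arithmetic at the top of `_push_parquet_shards_to_hub_single` gives each job a contiguous, near-equal slice of the global shard indices, so the `{split}-{index:05d}-of-{num_shards:05d}.parquet` filenames stay globally numbered no matter which job writes them. A standalone sketch (not part of the patch) that checks this split:

```python
# Minimal sketch of the per-job shard-range arithmetic used in the patch:
# the first (num_shards % num_jobs) jobs each take one extra shard, so the
# ranges are contiguous, within one shard of each other, and tile [0, num_shards).
def shard_range(job_id: int, num_jobs: int, num_shards: int) -> range:
    div, mod = divmod(num_shards, num_jobs)
    start = div * job_id + min(job_id, mod)
    end = start + div + (1 if job_id < mod else 0)
    return range(start, end)

num_shards, num_jobs = 10, 4
ranges = [shard_range(job_id, num_jobs, num_shards) for job_id in range(num_jobs)]
assert [list(r) for r in ranges] == [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
assert sum(len(r) for r in ranges) == num_shards
```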
@@ -5416,6 +5475,8 @@ def _push_parquet_shards_to_hub(
             uploaded_size (`int`): number of uploaded bytes to the repository
             dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset after uncompression
         """
+        dataset_nbytes = self._estimate_nbytes()
+
         # Find decodable columns, because if there are any, we need to:
         # embed the bytes from the files in the shards
         decodable_columns = (
@@ -5423,59 +5484,54 @@ def _push_parquet_shards_to_hub(
             if embed_external_files
             else []
         )
-
-        dataset_nbytes = self._estimate_nbytes()
+        embed_external_files = embed_external_files and bool(decodable_columns)
 
         if num_shards is None:
             max_shard_size = convert_file_size_to_int(max_shard_size or config.MAX_SHARD_SIZE)
             num_shards = int(dataset_nbytes / max_shard_size) + 1
-            num_shards = max(num_shards, 1)
-
-        shards = (self.shard(num_shards=num_shards, index=i, contiguous=True) for i in range(num_shards))
-
-        if decodable_columns:
-            from .io.parquet import get_writer_batch_size
-
-            def shards_with_embedded_external_files(shards: Iterator[Dataset]) -> Iterator[Dataset]:
-                for shard in shards:
-                    format = shard.format
-                    shard = shard.with_format("arrow")
-                    shard = shard.map(
-                        embed_table_storage,
-                        batched=True,
-                        batch_size=get_writer_batch_size(shard.features),
-                        keep_in_memory=True,
-                    )
-                    shard = shard.with_format(**format)
-                    yield shard
-
-            shards = shards_with_embedded_external_files(shards)
-
-        api = HfApi(endpoint=config.HF_ENDPOINT, token=token)
+            num_shards = max(num_shards, num_proc or 1)
 
-        uploaded_size = 0
         additions: list[CommitOperationAdd] = []
-        for index, shard in hf_tqdm(
-            enumerate(shards),
-            desc="Uploading the dataset shards",
+
+        num_jobs = num_proc or 1
+        kwargs_iterable = [
+            {
+                "self": self.shard(num_shards=num_jobs, index=job_id, contiguous=True),
+                "job_id": job_id,
+                "num_jobs": num_jobs,
+                "repo_id": repo_id,
+                "data_dir": data_dir,
+                "split": split,
+                "token": token,
+                "revision": revision,
+                "create_pr": create_pr,
+                "num_shards": num_shards,
+                "embed_external_files": embed_external_files,
+            }
+            for job_id in range(num_jobs)
+        ]
+        pbar = hf_tqdm(
+            unit=" shards",
             total=num_shards,
-        ):
-            shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
-            buffer = BytesIO()
-            shard.to_parquet(buffer)
-            parquet_content = buffer.getvalue()
-            uploaded_size += len(parquet_content)
-            del buffer
-            shard_addition = CommitOperationAdd(path_in_repo=shard_path_in_repo, path_or_fileobj=parquet_content)
-            api.preupload_lfs_files(
-                repo_id=repo_id,
-                additions=[shard_addition],
-                repo_type="dataset",
-                revision=revision,
-                create_pr=create_pr,
+            desc="Uploading the dataset shards",
+        )
+        with contextlib.nullcontext() if num_proc is None else Pool(num_proc) as pool:
+            update_stream = (
+                Dataset._push_parquet_shards_to_hub_single(**kwargs_iterable[0])
+                if pool is None
+                else iflatmap_unordered(
+                    pool,
+                    Dataset._push_parquet_shards_to_hub_single,
+                    kwargs_iterable=kwargs_iterable,
+                )
             )
-            additions.append(shard_addition)
+            for job_id, done, content in update_stream:
+                if not done:
+                    pbar.update(content)
+                else:
+                    additions += content
+        uploaded_size = sum(addition.upload_info.size for addition in additions)
 
         return additions, uploaded_size, dataset_nbytes
 
     def push_to_hub(
@@ -5494,6 +5550,7 @@ def push_to_hub(
         max_shard_size: Optional[Union[int, str]] = None,
         num_shards: Optional[int] = None,
         embed_external_files: bool = True,
+        num_proc: Optional[int] = None,
     ) -> CommitInfo:
         """Pushes the dataset to the hub as a Parquet dataset.
         The dataset is pushed using HTTP requests and does not require git or git-lfs to be installed.
@@ -5553,6 +5610,12 @@ def push_to_hub(
             In particular, this will do the following before the push for the fields of type:
 
             - [`Audio`] and [`Image`]: remove local path information and embed file content in the Parquet files.
+            num_proc (`int`, *optional*, defaults to `None`):
+                Number of processes when preparing and uploading the dataset.
+                This is helpful if the dataset contains many samples or media files to embed.
+                Multiprocessing is disabled by default.
+
+
         Return:
             huggingface_hub.CommitInfo
 
@@ -5636,6 +5699,7 @@ def push_to_hub(
             num_shards=num_shards,
             create_pr=create_pr,
             embed_external_files=embed_external_files,
+            num_proc=num_proc,
         )
 
         # Check if the repo already has a README.md and/or a dataset_infos.json to update them with the new split info (size and pattern)

From 235147b150fa831bead31bf741d95ef952625553 Mon Sep 17 00:00:00 2001
From: Quentin Lhoest
Date: Tue, 10 Jun 2025 16:37:08 +0200
Subject: [PATCH 2/3] minor

---
 src/datasets/arrow_dataset.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/datasets/arrow_dataset.py b/src/datasets/arrow_dataset.py
index 29eeb5ee322..58b444e89bc 100644
--- a/src/datasets/arrow_dataset.py
+++ b/src/datasets/arrow_dataset.py
@@ -5510,12 +5510,14 @@ def _push_parquet_shards_to_hub(
             }
             for job_id in range(num_jobs)
         ]
+        desc = "Uploading the dataset shards"
+        desc += f" (num_proc={num_proc})" if num_proc is not None and num_proc > 1 else ""
         pbar = hf_tqdm(
             unit=" shards",
             total=num_shards,
-            desc="Uploading the dataset shards",
+            desc=desc,
         )
-        with contextlib.nullcontext() if num_proc is None else Pool(num_proc) as pool:
+        with contextlib.nullcontext() if num_proc is None or num_proc <= 1 else Pool(num_proc) as pool:
             update_stream = (
                 Dataset._push_parquet_shards_to_hub_single(**kwargs_iterable[0])
                 if pool is None

From 8ee018f0093f1edec2b30d96c1f12369b8f45048 Mon Sep 17 00:00:00 2001
From: Quentin Lhoest
Date: Tue, 10 Jun 2025 18:06:56 +0200
Subject: [PATCH 3/3] num_proc in IterableDataset.push_to_hub

---
 src/datasets/iterable_dataset.py | 169 ++++++++++++++++++++++++-------
 1 file changed, 132 insertions(+), 37 deletions(-)

diff --git a/src/datasets/iterable_dataset.py b/src/datasets/iterable_dataset.py
index 663e1ec19b0..bcdf5458f69 100644
--- a/src/datasets/iterable_dataset.py
+++ b/src/datasets/iterable_dataset.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextlib
 import copy
 import fnmatch
 import inspect
@@ -25,6 +26,7 @@
 import pyarrow.parquet as pq
 from huggingface_hub import CommitInfo, CommitOperationAdd, CommitOperationDelete, DatasetCard, DatasetCardData, HfApi
 from huggingface_hub.hf_api import RepoFile
+from multiprocess import Pool
 
 from . import config
 from .arrow_dataset import PUSH_TO_HUB_WITHOUT_METADATA_CONFIGS_SPLIT_PATTERN_SHARDED, Dataset, DatasetInfoMixin
@@ -54,7 +56,7 @@
 from .utils import tqdm as hf_tqdm
 from .utils.logging import get_logger
 from .utils.metadata import MetadataConfigs
-from .utils.py_utils import Literal, asdict, glob_pattern_to_regex, string_to_dict
+from .utils.py_utils import Literal, asdict, glob_pattern_to_regex, iflatmap_unordered, string_to_dict
 from .utils.sharding import _merge_gen_kwargs, _number_of_shards_in_gen_kwargs, _shuffle_gen_kwargs, _split_gen_kwargs
 from .utils.typing import PathLike
 
@@ -3706,17 +3708,19 @@ def to_parquet(
             path_or_buf, batch_size=batch_size, storage_options=storage_options, **parquet_writer_kwargs
         )
 
-    def _push_parquet_shards_to_hub(
+    def _push_parquet_shards_to_hub_single(
         self,
+        job_id: int,
+        num_jobs: int,
         repo_id: str,
-        data_dir: str = "data",
-        split: Optional[str] = None,
-        token: Optional[str] = None,
-        revision: Optional[str] = None,
-        create_pr: Optional[bool] = False,
+        data_dir: str,
+        split: str,
+        token: Optional[str],
+        revision: Optional[str],
+        create_pr: Optional[bool],
         # max_shard_size: Optional[Union[int, str]] = None,  # TODO(QL): add arg
-        num_shards: Optional[int] = None,
-        embed_external_files: bool = True,
+        num_shards: int,
+        embed_external_files: bool,
     ) -> tuple[list[CommitOperationAdd], int, int]:
         """Pushes the dataset shards as Parquet files to the hub.
 
@@ -3725,31 +3729,15 @@ def _push_parquet_shards_to_hub(
             uploaded_size (`int`): number of uploaded bytes to the repository
             dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset after uncompression
         """
-        # Find decodable columns, because if there are any, we need to:
-        # embed the bytes from the files in the shards
-        decodable_columns = (
-            [k for k, v in self._info.features.items() if require_decoding(v, ignore_decode_attribute=True)]
-            if embed_external_files
-            else []
-        )
-
-        num_shards = self.num_shards
-        shards = (self.shard(num_shards=num_shards, index=i, contiguous=True) for i in range(num_shards))
+        div = num_shards // num_jobs
+        mod = num_shards % num_jobs
+        start = div * job_id + min(job_id, mod)
+        end = start + div + (1 if job_id < mod else 0)
 
-        if decodable_columns:
-            from .io.parquet import get_writer_batch_size
-
-            def shards_with_embedded_external_files(shards: Iterator[IterableDataset]) -> Iterator[IterableDataset]:
-                for shard in shards:
-                    shard = shard.with_format("arrow")
-                    shard = shard.map(
-                        partial(embed_table_storage, token_per_repo_id=self._token_per_repo_id),
-                        batched=True,
-                        batch_size=get_writer_batch_size(shard.features),
-                    )
-                    yield shard
-
-            shards = shards_with_embedded_external_files(shards)
+        index_shards = (
+            (start + i, self.shard(num_shards=end - start, index=i, contiguous=True)) for i in range(end - start)
+        )
 
         api = HfApi(endpoint=config.HF_ENDPOINT, token=token)
 
@@ -3757,11 +3745,16 @@ def shards_with_embedded_external_files(shards: Iterator[IterableDataset]) -> It
         dataset_nbytes = 0
         num_examples = 0
         additions: list[CommitOperationAdd] = []
-        for index, shard in hf_tqdm(
-            enumerate(shards),
-            desc="Uploading the dataset shards",
-            total=num_shards,
-        ):
+        for index, shard in index_shards:
+            if embed_external_files:
+                from .io.parquet import get_writer_batch_size
+
+                shard = shard.with_format("arrow")
+                shard = shard.map(
+                    partial(embed_table_storage, token_per_repo_id=self._token_per_repo_id),
+                    batched=True,
+                    batch_size=get_writer_batch_size(shard.features),
+                )
             shard_path_in_repo = f"{data_dir}/{split}-{index:05d}-of-{num_shards:05d}.parquet"
             buffer = BytesIO()
             shard.to_parquet(buffer)
@@ -3782,7 +3775,91 @@ def shards_with_embedded_external_files(shards: Iterator[IterableDataset]) -> It
                 create_pr=create_pr,
             )
             additions.append(shard_addition)
+            yield job_id, False, 1
+
+        yield job_id, True, (additions, dataset_nbytes, num_examples)
+
+    def _push_parquet_shards_to_hub(
+        self,
+        repo_id: str,
+        data_dir: str,
+        split: str,
+        token: Optional[str],
+        revision: Optional[str],
+        create_pr: Optional[bool],
+        # max_shard_size: Optional[Union[int, str]],  # TODO(QL): add arg
+        num_shards: Optional[int],
+        embed_external_files: bool,
+        num_proc: Optional[int],
+    ) -> tuple[list[CommitOperationAdd], int, int, int]:
+        """Pushes the dataset shards as Parquet files to the hub.
+
+        Returns:
+            additions (`List[CommitOperation]`): list of the `CommitOperationAdd` of the uploaded shards
+            uploaded_size (`int`): number of uploaded bytes to the repository
+            dataset_nbytes (`int`): approximate size in bytes of the uploaded dataset after uncompression
+            num_examples (`int`): number of examples of the uploaded dataset
+        """
+
+        # Find decodable columns, because if there are any, we need to:
+        # embed the bytes from the files in the shards
+        decodable_columns = (
+            [k for k, v in self._info.features.items() if require_decoding(v, ignore_decode_attribute=True)]
+            if embed_external_files
+            else []
+        )
+        embed_external_files = embed_external_files and bool(decodable_columns)
+
+        if num_shards is None:
+            # TODO(QL): this can depend on max_shard_size later
+            num_shards = self.num_shards
+
+        additions: list[CommitOperationAdd] = []
+        dataset_nbytes = num_examples = 0
+
+        num_jobs = num_proc or 1
+        kwargs_iterable = [
+            {
+                "self": self.shard(num_shards=num_jobs, index=job_id, contiguous=True),
+                "job_id": job_id,
+                "num_jobs": num_jobs,
+                "repo_id": repo_id,
+                "data_dir": data_dir,
+                "split": split,
+                "token": token,
+                "revision": revision,
+                "create_pr": create_pr,
+                "num_shards": num_shards,
+                "embed_external_files": embed_external_files,
+            }
+            for job_id in range(num_jobs)
+        ]
+        desc = "Uploading the dataset shards"
+        desc += f" (num_proc={num_proc})" if num_proc is not None and num_proc > 1 else ""
+        pbar = hf_tqdm(
+            unit=" shards",
+            total=num_shards,
+            desc=desc,
+        )
+        with contextlib.nullcontext() if num_proc is None or num_proc <= 1 else Pool(num_proc) as pool:
+            update_stream = (
+                IterableDataset._push_parquet_shards_to_hub_single(**kwargs_iterable[0])
+                if pool is None
+                else iflatmap_unordered(
+                    pool,
+                    IterableDataset._push_parquet_shards_to_hub_single,
+                    kwargs_iterable=kwargs_iterable,
+                )
+            )
+            for job_id, done, content in update_stream:
+                if not done:
+                    pbar.update(content)
+                else:
+                    additions += content[0]
+                    dataset_nbytes += content[1]
+                    num_examples += content[2]
+
+        uploaded_size = sum(addition.upload_info.size for addition in additions)
 
         return additions, uploaded_size, dataset_nbytes, num_examples
 
     def push_to_hub(
@@ -3801,6 +3878,7 @@ def push_to_hub(
         # max_shard_size: Optional[Union[int, str]] = None,  # TODO(QL): add arg
         num_shards: Optional[int] = None,
         embed_external_files: bool = True,
+        num_proc: Optional[int] = None,
     ) -> CommitInfo:
         """Pushes the dataset to the hub as a Parquet dataset.
         The dataset is pushed using HTTP requests and does not require git or git-lfs to be installed.
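Both implementations rely on the same streaming protocol: each job is a generator that yields `(job_id, False, 1)` after every uploaded shard and finishes with `(job_id, True, payload)`, and `iflatmap_unordered` from `datasets.utils.py_utils` interleaves those items across the `multiprocess` pool so the parent can tick the progress bar and collect each job's final additions. A rough, self-contained sketch of that pattern, using the stdlib `multiprocessing` and a manager queue as a simplified stand-in for the real `iflatmap_unordered`:

```python
from multiprocessing import Manager, Pool

def job(job_id: int, num_items: int):
    # Stand-in for _push_parquet_shards_to_hub_single: one tick per uploaded
    # shard, then a final payload carrying the job's results.
    for _ in range(num_items):
        yield job_id, False, 1
    yield job_id, True, [f"shard-from-job-{job_id}"]

def drain_into_queue(kwargs, queue):
    # Runs in the worker process: forward every yielded item to the parent.
    for item in job(**kwargs):
        queue.put(item)

if __name__ == "__main__":
    kwargs_iterable = [{"job_id": i, "num_items": 3} for i in range(2)]
    with Manager() as manager, Pool(2) as pool:
        queue = manager.Queue()
        results = [pool.apply_async(drain_into_queue, (kwargs, queue)) for kwargs in kwargs_iterable]
        remaining = len(kwargs_iterable)
        while remaining:
            job_id, done, content = queue.get()  # items arrive unordered across jobs
            if done:
                remaining -= 1
                print(f"job {job_id} done: {content}")  # collect additions here
            else:
                print(f"job {job_id}: +{content} shard")  # pbar.update(content) in the patch
        for r in results:
            r.get()  # re-raise any worker exception
```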
@@ -3847,6 +3925,10 @@ def push_to_hub(
             In particular, this will do the following before the push for the fields of type:
 
             - [`Audio`] and [`Image`]: remove local path information and embed file content in the Parquet files.
+            num_proc (`int`, *optional*, defaults to `None`):
+                Number of processes when preparing and uploading the dataset.
+                This is helpful if the dataset is made of many samples or applies expensive transformations.
+                Multiprocessing is disabled by default.
 
         Return:
             huggingface_hub.CommitInfo
@@ -3887,6 +3969,18 @@ def push_to_hub(
                 "file containing other information like video captions, features or labels. More information "
                 "at https://huggingface.co/docs/datasets/main/en/video_load#videofolder"
             )
+        if num_proc is not None and num_proc > self.num_shards:
+            logger.warning(
+                f"num_proc={num_proc} is too high (max is dataset.num_shards={self.num_shards}). "
+                f"Reducing num_proc to {self.num_shards}."
+            )
+            logger.info(
+                f"To parallelize the upload, each process is given some shards (or data sources) to process. "
+                f"Therefore it's unnecessary to have a number of processes greater than dataset.num_shards={self.num_shards}. "
+                f"To enable more parallelism, please split the dataset into more files than {self.num_shards}."
+            )
+            num_proc = self.num_shards
+
         if config_name == "data":
             raise ValueError("`config_name` cannot be 'data'. Please, choose another name for configuration.")
 
@@ -3929,6 +4023,7 @@ def push_to_hub(
             num_shards=num_shards,
             create_pr=create_pr,
             embed_external_files=embed_external_files,
+            num_proc=num_proc,
         )
 
         # Check if the repo already has a README.md and/or a dataset_infos.json to update them with the new split info (size and pattern)
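End to end, the series adds the same `num_proc` knob to both `Dataset.push_to_hub` and `IterableDataset.push_to_hub`. A hypothetical usage sketch (the repo id, dataset contents, and shard counts are placeholders, not from the patch):

```python
from datasets import Dataset

# Placeholder dataset; in practice this is any Dataset or IterableDataset.
ds = Dataset.from_dict({"text": [f"example {i}" for i in range(100_000)]})

# With num_proc=4 the dataset is split into 4 contiguous jobs; each worker
# process prepares and pre-uploads its share of the 16 Parquet shards while
# the progress bar aggregates (job_id, done, content) updates from the pool.
ds.push_to_hub("username/my-dataset", num_shards=16, num_proc=4)
```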