diff --git a/docs/source/beam.mdx b/docs/source/beam.mdx
index 17eb5d1e8c9..90ef38bcf3c 100644
--- a/docs/source/beam.mdx
+++ b/docs/source/beam.mdx
@@ -1,5 +1,11 @@
 # Beam Datasets
 
+
+
+The Beam API is deprecated and will be removed in the next major release.
+
+
+
 Some datasets are too large to be processed on a single machine. Instead, you can process them with [Apache Beam](https://beam.apache.org/), a library for parallel data processing. The processing pipeline is executed on a distributed processing backend such as [Apache Flink](https://flink.apache.org/), [Apache Spark](https://spark.apache.org/), or [Google Cloud Dataflow](https://cloud.google.com/dataflow).
 
 We have already created Beam pipelines for some of the larger datasets like [wikipedia](https://huggingface.co/datasets/wikipedia), and [wiki40b](https://huggingface.co/datasets/wiki40b). You can load these normally with [`load_dataset`]. But if you want to run your own Beam pipeline with Dataflow, here is how:
diff --git a/src/datasets/arrow_reader.py b/src/datasets/arrow_reader.py
index 9ac14e28ce6..f3b830a6596 100644
--- a/src/datasets/arrow_reader.py
+++ b/src/datasets/arrow_reader.py
@@ -34,6 +34,7 @@
 from .table import InMemoryTable, MemoryMappedTable, Table, concat_tables
 from .utils import logging
 from .utils import tqdm as hf_tqdm
+from .utils.deprecation_utils import deprecated
 from .utils.file_utils import cached_path
 
 
@@ -284,6 +285,7 @@ def read_files(
         dataset_kwargs = {"arrow_table": pa_table, "info": self._info, "split": split}
         return dataset_kwargs
 
+    @deprecated()
     def download_from_hf_gcs(self, download_config: DownloadConfig, relative_data_dir):
         """
         Download the dataset files from the Hf GCS
diff --git a/src/datasets/builder.py b/src/datasets/builder.py
index 8f4bc1962b6..1389aa7a20e 100644
--- a/src/datasets/builder.py
+++ b/src/datasets/builder.py
@@ -70,6 +70,7 @@
 from .utils import logging
 from .utils import tqdm as hf_tqdm
 from .utils._filelock import FileLock
+from .utils.deprecation_utils import deprecated
 from .utils.file_utils import cached_path, is_remote_url
 from .utils.info_utils import VerificationMode, get_size_checksum_dict, verify_checksums, verify_splits
 from .utils.py_utils import (
@@ -748,7 +749,7 @@ def download_and_prepare(
         download_mode: Optional[Union[DownloadMode, str]] = None,
         verification_mode: Optional[Union[VerificationMode, str]] = None,
         ignore_verifications="deprecated",
-        try_from_hf_gcs: bool = True,
+        try_from_hf_gcs="deprecated",
         dl_manager: Optional[DownloadManager] = None,
         base_path: Optional[str] = None,
         use_auth_token="deprecated",
@@ -785,6 +786,13 @@ def download_and_prepare(
             try_from_hf_gcs (`bool`):
                 If `True`, it will try to download the already prepared dataset from the HF Google cloud storage.
+
+
+
+                `try_from_hf_gcs` was deprecated in version 2.16.0 and will be removed in 3.0.0.
+                Host the processed files on the Hugging Face Hub instead.
+
+
             dl_manager (`DownloadManager`, *optional*):
                 Specific `DownloadManger` to use.
             base_path (`str`, *optional*):
@@ -865,6 +873,14 @@ def download_and_prepare(
         else:
             token = self.token
 
+        if try_from_hf_gcs != "deprecated":
+            warnings.warn(
+                "'try_from_hf_gcs' was deprecated in version 2.16.0 and will be removed in 3.0.0.",
+                FutureWarning,
+            )
+        else:
+            try_from_hf_gcs = False
+
         output_dir = output_dir if output_dir is not None else self._cache_dir
         # output_dir can be a remote bucket on GCS or S3 (when using BeamBasedBuilder for distributed data processing)
         fs, _, [output_dir] = fsspec.get_fs_token_paths(output_dir, storage_options=storage_options)
@@ -2025,6 +2041,7 @@ class MissingBeamOptions(ValueError):
     pass
 
 
+@deprecated("Use `GeneratorBasedBuilder` or `ArrowBasedBuilder` instead.")
 class BeamBasedBuilder(DatasetBuilder):
     """Beam-based Builder."""
diff --git a/src/datasets/commands/run_beam.py b/src/datasets/commands/run_beam.py
index 3843a5568f2..1d59e464344 100644
--- a/src/datasets/commands/run_beam.py
+++ b/src/datasets/commands/run_beam.py
@@ -10,6 +10,7 @@
 from datasets.download.download_config import DownloadConfig
 from datasets.download.download_manager import DownloadMode
 from datasets.load import dataset_module_factory, import_main_class
+from datasets.utils.deprecation_utils import deprecated
 from datasets.utils.info_utils import VerificationMode
 
 
@@ -28,6 +29,9 @@ def run_beam_command_factory(args, **kwargs):
     )
 
 
+@deprecated(
+    "`BeamBasedBuilder` and `datasets-cli run_beam` are deprecated and will be removed in v3.0.0. Please use `GeneratorBasedBuilder` or `ArrowBasedBuilder` instead."
+)
 class RunBeamCommand(BaseDatasetsCLICommand):
     @staticmethod
     def register_subcommand(parser: ArgumentParser):
@@ -135,7 +139,6 @@ def run(self):
                 verification_mode=VerificationMode.NO_CHECKS
                 if self._ignore_verifications
                 else VerificationMode.ALL_CHECKS,
-                try_from_hf_gcs=False,
             )
             if self._save_infos:
                 builder._save_infos()
diff --git a/src/datasets/io/csv.py b/src/datasets/io/csv.py
index 3aa11303e91..4ac2ea1135b 100644
--- a/src/datasets/io/csv.py
+++ b/src/datasets/io/csv.py
@@ -57,7 +57,6 @@ def read(self):
             download_config=download_config,
             download_mode=download_mode,
             verification_mode=verification_mode,
-            # try_from_hf_gcs=try_from_hf_gcs,
             base_path=base_path,
             num_proc=self.num_proc,
         )
diff --git a/src/datasets/io/generator.py b/src/datasets/io/generator.py
index 3cb461769c5..2566d5fcdcc 100644
--- a/src/datasets/io/generator.py
+++ b/src/datasets/io/generator.py
@@ -48,7 +48,6 @@ def read(self):
             download_config=download_config,
             download_mode=download_mode,
             verification_mode=verification_mode,
-            try_from_hf_gcs=False,
             base_path=base_path,
             num_proc=self.num_proc,
         )
diff --git a/src/datasets/io/parquet.py b/src/datasets/io/parquet.py
index 2a04285259f..51434106fb1 100644
--- a/src/datasets/io/parquet.py
+++ b/src/datasets/io/parquet.py
@@ -97,7 +97,6 @@ def read(self):
             download_config=download_config,
             download_mode=download_mode,
             verification_mode=verification_mode,
-            # try_from_hf_gcs=try_from_hf_gcs,
             base_path=base_path,
             num_proc=self.num_proc,
         )
diff --git a/src/datasets/io/sql.py b/src/datasets/io/sql.py
index ceb425447c2..2331e3e6407 100644
--- a/src/datasets/io/sql.py
+++ b/src/datasets/io/sql.py
@@ -43,7 +43,6 @@ def read(self):
             download_config=download_config,
             download_mode=download_mode,
             verification_mode=verification_mode,
-            # try_from_hf_gcs=try_from_hf_gcs,
             base_path=base_path,
         )
diff --git a/src/datasets/io/text.py b/src/datasets/io/text.py
index 42aa62b0658..58963f3c7ab 100644
--- a/src/datasets/io/text.py
+++ b/src/datasets/io/text.py
@@ -51,7 +51,6 @@ def read(self):
             download_config=download_config,
             download_mode=download_mode,
             verification_mode=verification_mode,
-            # try_from_hf_gcs=try_from_hf_gcs,
             base_path=base_path,
             num_proc=self.num_proc,
         )
diff --git a/src/datasets/load.py b/src/datasets/load.py
index 55c91f3e1df..0245d55c323 100644
--- a/src/datasets/load.py
+++ b/src/datasets/load.py
@@ -2594,16 +2594,11 @@ def load_dataset(
     if streaming:
         return builder_instance.as_streaming_dataset(split=split)
 
-    # Some datasets are already processed on the HF google storage
-    # Don't try downloading from Google storage for the packaged datasets as text, json, csv or pandas
-    try_from_hf_gcs = path not in _PACKAGED_DATASETS_MODULES
-
     # Download and prepare data
     builder_instance.download_and_prepare(
         download_config=download_config,
         download_mode=download_mode,
         verification_mode=verification_mode,
-        try_from_hf_gcs=try_from_hf_gcs,
         num_proc=num_proc,
         storage_options=storage_options,
     )
diff --git a/tests/test_builder.py b/tests/test_builder.py
index 3722a728147..81966044fc3 100644
--- a/tests/test_builder.py
+++ b/tests/test_builder.py
@@ -243,7 +243,7 @@ def _generate_examples(self, filepaths, dummy_kwarg_with_different_length):
 
 def _run_concurrent_download_and_prepare(tmp_dir):
     builder = DummyBuilder(cache_dir=tmp_dir)
-    builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.REUSE_DATASET_IF_EXISTS)
+    builder.download_and_prepare(download_mode=DownloadMode.REUSE_DATASET_IF_EXISTS)
     return builder
 
 
@@ -257,7 +257,7 @@ class BuilderTest(TestCase):
     def test_download_and_prepare(self):
         with tempfile.TemporaryDirectory() as tmp_dir:
             builder = DummyBuilder(cache_dir=tmp_dir)
-            builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+            builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
             self.assertTrue(
                 os.path.exists(
                     os.path.join(
@@ -274,15 +274,12 @@ def test_download_and_prepare(self):
     def test_download_and_prepare_checksum_computation(self):
         with tempfile.TemporaryDirectory() as tmp_dir:
             builder_no_verification = DummyBuilder(cache_dir=tmp_dir)
-            builder_no_verification.download_and_prepare(
-                try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD
-            )
+            builder_no_verification.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
             self.assertTrue(
                 all(v["checksum"] is not None for _, v in builder_no_verification.info.download_checksums.items())
             )
             builder_with_verification = DummyBuilder(cache_dir=tmp_dir)
             builder_with_verification.download_and_prepare(
-                try_from_hf_gcs=False,
                 download_mode=DownloadMode.FORCE_REDOWNLOAD,
                 verification_mode=VerificationMode.ALL_CHECKS,
             )
@@ -326,22 +323,16 @@ def test_download_and_prepare_with_base_path(self):
             # test relative path is missing
             builder = DummyBuilderWithDownload(cache_dir=tmp_dir, rel_path=rel_path)
             with self.assertRaises(FileNotFoundError):
-                builder.download_and_prepare(
-                    try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
-                )
+                builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
             # test absolute path is missing
             builder = DummyBuilderWithDownload(cache_dir=tmp_dir, abs_path=abs_path)
             with self.assertRaises(FileNotFoundError):
-                builder.download_and_prepare(
-                    try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
-                )
+                builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
             # test that they are both properly loaded when they exist
             open(os.path.join(tmp_dir, rel_path), "w")
             open(abs_path, "w")
             builder = DummyBuilderWithDownload(cache_dir=tmp_dir, rel_path=rel_path, abs_path=abs_path)
-            builder.download_and_prepare(
-                try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir
-            )
+            builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD, base_path=tmp_dir)
             self.assertTrue(
                 os.path.exists(
                     os.path.join(
@@ -580,7 +571,7 @@ def _post_processing_resources(self, split):
         )
         builder._post_process = types.MethodType(_post_process, builder)
         builder._post_processing_resources = types.MethodType(_post_processing_resources, builder)
-        builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+        builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
         self.assertTrue(
             os.path.exists(
                 os.path.join(
@@ -604,7 +595,7 @@ def _post_process(self, dataset, resources_paths):
         with tempfile.TemporaryDirectory() as tmp_dir:
             builder = DummyBuilder(cache_dir=tmp_dir)
             builder._post_process = types.MethodType(_post_process, builder)
-            builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+            builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
             self.assertTrue(
                 os.path.exists(
                     os.path.join(
@@ -637,7 +628,7 @@ def _post_processing_resources(self, split):
             builder = DummyBuilder(cache_dir=tmp_dir)
             builder._post_process = types.MethodType(_post_process, builder)
             builder._post_processing_resources = types.MethodType(_post_processing_resources, builder)
-            builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+            builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
             self.assertTrue(
                 os.path.exists(
                     os.path.join(
@@ -662,7 +653,6 @@ def _prepare_split(self, split_generator, **kwargs):
         self.assertRaises(
             ValueError,
             builder.download_and_prepare,
-            try_from_hf_gcs=False,
             download_mode=DownloadMode.FORCE_REDOWNLOAD,
         )
         self.assertRaises(FileNotFoundError, builder.as_dataset)
@@ -670,7 +660,7 @@
     def test_generator_based_download_and_prepare(self):
         with tempfile.TemporaryDirectory() as tmp_dir:
             builder = DummyGeneratorBasedBuilder(cache_dir=tmp_dir)
-            builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+            builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
             self.assertTrue(
                 os.path.exists(
                     os.path.join(
@@ -940,7 +930,7 @@ def test_generator_based_builder_as_dataset(in_memory, tmp_path):
     cache_dir.mkdir()
     cache_dir = str(cache_dir)
     builder = DummyGeneratorBasedBuilder(cache_dir=cache_dir)
-    builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+    builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
     with assert_arrow_memory_increases() if in_memory else assert_arrow_memory_doesnt_increase():
         dataset = builder.as_dataset("train", in_memory=in_memory)
     assert dataset.data.to_pydict() == {"text": ["foo"] * 100}
@@ -955,7 +945,7 @@ def test_custom_writer_batch_size(tmp_path, writer_batch_size, default_writer_ba
     DummyGeneratorBasedBuilder.DEFAULT_WRITER_BATCH_SIZE = default_writer_batch_size
     builder = DummyGeneratorBasedBuilder(cache_dir=cache_dir, writer_batch_size=writer_batch_size)
     assert builder._writer_batch_size == (writer_batch_size or default_writer_batch_size)
-    builder.download_and_prepare(try_from_hf_gcs=False, download_mode=DownloadMode.FORCE_REDOWNLOAD)
+    builder.download_and_prepare(download_mode=DownloadMode.FORCE_REDOWNLOAD)
     dataset = builder.as_dataset("train")
     assert len(dataset.data[0].chunks) == expected_chunks
diff --git a/tests/test_hf_gcp.py b/tests/test_hf_gcp.py
index 7278e079a8b..3620b7a7c14 100644
--- a/tests/test_hf_gcp.py
+++ b/tests/test_hf_gcp.py
@@ -5,67 +5,79 @@
 import pytest
 from absl.testing import parameterized
 
-from datasets import config
+from datasets import config, load_dataset_builder
 from datasets.arrow_reader import HF_GCP_BASE_URL
-from datasets.builder import DatasetBuilder
 from datasets.dataset_dict import IterableDatasetDict
 from datasets.iterable_dataset import IterableDataset
-from datasets.load import dataset_module_factory, import_main_class
 from datasets.utils.file_utils import cached_path
 
 
 DATASETS_ON_HF_GCP = [
-    {"dataset": "wikipedia", "config_name": "20220301.de"},
-    {"dataset": "wikipedia", "config_name": "20220301.en"},
-    {"dataset": "wikipedia", "config_name": "20220301.fr"},
-    {"dataset": "wikipedia", "config_name": "20220301.frr"},
-    {"dataset": "wikipedia", "config_name": "20220301.it"},
-    {"dataset": "wikipedia", "config_name": "20220301.simple"},
-    {"dataset": "wiki40b", "config_name": "en"},
-    {"dataset": "wiki_dpr", "config_name": "psgs_w100.nq.compressed"},
-    {"dataset": "wiki_dpr", "config_name": "psgs_w100.nq.no_index"},
-    {"dataset": "wiki_dpr", "config_name": "psgs_w100.multiset.no_index"},
-    {"dataset": "natural_questions", "config_name": "default"},
+    {"dataset": "wikipedia", "config_name": "20220301.de", "revision": "4d013bdd32c475c8536aae00a56efc774f061649"},
+    {"dataset": "wikipedia", "config_name": "20220301.en", "revision": "4d013bdd32c475c8536aae00a56efc774f061649"},
+    {"dataset": "wikipedia", "config_name": "20220301.fr", "revision": "4d013bdd32c475c8536aae00a56efc774f061649"},
+    {"dataset": "wikipedia", "config_name": "20220301.frr", "revision": "4d013bdd32c475c8536aae00a56efc774f061649"},
+    {"dataset": "wikipedia", "config_name": "20220301.it", "revision": "4d013bdd32c475c8536aae00a56efc774f061649"},
+    {"dataset": "wikipedia", "config_name": "20220301.simple", "revision": "4d013bdd32c475c8536aae00a56efc774f061649"},
+    {"dataset": "wiki40b", "config_name": "en", "revision": "7b21a2e64b90323b2d3d1b81aa349bb4bc76d9bf"},
+    {
+        "dataset": "wiki_dpr",
+        "config_name": "psgs_w100.nq.compressed",
+        "revision": "b24a417d802a583f8922946c1c75210290e93108",
+    },
+    {
+        "dataset": "wiki_dpr",
+        "config_name": "psgs_w100.nq.no_index",
+        "revision": "b24a417d802a583f8922946c1c75210290e93108",
+    },
+    {
+        "dataset": "wiki_dpr",
+        "config_name": "psgs_w100.multiset.no_index",
+        "revision": "b24a417d802a583f8922946c1c75210290e93108",
+    },
+    {"dataset": "natural_questions", "config_name": "default", "revision": "19ba7767b174ad046a84f46af056517a3910ee57"},
 ]
 
 
-def list_datasets_on_hf_gcp_parameters(with_config=True):
+def list_datasets_on_hf_gcp_parameters(with_config=True, with_revision=True):
+    columns = ["dataset"]
     if with_config:
-        return [
-            {
-                "testcase_name": d["dataset"] + "/" + d["config_name"],
-                "dataset": d["dataset"],
-                "config_name": d["config_name"],
-            }
-            for d in DATASETS_ON_HF_GCP
-        ]
-    else:
-        return [
-            {"testcase_name": dataset, "dataset": dataset} for dataset in {d["dataset"] for d in DATASETS_ON_HF_GCP}
-        ]
-
-
-@parameterized.named_parameters(list_datasets_on_hf_gcp_parameters(with_config=True))
+        columns.append("config_name")
+    if with_revision:
+        columns.append("revision")
+    dataset_list = [{col: dataset[col] for col in columns} for dataset in DATASETS_ON_HF_GCP]
+
+    def get_testcase_name(dataset):
+        testcase_name = dataset["dataset"]
+        if with_config:
+            testcase_name += "/" + dataset["config_name"]
+        if with_revision:
+            testcase_name += "@" + dataset["revision"]
+        return testcase_name
+
+    dataset_list = [{"testcase_name": get_testcase_name(dataset), **dataset} for dataset in dataset_list]
+    return dataset_list
+
+
+@parameterized.named_parameters(list_datasets_on_hf_gcp_parameters(with_config=True, with_revision=True))
 class TestDatasetOnHfGcp(TestCase):
     dataset = None
     config_name = None
+    revision = None
 
-    def test_dataset_info_available(self, dataset, config_name):
+    def test_dataset_info_available(self, dataset, config_name, revision):
         with TemporaryDirectory() as tmp_dir:
-            dataset_module = dataset_module_factory(dataset, cache_dir=tmp_dir)
-
-            builder_cls = import_main_class(dataset_module.module_path, dataset=True)
-
-            builder_instance: DatasetBuilder = builder_cls(
+            builder = load_dataset_builder(
+                dataset,
+                config_name,
+                revision=revision,
                 cache_dir=tmp_dir,
-                config_name=config_name,
-                hash=dataset_module.hash,
             )
 
             dataset_info_url = "/".join(
                 [
                     HF_GCP_BASE_URL,
-                    builder_instance._relative_data_dir(with_hash=False).replace(os.sep, "/"),
+                    builder._relative_data_dir(with_hash=False).replace(os.sep, "/"),
                     config.DATASET_INFO_FILENAME,
                 ]
             )
@@ -76,30 +88,20 @@ def test_dataset_info_available(self, dataset, config_name):
 @pytest.mark.integration
 def test_as_dataset_from_hf_gcs(tmp_path_factory):
     tmp_dir = tmp_path_factory.mktemp("test_hf_gcp") / "test_wikipedia_simple"
-    dataset_module = dataset_module_factory("wikipedia", cache_dir=tmp_dir)
-    builder_cls = import_main_class(dataset_module.module_path)
-    builder_instance: DatasetBuilder = builder_cls(
-        cache_dir=tmp_dir,
-        config_name="20220301.frr",
-        hash=dataset_module.hash,
-    )
+    builder = load_dataset_builder("wikipedia", "20220301.frr", cache_dir=tmp_dir)
 
     # use the HF cloud storage, not the original download_and_prepare that uses apache-beam
-    builder_instance._download_and_prepare = None
-    builder_instance.download_and_prepare()
-    ds = builder_instance.as_dataset()
+    builder._download_and_prepare = None
+    builder.download_and_prepare(try_from_hf_gcs=True)
+    ds = builder.as_dataset()
     assert ds
 
 
 @pytest.mark.integration
 def test_as_streaming_dataset_from_hf_gcs(tmp_path):
-    dataset_module = dataset_module_factory("wikipedia", cache_dir=tmp_path)
-    builder_cls = import_main_class(dataset_module.module_path, dataset=True)
-    builder_instance: DatasetBuilder = builder_cls(
-        cache_dir=tmp_path,
-        config_name="20220301.frr",
-        hash=dataset_module.hash,
+    builder = load_dataset_builder(
+        "wikipedia", "20220301.frr", revision="4d013bdd32c475c8536aae00a56efc774f061649", cache_dir=tmp_path
    )
-    ds = builder_instance.as_streaming_dataset()
+    ds = builder.as_streaming_dataset()
     assert ds
     assert isinstance(ds, IterableDatasetDict)
     assert "train" in ds
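
For context on the migration, the updated integration tests above already show the replacement for the removed GCS code path: build the dataset with `load_dataset_builder` at a pinned script revision instead of instantiating the builder class by hand. A minimal sketch along the lines of `test_as_streaming_dataset_from_hf_gcs` (the revision hash is the one pinned in `tests/test_hf_gcp.py`):

```python
from datasets import load_dataset_builder

# Build the "wikipedia" builder at a pinned script revision, as the updated
# tests do, rather than relying on pre-processed files on the HF GCS bucket.
builder = load_dataset_builder(
    "wikipedia",
    "20220301.frr",
    revision="4d013bdd32c475c8536aae00a56efc774f061649",
)

# Streaming avoids download_and_prepare (and therefore Beam) entirely.
ds = builder.as_streaming_dataset()
print(next(iter(ds["train"])))
```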
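
The `try_from_hf_gcs` change follows the same soft-deprecation pattern as the other deprecated keywords in `builder.py`: the default becomes the string sentinel `"deprecated"`, so the warning only fires when a caller passes the argument explicitly. A standalone sketch of that pattern (the function below is illustrative, not the library's actual `download_and_prepare`):

```python
import warnings


def download_and_prepare(try_from_hf_gcs="deprecated", **kwargs):
    # The string sentinel distinguishes "argument not passed" from any real bool value.
    if try_from_hf_gcs != "deprecated":
        warnings.warn(
            "'try_from_hf_gcs' was deprecated in version 2.16.0 and will be removed in 3.0.0.",
            FutureWarning,
        )
    else:
        try_from_hf_gcs = False  # the new effective default


# Passing the argument still works, but now emits a FutureWarning:
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    download_and_prepare(try_from_hf_gcs=True)
assert any(issubclass(w.category, FutureWarning) for w in caught)
```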