From d098a77f2f8c590310d0ea07e64b782541ca4f7a Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Wed, 25 Aug 2021 16:19:40 +0200 Subject: [PATCH 1/7] make timit_asr streamable --- datasets/timit_asr/timit_asr.py | 2 +- src/datasets/streaming.py | 3 +- .../utils/streaming_download_manager.py | 31 +++++++++++++++++++ tests/test_streaming_download_manager.py | 30 ++++++++++++++++-- 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/datasets/timit_asr/timit_asr.py b/datasets/timit_asr/timit_asr.py index 8853c29afa7..7b2e13e8d47 100644 --- a/datasets/timit_asr/timit_asr.py +++ b/datasets/timit_asr/timit_asr.py @@ -121,7 +121,7 @@ def _generate_examples(self, data_info_csv): data_path = os.path.join(os.path.dirname(data_info_csv).strip(), "data") # Read the data info to extract rows mentioning about non-converted audio only - data_info = pd.read_csv(data_info_csv, encoding="utf8") + data_info = pd.read_csv(open(data_info_csv, encoding="utf8")) # making sure that the columns having no information about the file paths are removed data_info.dropna(subset=["path_from_data_dir"], inplace=True) diff --git a/src/datasets/streaming.py b/src/datasets/streaming.py index 5a6272e5240..b05dab6aea6 100644 --- a/src/datasets/streaming.py +++ b/src/datasets/streaming.py @@ -4,7 +4,7 @@ from .utils.logging import get_logger from .utils.patching import patch_submodule -from .utils.streaming_download_manager import xjoin, xopen +from .utils.streaming_download_manager import xdirname, xjoin, xopen logger = get_logger(__name__) @@ -27,3 +27,4 @@ def extend_module_for_streaming(module_path, use_auth_token: Optional[Union[str, patch_submodule(module, "open", xopen).start() # allow to navigate in remote zip files patch_submodule(module, "os.path.join", xjoin).start() + patch_submodule(module, "os.path.dirname", xdirname).start() diff --git a/src/datasets/utils/streaming_download_manager.py b/src/datasets/utils/streaming_download_manager.py index 14c5d5a0034..2c876ee78ea 100644 --- a/src/datasets/utils/streaming_download_manager.py +++ b/src/datasets/utils/streaming_download_manager.py @@ -56,6 +56,37 @@ def xjoin(a, *p): return "::".join([a] + b) +def xdirname(a, *p): + """ + This function extends os.path.dirname to support the "::" hop separator. It supports both paths and urls. + + A shorthand, particularly useful where you have multiple hops, is to “chain” the URLs with the special separator "::". + This is used to access files inside a zip file over http for example. + + Let's say you have a zip file at https://host.com/archive.zip, and you want to access the file inside the zip file at /folder1/file.txt. + Then you can just chain the url this way: + + zip://folder1/file.txt::https://host.com/archive.zip + + The xdirname function allows you to apply the dirname on the first path of the chain. + + Example:: + + >>> xdirname("zip://folder1/file.txt::https://host.com/archive.zip") + zip://folder1::https://host.com/archive.zip + """ + a, *b = a.split("::") + if is_local_path(a): + a = os.path.dirname(a) + else: + a = posixpath.dirname(a) + # if we end up at the root of the protocol, we get for example a = 'http:' + # so we have to fix it by adding the '//' that was removed: + if a.endswith(":"): + a += "//" + return "::".join([a] + b) + + def _add_retries_to_file_obj_read_method(file_obj): read = file_obj.read max_retries = config.STREAMING_READ_MAX_RETRIES diff --git a/tests/test_streaming_download_manager.py b/tests/test_streaming_download_manager.py index 9ab795cac50..4948b645fba 100644 --- a/tests/test_streaming_download_manager.py +++ b/tests/test_streaming_download_manager.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import pytest @@ -15,6 +16,7 @@ @pytest.mark.parametrize( "input_path, paths_to_join, expected_path", [ + (str(Path(__file__).resolve().parent), (Path(__file__).name,), str(Path(__file__).resolve())), ("https://host.com/archive.zip", ("file.txt",), "https://host.com/archive.zip/file.txt"), ( "zip://::https://host.com/archive.zip", @@ -35,6 +37,28 @@ def test_xjoin(input_path, paths_to_join, expected_path): assert output_path == expected_path +@pytest.mark.parametrize( + "input_path, expected_path", + [ + (str(Path(__file__).resolve()), str(Path(__file__).resolve().parent)), + ("https://host.com/archive.zip", "https://host.com"), + ( + "zip://file.txt::https://host.com/archive.zip", + "zip://::https://host.com/archive.zip", + ), + ( + "zip://folder/file.txt::https://host.com/archive.zip", + "zip://folder::https://host.com/archive.zip", + ), + ], +) +def test_xdirname(input_path, expected_path): + from datasets.utils.streaming_download_manager import xdirname + + output_path = xdirname(input_path) + assert output_path == expected_path + + def test_xopen_local(text_path): with xopen(text_path, encoding="utf-8") as f, open(text_path, encoding="utf-8") as expected_file: @@ -79,7 +103,8 @@ def test_streaming_dl_manager_extract(text_gz_path, text_path): dl_manager = StreamingDownloadManager() output_path = dl_manager.extract(text_gz_path) - path = os.path.basename(text_gz_path).rstrip(".gz") + path = os.path.basename(text_gz_path) + path = path[: path.rindex(".")] assert output_path == f"gzip://{path}::{text_gz_path}" fsspec_open_file = xopen(output_path, encoding="utf-8") with fsspec_open_file as f, open(text_path, encoding="utf-8") as expected_file: @@ -91,7 +116,8 @@ def test_streaming_dl_manager_download_and_extract_with_extraction(text_gz_path, dl_manager = StreamingDownloadManager() output_path = dl_manager.download_and_extract(text_gz_path) - path = os.path.basename(text_gz_path).rstrip(".gz") + path = os.path.basename(text_gz_path) + path = path[: path.rindex(".")] assert output_path == f"gzip://{path}::{text_gz_path}" fsspec_open_file = xopen(output_path, encoding="utf-8") with fsspec_open_file as f, open(text_path, encoding="utf-8") as expected_file: From 1616c7d19414946d2b837c7d1158b89c1deb4fff Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Wed, 25 Aug 2021 16:21:06 +0200 Subject: [PATCH 2/7] update docs about dirname --- docs/source/dataset_streaming.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/dataset_streaming.rst b/docs/source/dataset_streaming.rst index aab8e009fbd..527d4d7b4e6 100644 --- a/docs/source/dataset_streaming.rst +++ b/docs/source/dataset_streaming.rst @@ -263,7 +263,7 @@ Dataset script compatibility Now that you are aware of how dataset streaming works, you can make sure your dataset script work in streaming mode: 1. make sure you use ``open`` to open the data files: it is extended to work with remote files -2. if you have to deal with archives like ZIP files, make sure you use ``os.path.join`` to navigate in the archive +2. if you have to deal with archives like ZIP files, make sure you use ``os.path.join`` and ``os.path.dirname`` to navigate in the archive Currently a few python functions or classes are not supported for dataset streaming: From 3e65613b1206fc144aa879f6a64e1e51eb8383a5 Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Wed, 25 Aug 2021 16:37:10 +0200 Subject: [PATCH 3/7] fix test --- tests/test_streaming_download_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_streaming_download_manager.py b/tests/test_streaming_download_manager.py index 4948b645fba..2e9cd6e59c7 100644 --- a/tests/test_streaming_download_manager.py +++ b/tests/test_streaming_download_manager.py @@ -34,7 +34,7 @@ def test_xjoin(input_path, paths_to_join, expected_path): from datasets.utils.streaming_download_manager import xjoin output_path = xjoin(input_path, *paths_to_join) - assert output_path == expected_path + assert output_path == Path(expected_path).as_posix() @pytest.mark.parametrize( @@ -56,7 +56,7 @@ def test_xdirname(input_path, expected_path): from datasets.utils.streaming_download_manager import xdirname output_path = xdirname(input_path) - assert output_path == expected_path + assert output_path == Path(expected_path).as_posix() def test_xopen_local(text_path): From 907f5e451c26b3c263817bc9d0555e8dd004b705 Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Tue, 7 Sep 2021 11:33:09 +0200 Subject: [PATCH 4/7] fix tests --- tests/test_streaming_download_manager.py | 27 ++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/tests/test_streaming_download_manager.py b/tests/test_streaming_download_manager.py index a67d2bb3259..f4cf09b9a1f 100644 --- a/tests/test_streaming_download_manager.py +++ b/tests/test_streaming_download_manager.py @@ -1,4 +1,5 @@ import os +import re from pathlib import Path import pytest @@ -13,6 +14,28 @@ TEST_URL_CONTENT = "foo\nbar\nfoobar" +def _readd_double_slash_removed_by_path(path_as_posix: str) -> str: + """Path(...) on an url path like zip://file.txt::http://host.com/data.zip + converts the :// to :/ + This function readds the :// + + It handles cases like: + + - https://host.com/data.zip + - C://data.zip + - zip://file.txt::https://host.com/data.zip + - zip://file.txt::/Users/username/data.zip + - zip://file.txt::C://data.zip + + Args: + path_as_posix (str): output of Path(...).as_posix() + + Returns: + str: the url path with :// instead of :/ + """ + return re.sub("([A-z]:/)([A-z:])", "\g<1>/\g<2>", path_as_posix) + + @pytest.mark.parametrize( "input_path, paths_to_join, expected_path", [ @@ -34,7 +57,7 @@ def test_xjoin(input_path, paths_to_join, expected_path): from datasets.utils.streaming_download_manager import xjoin output_path = xjoin(input_path, *paths_to_join) - assert output_path == Path(expected_path).as_posix() + assert output_path == _readd_double_slash_removed_by_path(Path(expected_path).as_posix()) @pytest.mark.parametrize( @@ -56,7 +79,7 @@ def test_xdirname(input_path, expected_path): from datasets.utils.streaming_download_manager import xdirname output_path = xdirname(input_path) - assert output_path == Path(expected_path).as_posix() + assert output_path == _readd_double_slash_removed_by_path(Path(expected_path).as_posix()) def test_xopen_local(text_path): From 277f813cf4844694544cba3b402847320449ed3c Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Tue, 7 Sep 2021 11:41:52 +0200 Subject: [PATCH 5/7] style --- tests/test_streaming_download_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_streaming_download_manager.py b/tests/test_streaming_download_manager.py index f4cf09b9a1f..3f57125a5d8 100644 --- a/tests/test_streaming_download_manager.py +++ b/tests/test_streaming_download_manager.py @@ -33,7 +33,7 @@ def _readd_double_slash_removed_by_path(path_as_posix: str) -> str: Returns: str: the url path with :// instead of :/ """ - return re.sub("([A-z]:/)([A-z:])", "\g<1>/\g<2>", path_as_posix) + return re.sub("([A-z]:/)([A-z:])", r"\g<1>/\g<2>", path_as_posix) @pytest.mark.parametrize( From 5810c2079b3d00f15d30949a875e079023ea4845 Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Tue, 7 Sep 2021 13:24:54 +0200 Subject: [PATCH 6/7] fix windows test --- src/datasets/utils/streaming_download_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datasets/utils/streaming_download_manager.py b/src/datasets/utils/streaming_download_manager.py index a6001fa20ca..c4f79e33673 100644 --- a/src/datasets/utils/streaming_download_manager.py +++ b/src/datasets/utils/streaming_download_manager.py @@ -77,7 +77,7 @@ def xdirname(a, *p): """ a, *b = a.split("::") if is_local_path(a): - a = os.path.dirname(a) + a = os.path.dirname(Path(a).as_posix()) else: a = posixpath.dirname(a) # if we end up at the root of the protocol, we get for example a = 'http:' From b5165df5e03e805a2391507bf25f56d155b4b565 Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Tue, 7 Sep 2021 13:35:23 +0200 Subject: [PATCH 7/7] again --- tests/test_streaming_download_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_streaming_download_manager.py b/tests/test_streaming_download_manager.py index 3f57125a5d8..56bf526f516 100644 --- a/tests/test_streaming_download_manager.py +++ b/tests/test_streaming_download_manager.py @@ -57,6 +57,7 @@ def test_xjoin(input_path, paths_to_join, expected_path): from datasets.utils.streaming_download_manager import xjoin output_path = xjoin(input_path, *paths_to_join) + output_path = _readd_double_slash_removed_by_path(Path(output_path).as_posix()) assert output_path == _readd_double_slash_removed_by_path(Path(expected_path).as_posix()) @@ -79,6 +80,7 @@ def test_xdirname(input_path, expected_path): from datasets.utils.streaming_download_manager import xdirname output_path = xdirname(input_path) + output_path = _readd_double_slash_removed_by_path(Path(output_path).as_posix()) assert output_path == _readd_double_slash_removed_by_path(Path(expected_path).as_posix())