Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datasets/timit_asr/timit_asr.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _generate_examples(self, data_info_csv):
data_path = os.path.join(os.path.dirname(data_info_csv).strip(), "data")

# Read the data info to extract only the rows that mention non-converted audio
data_info = pd.read_csv(data_info_csv, encoding="utf8")
data_info = pd.read_csv(open(data_info_csv, encoding="utf8"))
# making sure that the columns having no information about the file paths are removed
data_info.dropna(subset=["path_from_data_dir"], inplace=True)

Expand Down
2 changes: 1 addition & 1 deletion docs/source/dataset_streaming.rst
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ Dataset script compatibility
Now that you are aware of how dataset streaming works, you can make sure your dataset script works in streaming mode:

1. make sure you use ``open`` to open the data files: it is extended to work with remote files
2. if you have to deal with archives like ZIP files, make sure you use ``os.path.join`` to navigate in the archive
2. if you have to deal with archives like ZIP files, make sure you use ``os.path.join`` and ``os.path.dirname`` to navigate in the archive

Currently a few Python functions or classes are not supported for dataset streaming:

Expand Down
3 changes: 2 additions & 1 deletion src/datasets/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from .utils.logging import get_logger
from .utils.patching import patch_submodule
from .utils.streaming_download_manager import xjoin, xopen, xpathjoin, xpathopen
from .utils.streaming_download_manager import xdirname, xjoin, xopen, xpathjoin, xpathopen


logger = get_logger(__name__)
Expand Down Expand Up @@ -38,6 +38,7 @@ def extend_module_for_streaming(module_path, use_auth_token: Optional[Union[str,
patch_submodule(module, "open", xopen).start()
# allow to navigate in remote zip files
patch_submodule(module, "os.path.join", xjoin).start()
patch_submodule(module, "os.path.dirname", xdirname).start()
if hasattr(module, "Path"):
patch.object(module.Path, "joinpath", xpathjoin).start()
patch.object(module.Path, "__truediv__", xpathjoin).start()
Expand Down
31 changes: 31 additions & 0 deletions src/datasets/utils/streaming_download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,37 @@ def xjoin(a, *p):
return "::".join([a] + b)


def xdirname(a, *p):
    """Return the parent directory of the first hop of a "::"-chained path or URL.

    This extends ``os.path.dirname`` so that it understands the "::" hop
    separator used to chain URLs, e.g. to address a file inside a remote zip
    archive::

        zip://folder1/file.txt::https://host.com/archive.zip

    Only the part before the first "::" is changed; every following hop is
    re-attached untouched.

    Example::

        >>> xdirname("zip://folder1/file.txt::https://host.com/archive.zip")
        'zip://folder1::https://host.com/archive.zip'

    Args:
        a (str): path or URL, optionally chained with "::".
        *p: accepted but not used by this function.

    Returns:
        str: the chained path whose first hop has been replaced by its parent.
    """
    first_hop, *remaining_hops = a.split("::")
    if is_local_path(first_hop):
        parent = os.path.dirname(Path(first_hop).as_posix())
    else:
        parent = posixpath.dirname(first_hop)
    # Reaching the root of a protocol leaves e.g. 'http:' because dirname
    # strips the trailing slashes, so the '//' has to be put back.
    if parent.endswith(":"):
        parent = f"{parent}//"
    return "::".join([parent, *remaining_hops])


def _as_posix(path: Path):
"""Extend :meth:`pathlib.PurePath.as_posix` to fix missing slash after protocol.

Expand Down
56 changes: 53 additions & 3 deletions tests/test_streaming_download_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
from pathlib import Path

import pytest
Expand All @@ -21,6 +22,28 @@
TEST_URL_CONTENT = "foo\nbar\nfoobar"


def _readd_double_slash_removed_by_path(path_as_posix: str) -> str:
"""Path(...) on an url path like zip://file.txt::http://host.com/data.zip
converts the :// to :/
This function readds the ://

It handles cases like:

- https://host.com/data.zip
- C://data.zip
- zip://file.txt::https://host.com/data.zip
- zip://file.txt::/Users/username/data.zip
- zip://file.txt::C://data.zip

Args:
path_as_posix (str): output of Path(...).as_posix()

Returns:
str: the url path with :// instead of :/
"""
return re.sub("([A-z]:/)([A-z:])", r"\g<1>/\g<2>", path_as_posix)


@pytest.mark.parametrize(
"input_path, expected_path",
[("zip:/test.txt::/Users/username/bar.zip", "zip://test.txt::/Users/username/bar.zip")],
Expand All @@ -32,6 +55,7 @@ def test_as_posix(input_path, expected_path):
@pytest.mark.parametrize(
"input_path, paths_to_join, expected_path",
[
(str(Path(__file__).resolve().parent), (Path(__file__).name,), str(Path(__file__).resolve())),
("https://host.com/archive.zip", ("file.txt",), "https://host.com/archive.zip/file.txt"),
(
"zip://::https://host.com/archive.zip",
Expand All @@ -57,11 +81,35 @@ def test_as_posix(input_path, expected_path):
)
def test_xjoin(input_path, paths_to_join, expected_path):
output_path = xjoin(input_path, *paths_to_join)
assert output_path == expected_path
output_path = _readd_double_slash_removed_by_path(Path(output_path).as_posix())
assert output_path == _readd_double_slash_removed_by_path(Path(expected_path).as_posix())
output_path = xpathjoin(Path(input_path), *paths_to_join)
assert output_path == Path(expected_path)


@pytest.mark.parametrize(
    "input_path, expected_path",
    [
        (str(Path(__file__).resolve()), str(Path(__file__).resolve().parent)),
        ("https://host.com/archive.zip", "https://host.com"),
        ("zip://file.txt::https://host.com/archive.zip", "zip://::https://host.com/archive.zip"),
        ("zip://folder/file.txt::https://host.com/archive.zip", "zip://folder::https://host.com/archive.zip"),
    ],
)
def test_xdirname(input_path, expected_path):
    """Check that xdirname returns the expected parent for local, URL and chained paths."""
    from datasets.utils.streaming_download_manager import xdirname

    def _normalized(path):
        # Route both sides through Path(...).as_posix() and restore "://",
        # so the comparison uses the same normalized form for each of them.
        return _readd_double_slash_removed_by_path(Path(path).as_posix())

    actual = _normalized(xdirname(input_path))
    assert actual == _normalized(expected_path)


def test_xopen_local(text_path):
with xopen(text_path, encoding="utf-8") as f, open(text_path, encoding="utf-8") as expected_file:
assert list(f) == list(expected_file)
Expand Down Expand Up @@ -99,7 +147,8 @@ def test_streaming_dl_manager_download_and_extract_no_extraction(urlpath):
def test_streaming_dl_manager_extract(text_gz_path, text_path):
dl_manager = StreamingDownloadManager()
output_path = dl_manager.extract(text_gz_path)
path = os.path.basename(text_gz_path).rstrip(".gz")
path = os.path.basename(text_gz_path)
path = path[: path.rindex(".")]
assert output_path == f"gzip://{path}::{text_gz_path}"
fsspec_open_file = xopen(output_path, encoding="utf-8")
with fsspec_open_file as f, open(text_path, encoding="utf-8") as expected_file:
Expand All @@ -109,7 +158,8 @@ def test_streaming_dl_manager_extract(text_gz_path, text_path):
def test_streaming_dl_manager_download_and_extract_with_extraction(text_gz_path, text_path):
dl_manager = StreamingDownloadManager()
output_path = dl_manager.download_and_extract(text_gz_path)
path = os.path.basename(text_gz_path).rstrip(".gz")
path = os.path.basename(text_gz_path)
path = path[: path.rindex(".")]
assert output_path == f"gzip://{path}::{text_gz_path}"
fsspec_open_file = xopen(output_path, encoding="utf-8")
with fsspec_open_file as f, open(text_path, encoding="utf-8") as expected_file:
Expand Down