Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions src/datasets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,21 +120,10 @@
logger.info("Disabling Apache Beam because USE_BEAM is set to False")


USE_RAR = os.environ.get("USE_RAR", "AUTO").upper()
RARFILE_VERSION = "N/A"
RARFILE_AVAILABLE = False
if USE_RAR in ("1", "ON", "YES", "AUTO"):
try:
RARFILE_VERSION = version.parse(importlib_metadata.version("rarfile"))
RARFILE_AVAILABLE = True
logger.info("rarfile available.")
except importlib_metadata.PackageNotFoundError:
pass
else:
logger.info("Disabling rarfile because USE_RAR is set to False")


# Optional compression tools
RARFILE_AVAILABLE = importlib.util.find_spec("rarfile") is not None
ZSTANDARD_AVAILABLE = importlib.util.find_spec("zstandard") is not None
LZ4_AVAILABLE = importlib.util.find_spec("lz4") is not None


# Cache location
Expand Down
11 changes: 10 additions & 1 deletion src/datasets/filesystems/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import importlib
from typing import List

import fsspec

Expand All @@ -10,9 +11,17 @@
if _has_s3fs:
from .s3filesystem import S3FileSystem # noqa: F401

COMPRESSION_FILESYSTEMS: List[compression.BaseCompressedFileFileSystem] = [
compression.Bz2FileSystem,
compression.GZipFileSystem,
compression.Lz4FileSystem,
compression.XzFileSystem,
compression.ZstdFileSystem,
]

# Register custom filesystems
fsspec.register_implementation(compression.gzip.GZipFileSystem.protocol, compression.gzip.GZipFileSystem)
for fs_class in COMPRESSION_FILESYSTEMS:
fsspec.register_implementation(fs_class.protocol, fs_class)


def extract_path_from_uri(dataset_path: str) -> str:
Expand Down
168 changes: 168 additions & 0 deletions src/datasets/filesystems/compression.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import os
from typing import Optional

import fsspec
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.utils import DEFAULT_BLOCK_SIZE


class BaseCompressedFileFileSystem(AbstractArchiveFileSystem):
"""Read contents of compressed file as a filesystem with one file inside."""

root_marker = ""
protocol: str = (
None # protocol passed in prefix to the url. ex: "gzip", for gzip://file.txt::http://foo.bar/file.txt.gz
)
compression: str = None # compression type in fsspec. ex: "gzip"
extension: str = None # extension of the filename to strip. ex: "".gz" to get file.txt from file.txt.gz

def __init__(
self, fo: str = "", target_protocol: Optional[str] = None, target_options: Optional[dict] = None, **kwargs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to use a more descriptive name for fo? e.g. filepath since we explain in the docstring that we use ffspec.open?

Copy link
Member Author

@lhoestq lhoestq Aug 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fo is the variable name in fsspec to wrap any file to open. I went with the same convention.
It is used when unchaining a chained URL. See the note at the end of https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining

For developers: this “chaining” methods works by formatting
the arguments passed to open_* into target_protocol (a simple
string) and target_options (a dict) and also optionally fo
(target path, if a specific file is required). In order for an
implementation to chain successfully like this, it must look
for exactly those named arguments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok sounds good! since we're sticking close to fsspec let's keep is as fo

):
"""
The compressed file system can be instantiated from any compressed file.
It reads the contents of compressed file as a filesystem with one file inside, as if it was an archive.

The single file inside the filesystem is named after the compresssed file,
without the compression extension at the end of the filename.

Args:
fo (:obj:``str``): Path to compressed file. Will fetch file using ``fsspec.open()``
mode (:obj:``str``): Currently, only 'rb' accepted
target_protocol(:obj:``str``, optional): To override the FS protocol inferred from a URL.
target_options (:obj:``dict``, optional): Kwargs passed when instantiating the target FS.
"""
super().__init__(self, **kwargs)
# always open as "rb" since fsspec can then use the TextIOWrapper to make it work for "r" mode
self.file = fsspec.open(
fo, mode="rb", protocol=target_protocol, compression=self.compression, **(target_options or {})
)
self.info = self.file.fs.info(self.file.path)
self.compressed_name = os.path.basename(self.file.path.split("::")[0])
self.uncompressed_name = self.compressed_name[: self.compressed_name.rindex(".")]
self.dir_cache = None

@classmethod
def _strip_protocol(cls, path):
# compressed file paths are always relative to the archive root
return super()._strip_protocol(path).lstrip("/")

def _get_dirs(self):
if self.dir_cache is None:
f = {**self.info, "name": self.uncompressed_name}
self.dir_cache = {f["name"]: f}

def cat(self, path: str):
return self.file.open().read()

def _open(
self,
path: str,
mode: str = "rb",
block_size=None,
autocommit=True,
cache_options=None,
**kwargs,
):
path = self._strip_protocol(path)
if mode != "rb":
raise ValueError(f"Tried to read with mode {mode} on file {self.file.path} opened with mode 'rb'")
if path != self.uncompressed_name:
raise FileNotFoundError(f"Expected file {self.uncompressed_name} but got {path}")
return self.file.open()


class Bz2FileSystem(BaseCompressedFileFileSystem):
"""Read contents of BZ2 file as a filesystem with one file inside."""

protocol = "bz2"
compression = "bz2"
extension = ".bz2"


class GZipFileSystem(BaseCompressedFileFileSystem):
"""Read contents of GZIP file as a filesystem with one file inside."""

protocol = "gzip"
compression = "gzip"
extension = ".gz"


class Lz4FileSystem(BaseCompressedFileFileSystem):
"""Read contents of Lz4 file as a filesystem with one file inside."""

protocol = "lz4"
compression = "lz4"
extension = ".lz4"


class XzFileSystem(BaseCompressedFileFileSystem):
"""Read contents of .xz (LZMA) file as a filesystem with one file inside."""

protocol = "xz"
compression = "xz"
extension = ".xz"


class ZstdFileSystem(BaseCompressedFileFileSystem):
"""
Read contents of zstd file as a filesystem with one file inside.

Note that reading in binary mode with fsspec isn't supported yet:
https://github.com/indygreg/python-zstandard/issues/136
"""

protocol = "zstd"
compression = "zstd"
extension = ".zst"

def __init__(
self,
fo: str,
mode: str = "rb",
target_protocol: Optional[str] = None,
target_options: Optional[dict] = None,
block_size: int = DEFAULT_BLOCK_SIZE,
**kwargs,
):
super().__init__(
fo=fo,
mode=mode,
target_protocol=target_protocol,
target_options=target_options,
block_size=block_size,
**kwargs,
)
# We need to wrap the zstd decompressor to avoid this error in fsspec==2021.7.0 and zstandard==0.15.2:
#
# File "/Users/user/.virtualenvs/hf-datasets/lib/python3.7/site-packages/fsspec/core.py", line 145, in open
# out.close = close
# AttributeError: 'zstd.ZstdDecompressionReader' object attribute 'close' is read-only
#
# see https://github.com/intake/filesystem_spec/issues/725
_enter = self.file.__enter__

class WrappedFile:
def __init__(self, file_):
self._file = file_

def __enter__(self):
self._file.__enter__()
return self

def __exit__(self, *args, **kwargs):
self._file.__exit__(*args, **kwargs)

def __iter__(self):
return iter(self._file)

def __next__(self):
return next(self._file)

def __getattr__(self, attr):
return getattr(self._file, attr)

def fixed_enter(*args, **kwargs):
return WrappedFile(_enter(*args, **kwargs))

self.file.__enter__ = fixed_enter
1 change: 0 additions & 1 deletion src/datasets/filesystems/compression/__init__.py

This file was deleted.

70 changes: 0 additions & 70 deletions src/datasets/filesystems/compression/gzip.py

This file was deleted.

Loading