Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 13 additions & 17 deletions datalad_fuse/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import methodtools

from .consts import CACHE_SIZE
from .utils import is_annex_dir_or_key
from .utils import AnnexKey, is_annex_dir_or_key

lgr = logging.getLogger("datalad.fuse.fsspec")

Expand Down Expand Up @@ -51,19 +51,17 @@ def close(self) -> None:
self.annex._batched.clear()

@methodtools.lru_cache(maxsize=CACHE_SIZE)
def get_file_state(self, relpath: str) -> Tuple[FileState, Optional[str]]:
def get_file_state(self, relpath: str) -> Tuple[FileState, Optional[AnnexKey]]:
p = self.path / relpath
lgr.debug("get_file_state: %s", relpath)

def handle_path_under_annex_objects(p: Path):
iadok = is_annex_dir_or_key(p)
if iadok is not None and iadok[1] == "key":
assert iadok[0] == str(self.path)
key = filename2key(p.name)
if isinstance(iadok, AnnexKey):
if p.exists():
return (FileState.HAS_CONTENT, key)
return (FileState.HAS_CONTENT, iadok)
else:
return (FileState.NO_CONTENT, key)
return (FileState.NO_CONTENT, iadok)
else:
return (FileState.NOT_ANNEXED, None)

Expand All @@ -75,7 +73,7 @@ def handle_path_under_annex_objects(p: Path):
if not p.is_symlink():
if p.stat().st_size < 1024 and self.annex is not None:
if self.annex.is_under_annex(relpath, batch=True):
key = self.annex.get_file_key(relpath, batch=True)
key = AnnexKey.parse(self.annex.get_file_key(relpath, batch=True))
if self.annex.file_has_content(relpath, batch=True):
return (FileState.HAS_CONTENT, key)
else:
Expand Down Expand Up @@ -163,7 +161,7 @@ def open(
)
if fstate is FileState.NO_CONTENT:
lgr.debug("%s: opening via fsspec", relpath)
for url in self.get_urls(key):
for url in self.get_urls(str(key)):
try:
lgr.debug("%s: Attempting to open via URL %s", relpath, url)
return self.fs.open(url, mode, **kwargs)
Expand Down Expand Up @@ -242,6 +240,12 @@ def open(
)
return dsap.open(relpath, mode=mode, encoding=encoding, errors=errors)

def get_file_state(
    self, filepath: Union[str, Path]
) -> Tuple[FileState, Optional[AnnexKey]]:
    """Return the file state reported by the dataset adapter owning *filepath*.

    The path is first resolved via ``self.resolve_dataset`` into an adapter
    and a path relative to that adapter; the adapter's own
    ``get_file_state`` result is returned unchanged.
    """
    resolved = self.resolve_dataset(filepath)
    adapter, rel = resolved
    return adapter.get_file_state(rel)

def is_under_annex(self, filepath: Union[str, Path]) -> bool:
dsap, relpath = self.resolve_dataset(filepath)
fstate, _ = dsap.get_file_state(relpath)
Expand All @@ -254,11 +258,3 @@ def get_commit_datetime(self, filepath: Union[str, Path]) -> datetime:

def is_http_url(s: str) -> bool:
    """Return True if *s* starts with ``http://`` or ``https://``, ignoring case."""
    lowered = s.lower()
    for scheme in ("http://", "https://"):
        if lowered.startswith(scheme):
            return True
    return False


def filename2key(name: str) -> str:
    """Convert an annex object filename back into the key it encodes.

    See `keyFile` and `fileKey` in `Annex/Locations.hs` in the git-annex
    source.
    """
    # Applied strictly in this order: "%" is decoded before "&s" produces
    # new "%" characters, and "&a" goes last so the "&" it yields is never
    # mistaken for the start of another escape.
    substitutions = (("%", "/"), ("&c", ":"), ("&s", "%"), ("&a", "&"))
    key = name
    for escaped, plain in substitutions:
        key = key.replace(escaped, plain)
    return key
62 changes: 37 additions & 25 deletions datalad_fuse/fuse_.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# Make it relatively small since we are aiming for metadata records ATM
# Seems of no real good positive net ATM
# BLOCK_SIZE = 2**20 # 1M. block size to fetch at a time.
from .utils import is_annex_dir_or_key
from .utils import AnnexDir, AnnexKey, is_annex_dir_or_key

lgr = logging.getLogger("datalad.fuse")

Expand Down Expand Up @@ -126,34 +126,48 @@ def getattr(self, path, fh=None):
else:
iadok = is_annex_dir_or_key(path)
if iadok is not None:
topdir, dir_or_key = iadok
if dir_or_key == "key":
# needs to be open but it is a key. We will let fsspec
# to handle it
pass
elif dir_or_key == "dir":
if isinstance(iadok, AnnexKey):
if iadok.size is not None:
lgr.debug("Got size from key")
r = mkstat(
is_file=True,
size=iadok.size,
timestamp=self._adapter.get_commit_datetime(path),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we under .git/annex/objects here, so the commit date wouldn't really be pertinent to that key file? If so, let's just use some arbitrary timestamp -- e.g. a fixed timestamp taken when we started this fusefs instance. That should help us save some CPU cycles.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commit date is cached when the adapter for the (sub)dataset is created [link], so there aren't many cycles to save.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still needs to do some traversal to figure out the top of the dataset, right? Indeed, that might be negligible though.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, let's proceed as is for now, and optimize if we see that it adds a penalty.

)
else:
# needs to be open but it is a key. We will let
# fsspec handle it
pass
elif isinstance(iadok, AnnexDir):
# just return that one of the top directory
# TODO: cache this since would be a frequent operation
r = self._filter_stat(os.stat(topdir))
r = self._filter_stat(os.stat(iadok.topdir))
else:
raise AssertionError(f"Unexpected dir_or_key: {dir_or_key!r}")
raise AssertionError(f"Unexpected iadok: {iadok!r}")
elif self.is_under_git(path):
lgr.debug("Path under .git does not exist; raising ENOENT")
raise FuseOSError(ENOENT)
if r is None:
fsspec_file = None
if fh and fh >= self._counter_offset:
lgr.debug("File already open")
fsspec_file = self._fhdict[fh]
to_close = False
else:
# TODO: it is expensive to open each file just for `getattr`!
# We should just fabricate stats from the key here or not even
# bother???!
lgr.debug("File not already open")
with self.rwlock:
fsspec_file = self._adapter.open(path)
to_close = True
if fsspec_file:
_, key = self._adapter.get_file_state(path)
if key.size is not None:
lgr.debug("Got size from key")
r = mkstat(
is_file=True,
size=key.size,
timestamp=self._adapter.get_commit_datetime(path),
)
else:
lgr.debug("File not already open")
with self.rwlock:
fsspec_file = self._adapter.open(path)
to_close = True
if fsspec_file is not None:
if isinstance(fsspec_file, io.BufferedIOBase):
# full file was already fetched locally
lgr.debug("File object is io.BufferedIOBase")
Expand All @@ -166,12 +180,6 @@ def getattr(self, path, fh=None):
if to_close:
with self.rwlock:
fsspec_file.close()
else:
# TODO: although seems to be logical -- seems to cause logging etc
# lgr.error("ENOENTing %s %s", path, fh)
# raise FuseOSError(ENOENT)
lgr.debug("File failed to open???")
r = {} # we have nothing to say. TODO: proper return/error?
lgr.debug("Returning %r for %s", r, path)
return r

Expand Down Expand Up @@ -377,16 +385,20 @@ def file_getattr(f, timestamp: datetime):
info = f.info()
except FileNotFoundError:
raise FuseOSError(ENOENT)
return mkstat(info["type"] == "file", info["size"], timestamp)


def mkstat(is_file: bool, size: int, timestamp: datetime) -> dict:
# TODO Also I get UID.GID funny -- yarik, not yoh
# get of the original symlink, so float it up!
data = {"st_uid": os.getuid(), "st_gid": os.getgid()}
if info["type"] != "file":
if not is_file:
data["st_mode"] = stat.S_IFDIR | 0o755
data["st_size"] = 0
data["st_blksize"] = 0
else:
data["st_mode"] = stat.S_IFREG | 0o644
data["st_size"] = info["size"]
data["st_size"] = size
data["st_blksize"] = 5 * 2**20
data["st_nlink"] = 1
data["st_atime"] = timestamp.timestamp()
Expand Down
1 change: 1 addition & 0 deletions datalad_fuse/tests/test_fuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def test_fuse(tmp_path, transparent, url_dataset):
with fusing(ds.path, tmp_path, transparent=transparent) as mount:
assert sorted(q.name for q in mount.iterdir()) == dots + sorted(data_files)
for fname, blob in data_files.items():
assert os.path.getsize(mount / fname) == len(blob)
assert (mount / fname).read_bytes() == blob


Expand Down
53 changes: 23 additions & 30 deletions datalad_fuse/tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
from typing import Optional, Tuple
from __future__ import annotations

import pytest

from datalad_fuse.fsspec import filename2key
from datalad_fuse.utils import is_annex_dir_or_key
from datalad_fuse.utils import AnnexDir, AnnexKey, is_annex_dir_or_key

SAMPLE_KEY = "MD5E-s1064--8804d3d11f17e33bd912f1f0947afdb9.json"
URL_KEY = "URL--http&c%%127.0.0.1&c55485%binary.png"

SAMPLE_ANNEX_KEY = AnnexKey(
backend="MD5E",
size=1064,
name="8804d3d11f17e33bd912f1f0947afdb9",
suffix=".json",
)

URL_ANNEX_KEY = AnnexKey(backend="URL", name="http://127.0.0.1:55485/binary.png")


@pytest.mark.parametrize(
"path,expected",
[
(f".git/annex/objects/p0/4v/{SAMPLE_KEY}/{SAMPLE_KEY}", (".", "key")),
(f".git/annex/objects/p2/pX/{URL_KEY}/{URL_KEY}", (".", "key")),
(f".git/annex/objects/p0/4v/{SAMPLE_KEY}/", (".", "dir")),
(f".git/annex/objects/p0/4v/{SAMPLE_KEY}", (".", "dir")),
(".git/annex/objects/p0/4v", (".", "dir")),
(f".git/annex/objects/p0/4v/{SAMPLE_KEY}/{SAMPLE_KEY}", SAMPLE_ANNEX_KEY),
(f".git/annex/objects/p2/pX/{URL_KEY}/{URL_KEY}", URL_ANNEX_KEY),
(f".git/annex/objects/p0/4v/{SAMPLE_KEY}/", AnnexDir(".")),
(f".git/annex/objects/p0/4v/{SAMPLE_KEY}", AnnexDir(".")),
(".git/annex/objects/p0/4v", AnnexDir(".")),
(
f"some/project/.git/annex/objects/p0/4v/{SAMPLE_KEY}/{SAMPLE_KEY}",
("some/project", "key"),
SAMPLE_ANNEX_KEY,
),
("some/project/.git/annex/objects/p0/4v", ("some/project", "dir")),
("some/project/.git/annex/objects/p0/4v", AnnexDir("some/project")),
(
f"/usr/src/project/.git/annex/objects/p0/4v/{SAMPLE_KEY}/{SAMPLE_KEY}",
("/usr/src/project", "key"),
SAMPLE_ANNEX_KEY,
),
("/usr/src/project/.git/annex/objects/p0/4v", ("/usr/src/project", "dir")),
("/usr/src/project/.git/annex/objects/p0/4v", AnnexDir("/usr/src/project")),
("foo.txt", None),
("foo.git/annex/objects/p0/4v", None),
("some/project/.git/refs/heads", None),
Expand All @@ -44,28 +52,13 @@
(
"some/project/.git/embedded/sub/.git/annex/objects/p0/4v/"
f"{SAMPLE_KEY}/{SAMPLE_KEY}",
("some/project/.git/embedded/sub", "key"),
SAMPLE_ANNEX_KEY,
),
(
"some/project/.git/embedded/sub/.git/annex/objects/p0/4v",
("some/project/.git/embedded/sub", "dir"),
AnnexDir("some/project/.git/embedded/sub"),
),
],
)
def test_is_annex_dir_or_key(path: str, expected: Optional[Tuple[str, str]]) -> None:
def test_is_annex_dir_or_key(path: str, expected: AnnexDir | AnnexKey | None) -> None:
assert is_annex_dir_or_key(path) == expected


@pytest.mark.parametrize(
"filename,key",
[
(
"URL--http&c%%127.0.0.1&c35401%text.txt",
"URL--http://127.0.0.1:35401/text.txt",
),
("foo&ac", "foo&c"),
("foo&a&s", "foo&%"),
],
)
def test_filename2key(filename: str, key: str) -> None:
assert filename2key(filename) == key
Loading