diff --git a/CHANGELOG.md b/CHANGELOG.md index 731d446f9..aee619e18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ Only write entries that are worth mentioning to users. ## Unreleased +- Shell: Fix file path completion missing shallow files due to slot limits by using a trie data structure with BFS traversal, prioritizing shallow paths + ## 1.16.0 (2026-02-27) - Web: Update ASCII logo banner to a new styled design diff --git a/docs/en/configuration/config-files.md b/docs/en/configuration/config-files.md index 4bff0a29a..3fb7524a8 100644 --- a/docs/en/configuration/config-files.md +++ b/docs/en/configuration/config-files.md @@ -1,4 +1,4 @@ -# Config Files +# Config files Kimi Code CLI uses configuration files to manage API providers, models, services, and runtime parameters, supporting both TOML and JSON formats. diff --git a/docs/en/release-notes/changelog.md b/docs/en/release-notes/changelog.md index 6b9f481bd..a6fbf5565 100644 --- a/docs/en/release-notes/changelog.md +++ b/docs/en/release-notes/changelog.md @@ -4,6 +4,8 @@ This page documents the changes in each Kimi Code CLI release. ## Unreleased +- Shell: Fix file path completion missing shallow files due to slot limits by using a trie data structure with BFS traversal, prioritizing shallow paths + ## 1.16.0 (2026-02-27) - Web: Update ASCII logo banner to a new styled design diff --git a/docs/zh/release-notes/changelog.md b/docs/zh/release-notes/changelog.md index 47432d74f..6f02c7b53 100644 --- a/docs/zh/release-notes/changelog.md +++ b/docs/zh/release-notes/changelog.md @@ -4,6 +4,8 @@ ## 未发布 +- Shell:修复文件路径补全因收集限制而遗漏浅层文件的问题,使用 Trie 数据结构和 BFS 遍历并优先扫描浅层路径 + ## 1.16.0 (2026-02-27) - Web:更新 ASCII Logo 横幅为新的样式设计 diff --git a/pyproject.toml b/pyproject.toml index ff25fa92b..b7b0b6899 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,113 +5,98 @@ description = "Kimi Code CLI is your next CLI agent." 
readme = "README.md" requires-python = ">=3.12" dependencies = [ - "agent-client-protocol==0.7.0", - "aiofiles>=24.0,<26.0", - "aiohttp==3.13.3", - "typer==0.21.1", - "kosong[contrib]==0.43.0", - # loguru stays >=0.6.0 because notify-py (via batrachian-toad) caps it at <=0.6.0 on 3.14+. - "loguru>=0.6.0,<0.8", - "prompt-toolkit==3.0.52", - "pillow==12.1.0", - "pyyaml==6.0.3", - "rich==14.2.0", - "ripgrepy==2.2.0", - "streamingjson==0.0.5", - "trafilatura==2.0.0", - # lxml is used by trafilatura/htmldate/justext; keep pinned for binary wheels. - "lxml==6.0.2", - "tenacity==9.1.2", - "fastmcp==2.12.5", - "pydantic==2.12.5", - "httpx[socks]==0.28.1", - "pykaos==0.7.0", - "batrachian-toad==0.5.23; python_version >= \"3.14\"", - "tomlkit==0.14.0", - "jinja2==3.1.6", - "pyobjc-framework-cocoa>=12.1 ; sys_platform == 'darwin'", - "fastapi>=0.115.0", - "uvicorn[standard]>=0.32.0", - "scalar-fastapi>=1.5.0", - "websockets>=14.0", - "keyring>=25.7.0", - "setproctitle>=1.3.0", + "agent-client-protocol==0.7.0", + "aiofiles>=24.0,<26.0", + "aiohttp==3.13.3", + "batrachian-toad==0.5.23; python_version >= \"3.14\"", + "fastapi>=0.115.0", + "fastmcp==2.12.5", + "httpx[socks]==0.28.1", + "jinja2==3.1.6", + "keyring>=25.7.0", + "kosong[contrib]==0.43.0", + # loguru stays >=0.6.0 because notify-py (via batrachian-toad) caps it at <=0.6.0 on 3.14+. + "loguru>=0.6.0,<0.8", + # lxml is used by trafilatura/htmldate/justext; keep pinned for binary wheels. 
+ "lxml==6.0.2", + "pillow==12.1.0", + "prompt-toolkit==3.0.52", + "pydantic==2.12.5", + "pykaos==0.7.0", + "pyobjc-framework-cocoa>=12.1 ; sys_platform == 'darwin'", + "pyyaml==6.0.3", + "rich==14.2.0", + "ripgrepy==2.2.0", + "scalar-fastapi>=1.5.0", + "setproctitle>=1.3.0", + "streamingjson==0.0.5", + "tenacity==9.1.2", + "tomlkit==0.14.0", + "trafilatura==2.0.0", + "typer==0.21.1", + "uvicorn[standard]>=0.32.0", + "websockets>=14.0", ] +[project.scripts] +kimi = "kimi_cli.cli:cli" +kimi-cli = "kimi_cli.cli:cli" + [dependency-groups] dev = [ - "pyinstaller==6.18.0", - "inline-snapshot[black]>=0.31.1", - "pyright>=1.1.407", - "ty>=0.0.9", - "pytest>=9.0.2", - "pytest-asyncio>=1.3.0", - "ruff>=0.14.10", + "inline-snapshot[black]>=0.31.1", + "pyinstaller==6.18.0", + "pyright>=1.1.407", + "pytest>=9.0.2", + "pytest-asyncio>=1.3.0", + "ruff>=0.14.10", + "ty>=0.0.9", ] [build-system] requires = ["uv_build>=0.8.5,<0.10.0"] build-backend = "uv_build" -[tool.uv.build-backend] -module-name = ["kimi_cli"] -source-exclude = ["examples/**/*", "tests/**/*", "src/kimi_cli/deps/**/*"] - -[tool.uv.workspace] -members = [ - "packages/kosong", - "packages/kaos", - "packages/kimi-code", - "sdks/kimi-sdk", +[tool.pyright] +typeCheckingMode = "standard" +pythonVersion = "3.14" +include = [ + "src/**/*.py", + "tests/**/*.py", + "tests_ai/scripts/**/*.py", + "tests_e2e/**/*.py", ] - -[tool.uv.sources] -kosong = { workspace = true } -pykaos = { workspace = true } -kimi-cli = { workspace = true } - -[project.scripts] -kimi = "kimi_cli.cli:cli" -kimi-cli = "kimi_cli.cli:cli" +strict = ["src/kimi_cli/**/*.py"] [tool.ruff] line-length = 100 [tool.ruff.lint] select = [ - "E", # pycodestyle - "F", # Pyflakes - "UP", # pyupgrade - "B", # flake8-bugbear - "SIM", # flake8-simplify - "I", # isort + "E", # pycodestyle + "F", # Pyflakes + "UP", # pyupgrade + "B", # flake8-bugbear + "SIM", # flake8-simplify + "I", # isort ] [tool.ruff.lint.per-file-ignores] "tests/**/*.py" = ["E501"] 
"tests_e2e/**/*.py" = ["E501"] -"src/kimi_cli/web/api/**/*.py" = ["B008"] # FastAPI Depends() is standard usage - -[tool.pyright] -typeCheckingMode = "standard" -pythonVersion = "3.14" -include = [ - "src/**/*.py", - "tests/**/*.py", - "tests_ai/scripts/**/*.py", - "tests_e2e/**/*.py", -] -strict = ["src/kimi_cli/**/*.py"] +"src/kimi_cli/web/api/**/*.py" = [ + "B008" +] # FastAPI Depends() is standard usage [tool.ty.environment] python-version = "3.14" [tool.ty.src] include = [ - "src/**/*.py", - "tests/**/*.py", - "tests_ai/scripts/**/*.py", - "tests_e2e/**/*.py", + "src/**/*.py", + "tests/**/*.py", + "tests_ai/scripts/**/*.py", + "tests_e2e/**/*.py", ] [tool.typos.files] @@ -122,3 +107,20 @@ datas = "datas" Seeked = "Seeked" seeked = "seeked" iterm = "iterm" + +[tool.uv.build-backend] +module-name = ["kimi_cli"] +source-exclude = ["examples/**/*", "tests/**/*", "src/kimi_cli/deps/**/*"] + +[tool.uv.workspace] +members = [ + "packages/kosong", + "packages/kaos", + "packages/kimi-code", + "sdks/kimi-sdk", +] + +[tool.uv.sources] +kosong = { workspace = true } +pykaos = { workspace = true } +kimi-cli = { workspace = true } diff --git a/src/kimi_cli/ui/shell/prompt.py b/src/kimi_cli/ui/shell/prompt.py index 1c0c9471f..0717e61c9 100644 --- a/src/kimi_cli/ui/shell/prompt.py +++ b/src/kimi_cli/ui/shell/prompt.py @@ -4,7 +4,6 @@ import base64 import json import mimetypes -import os import re import time from collections import deque @@ -14,7 +13,7 @@ from enum import Enum from hashlib import md5, sha256 from io import BytesIO -from pathlib import Path +from pathlib import Path, PurePath from typing import Any, Literal, override from kaos.path import KaosPath @@ -47,6 +46,7 @@ from kimi_cli.utils.clipboard import grab_image_from_clipboard, is_clipboard_available from kimi_cli.utils.logging import logger from kimi_cli.utils.media_tags import wrap_media_part +from kimi_cli.utils.path import PathTrie from kimi_cli.utils.slashcmd import SlashCommand from kimi_cli.utils.string 
import random_string from kimi_cli.wire.types import ContentPart, ImageURLPart, TextPart @@ -220,10 +220,10 @@ def __init__( self._refresh_interval = refresh_interval self._limit = limit self._cache_time: float = 0.0 - self._cached_paths: list[str] = [] - self._top_cache_time: float = 0.0 - self._top_cached_paths: list[str] = [] + self._cached_paths: tuple[str, ...] = () + self._cached_fragment: str = "" # Track which fragment was used for cache self._fragment_hint: str | None = None + self._trie: PathTrie | None = None self._word_completer = WordCompleter( self._get_paths, @@ -245,76 +245,69 @@ def _is_ignored(cls, name: str) -> bool: return True return bool(cls._IGNORED_PATTERNS.fullmatch(name)) - def _get_paths(self) -> list[str]: - fragment = self._fragment_hint or "" - if "/" not in fragment and len(fragment) < 3: - return self._get_top_level_paths() - return self._get_deep_paths() + def _get_or_create_trie(self, refresh: bool = False) -> PathTrie: + """Get or create the path trie. - def _get_top_level_paths(self) -> list[str]: - now = time.monotonic() - if now - self._top_cache_time <= self._refresh_interval: - return self._top_cached_paths + Args: + refresh: If True, recreate the trie to pick up filesystem changes. + """ + if self._trie is None or refresh: + self._trie = PathTrie(self._root, self._is_ignored, self._limit) + return self._trie - entries: list[str] = [] - try: - for entry in sorted(self._root.iterdir(), key=lambda p: p.name): - name = entry.name - if self._is_ignored(name): - continue - entries.append(f"{name}/" if entry.is_dir() else name) - if len(entries) >= self._limit: - break - except OSError: - return self._top_cached_paths + def _get_paths(self) -> tuple[str, ...]: + """Get paths based on cached fragment depth. - self._top_cached_paths = entries - self._top_cache_time = now - return self._top_cached_paths - - def _get_deep_paths(self) -> list[str]: + If fragment contains "/", we collect deeper paths. 
+ """ now = time.monotonic() - if now - self._cache_time <= self._refresh_interval: - return self._cached_paths - - paths: list[str] = [] - try: - for current_root, dirs, files in os.walk(self._root): - relative_root = Path(current_root).relative_to(self._root) - - # Prevent descending into ignored directories. - dirs[:] = sorted(d for d in dirs if not self._is_ignored(d)) + fragment = self._fragment_hint or "" - if relative_root.parts and any( - self._is_ignored(part) for part in relative_root.parts - ): - dirs[:] = [] - continue + # Invalidate cache if fragment changed (depth requirement changed) + # or if cache expired + cache_expired = now - self._cache_time > self._refresh_interval + cache_valid = not cache_expired and fragment == self._cached_fragment + if cache_valid: + return self._cached_paths - if relative_root.parts: - paths.append(relative_root.as_posix() + "/") - if len(paths) >= self._limit: - break + # Refresh trie if cache expired to pick up filesystem changes + trie = self._get_or_create_trie(refresh=cache_expired) - for file_name in sorted(files): - if self._is_ignored(file_name): - continue - relative = (relative_root / file_name).as_posix() - if not relative: - continue - paths.append(relative) - if len(paths) >= self._limit: - break + # Calculate required depth from fragment + # When user types "/", they are navigating into a directory, + # so we need to ensure deeper paths are collected + frag_path = PurePath(fragment) + frag_depth = len(frag_path.parts) if frag_path.parts else 0 - if len(paths) >= self._limit: - break - except OSError: - return self._cached_paths + if "/" in fragment: + # User is navigating: expand to include deeper paths + required_depth = 2 + frag_depth + paths = trie.get_paths(max_depth=required_depth) + else: + # No specific depth required: return all collected paths + paths = trie.get_paths() - self._cached_paths = paths + self._cached_paths = self._format_paths(paths) + self._cached_fragment = fragment self._cache_time 
class PathTrieNode:
    """One entry (file or directory) stored in a :class:`PathTrie`.

    Attributes are declared through ``__slots__`` so that instances carry no per-object
    ``__dict__``.
    """

    # Entry basename; also the key under which the parent stores this node.
    name: str
    # Path of this entry relative to the trie root.
    full_path: PurePath
    # True when the entry was recorded as a directory.
    is_dir: bool
    # Child nodes keyed by basename; ``None`` until the first child is created.
    children: dict[str, PathTrieNode] | None
    # Set once the owning trie has dequeued this directory for scanning.
    visited: bool

    __slots__ = ("name", "full_path", "is_dir", "children", "visited")

    def __init__(self, name: str, full_path: PurePath, is_dir: bool = False) -> None:
        self.name = name
        self.full_path = full_path
        self.is_dir = is_dir
        self.children = None
        self.visited = False

    def get_or_create_child(self, name: str, is_dir: bool = False) -> PathTrieNode:
        """Return the child called *name*, creating it when it does not exist yet.

        Args:
            name: Basename of the child entry.
            is_dir: Directory flag for a newly created child. It is ignored when the
                child already exists; the existing node is returned unchanged.

        Returns:
            The existing or newly created child, whose ``full_path`` is
            ``self.full_path / name``.
        """
        if self.children is None:
            self.children = {}
        child = self.children.get(name)
        if child is None:
            child = PathTrieNode(name, self.full_path / name, is_dir)
            self.children[name] = child
        return child


class PathTrie:
    """A trie of relative paths below ``root``, filled incrementally in BFS order.

    Directories are scanned one depth level at a time, so every shallow path is
    appended to ``collected_paths`` before any deeper one. When the ``limit`` on the
    number of collected paths is hit, it is therefore the deep paths that are dropped,
    never the shallow ones.
    """

    # Number of paths `collect_first_stage` tries to gather: large enough to give fuzzy
    # matching a useful pool, while BFS order still guarantees shallow files are included.
    FIRST_STAGE_LIMIT: int = 200

    root: Path
    check_ignored: Callable[[str], bool]
    limit: int
    root_node: PathTrieNode
    collected_paths: list[PurePath]  # All paths collected so far, in BFS order
    _dirs_to_scan: deque[tuple[PathTrieNode, int]]  # (node, depth) queue for BFS traversal
    # Depth of the deepest directory level taken off the queue so far (-1 = none).
    # Scanning directories at depth d collects paths at depth d + 1.
    reach_depth: int

    def __init__(self, root: Path, check_ignored: Callable[[str], bool], limit: int) -> None:
        """Create an empty trie; nothing is read from disk until paths are requested.

        Args:
            root: Directory whose contents are collected.
            check_ignored: Predicate on an entry's basename; entries for which it
                returns True are neither collected nor descended into.
            limit: Hard upper bound on the number of collected paths.
        """
        self.root = root
        self.check_ignored = check_ignored
        self.limit = limit
        self.root_node = PathTrieNode("", PurePath(""), is_dir=True)
        self.root_node.visited = True  # Root is always "visited"
        self.collected_paths = []
        # BFS queue: the front holds directories of the level being processed, the back
        # receives the sub-directories discovered while scanning them.
        self._dirs_to_scan = deque([(self.root_node, 0)])
        self.reach_depth = -1  # No level scanned yet

    def _limit_reached(self) -> bool:
        """Return True once ``limit`` paths have been collected."""
        return len(self.collected_paths) >= self.limit

    def _enqueue_subdirs(self, node: PathTrieNode, depth: int) -> None:
        """Queue the directory children of *node* (in name order) for the next level."""
        if not node.children:
            return
        for child in sorted(node.children.values(), key=lambda c: c.name):
            if child.is_dir:
                self._dirs_to_scan.append((child, depth + 1))

    def depth_of(self, path: PurePath) -> int:
        """Calculate depth of a path (root = 0, direct children = 1, etc.)."""
        return len(path.parts)

    def scan_node(self, node: PathTrieNode) -> None:
        """Read one directory from disk and record its entries as children of *node*.

        Entries are visited in name order. Ignored names are skipped, each remaining
        entry is appended to ``collected_paths``, and scanning stops as soon as the
        limit is reached. A directory that cannot be listed is silently left empty.
        """
        if node.children is not None:
            return  # Already scanned

        abs_path = self.root / node.full_path

        try:
            # os.scandir is used instead of Path.iterdir()/Path.is_dir() because the
            # ``follow_symlinks`` keyword of Path.is_dir() only exists on Python 3.13+,
            # whereas DirEntry.is_dir(follow_symlinks=False) works on every supported
            # version (and usually needs no extra stat call).
            with os.scandir(abs_path) as scanner:
                # Sort entries for deterministic ordering
                entries = sorted(scanner, key=lambda e: e.name)
        except OSError:
            return

        for entry in entries:
            name = entry.name
            if self.check_ignored(name):
                continue

            # Check limit before processing more entries
            if self._limit_reached():
                break

            try:
                # Symlinks are deliberately not followed, so a link pointing at a
                # directory is recorded as a plain entry and never descended into.
                is_dir = entry.is_dir(follow_symlinks=False)
            except OSError:
                continue  # Skip entries we can't stat

            child = node.get_or_create_child(name, is_dir)
            self.collected_paths.append(child.full_path)

    def collect_to_depth(self, target_depth: int) -> None:
        """Scan queued directories level by level up to depth *target_depth*.

        Directories at depth ``d`` contribute paths at depth ``d + 1``. The method
        returns early, possibly in the middle of a level, once the limit is reached.
        """
        queue = self._dirs_to_scan
        while queue and self.reach_depth < target_depth:
            current_depth = queue[0][1]
            if current_depth > target_depth:
                break
            if self._limit_reached():
                return

            # The queue is ordered by depth, so the current level is its leading run.
            level_size = 0
            for _node, depth in queue:
                if depth != current_depth:
                    break
                level_size += 1

            for _ in range(level_size):
                node, depth = queue.popleft()
                # The root (depth 0) is pre-marked as visited but still has to be
                # scanned once; any other node is scanned only if it is an unvisited
                # directory.
                if depth != 0 and (not node.is_dir or node.visited):
                    continue
                node.visited = True
                self.scan_node(node)
                # Check limit after scanning to stop early
                if self._limit_reached():
                    break
                self._enqueue_subdirs(node, depth)

            self.reach_depth = current_depth

            # Check limit after each level
            if self._limit_reached():
                break

    def ensure_depth(self, min_depth: int) -> None:
        """Make sure directories up to depth *min_depth* have been scanned."""
        if min_depth > self.reach_depth:
            self.collect_to_depth(min_depth)

    def collect_first_stage(self) -> None:
        """Collect level by level until ``FIRST_STAGE_LIMIT`` paths are available.

        Stops earlier when there is nothing left to scan or the hard ``limit`` is hit.
        """
        while (
            self._dirs_to_scan
            and len(self.collected_paths) < self.FIRST_STAGE_LIMIT
            and not self._limit_reached()
        ):
            # Process next level
            self.collect_to_depth(self._dirs_to_scan[0][1])

    def get_paths(self, max_depth: int | None = None) -> tuple[PurePath, ...]:
        """Return collected paths in BFS order, at most ``limit`` of them.

        Args:
            max_depth: When ``None``, run the first collection stage and return every
                path collected so far. Otherwise make sure that depth has been scanned
                and return only paths whose depth does not exceed it.
        """
        if max_depth is None:
            # No specific depth required: ensure shallow files are included
            self.collect_first_stage()
            return tuple(self.collected_paths[: self.limit])

        # Navigation mode: ensure specific depth
        self.ensure_depth(max_depth)
        result: list[PurePath] = []
        for path in self.collected_paths:
            if self.depth_of(path) > max_depth:
                break  # BFS order: everything after this point is deeper still
            result.append(path)
            if len(result) >= self.limit:
                break
        return tuple(result)

    def get_top_level_paths(self) -> tuple[PurePath, ...]:
        """Get only top-level paths (direct children of root, depth 1)."""
        self.ensure_depth(1)
        result: list[PurePath] = []
        for path in self.collected_paths:
            if self.depth_of(path) != 1:
                break  # BFS order: depth-1 paths form the leading run
            result.append(path)
            if len(result) >= self.limit:
                break
        return tuple(result)

    def is_directory(self, path: PurePath) -> bool:
        """Tell whether *path* (relative to ``root``) is a directory.

        The answer comes from the trie when the path has been collected. Unknown paths
        fall back to a filesystem check, which, unlike scanning, follows symlinks.
        """
        if not path.parts:
            return True  # Root is a directory

        node = self.root_node
        for part in path.parts:
            if node.children is None or part not in node.children:
                # Not found in trie, fall back to filesystem check
                try:
                    return (self.root / path).is_dir()
                except OSError:
                    return False
            node = node.children[part]
        return node.is_dir
is_dir=True) + child1 = root.get_or_create_child("src", is_dir=True) + child2 = root.get_or_create_child("src", is_dir=True) + + assert child1 is child2 + + +# ============================================================================= +# PathTrie basic tests +# ============================================================================= + + +def test_empty_directory(tmp_path: Path): + """Trie works with empty root directory.""" + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + assert paths == () + + +def test_single_file(tmp_path: Path): + """Single file is collected.""" + (tmp_path / "file.txt").write_text("content") + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + + assert len(paths) == 1 + assert paths[0] == PurePath("file.txt") + + +def test_single_directory(tmp_path: Path): + """Single directory is collected.""" + (tmp_path / "src").mkdir() + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + + assert paths == (PurePath("src"),) + + +# ============================================================================= +# BFS ordering tests +# ============================================================================= + + +def test_shallow_paths_first(tmp_path: Path): + """BFS ensures shallow paths are collected before deep ones.""" + # Create structure: a/b/c/d/e (deep) and x/y (shallow) + (tmp_path / "a" / "b" / "c" / "d" / "e").mkdir(parents=True) + (tmp_path / "x" / "y").mkdir(parents=True) + + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + + # Verify BFS order: depth 1 (a, x) before depth 2 (a/b, x/y) + expected = ( + PurePath("a"), + PurePath("x"), + PurePath("a/b"), + PurePath("x/y"), + ) + assert paths[:4] == expected + + +def test_breadth_before_depth(tmp_path: Path): + """Verify specific BFS order: siblings before children.""" + # Create: src/ (with file.py), tests/ (with test.py) + # BFS should collect: src, tests, src/file.py, 
tests/test.py + (tmp_path / "src").mkdir() + (tmp_path / "src" / "file.py").write_text("") + (tmp_path / "tests").mkdir() + (tmp_path / "tests" / "test.py").write_text("") + + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + + names = [str(p) for p in paths] + + # Both depth-1 items should come before depth-2 items + depth1_items = ["src", "tests"] + depth2_items = ["src/file.py", "tests/test.py"] + + max_depth1_pos = max(names.index(n) for n in depth1_items) + min_depth2_pos = min(names.index(n) for n in depth2_items) + + assert max_depth1_pos < min_depth2_pos, "Depth-1 items should all appear before depth-2 items" + + +# ============================================================================= +# Incremental depth tests +# ============================================================================= + + +def test_initial_depth_limit(tmp_path: Path): + """Initially only collects up to initial_depth (2 levels).""" + # Create: a/b/c/d (4 levels deep) + (tmp_path / "a" / "b" / "c" / "d").mkdir(parents=True) + + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + + assert PurePath("a") in paths + assert PurePath("a/b") in paths + + +def test_incremental_expansion(tmp_path: Path): + """Deeper paths are collected on demand via ensure_depth.""" + # Create: a/b/c/d (4 levels deep) + (tmp_path / "a" / "b" / "c" / "d").mkdir(parents=True) + + trie = PathTrie(tmp_path, _is_ignored, limit=100) + + # Initially only depth 1-2 + paths = trie.get_paths(max_depth=2) + assert PurePath("a") in paths + assert PurePath("a/b") in paths + assert PurePath("a/b/c") not in paths + + # Expand to depth 3 + trie.ensure_depth(3) + paths = trie.get_paths(max_depth=3) + assert PurePath("a/b/c") in paths + assert PurePath("a/b/c/d") not in paths + + # Expand to depth 4 + trie.ensure_depth(4) + paths = trie.get_paths(max_depth=4) + assert PurePath("a/b/c/d") in paths + + +def test_get_top_level_paths(tmp_path: Path): + 
"""get_top_level_paths returns only depth 1 items.""" + (tmp_path / "src").mkdir() + (tmp_path / "src" / "file.py").write_text("") + (tmp_path / "README.md").write_text("") + + trie = PathTrie(tmp_path, _is_ignored, limit=100) + top_level = trie.get_top_level_paths() + + assert len(top_level) == 2 + assert PurePath("src") in top_level + assert PurePath("README.md") in top_level + assert PurePath("src/file.py") not in top_level + + +# ============================================================================= +# Limit enforcement tests +# ============================================================================= + + +def test_limit_respected(tmp_path: Path): + """Total paths collected respects the limit.""" + for i in range(20): + (tmp_path / f"file{i}.txt").write_text("") + + trie = PathTrie(tmp_path, _is_ignored, limit=10) + paths = trie.get_paths() + + assert len(paths) <= 10 + + +def test_limit_preserves_shallow_paths(tmp_path: Path): + """When limit hits, shallow paths are preserved over deep ones.""" + # Structure: a/b/c0../c9/deep.txt (deep), plus x, y (shallow) + # With limit=4, BFS collects: a, x, y, a/b + # Deep paths (a/b/c*) should NOT be included + (tmp_path / "a" / "b" / "c0").mkdir(parents=True) + for i in range(10): + (tmp_path / "a" / "b" / f"c{i}").mkdir(exist_ok=True) + (tmp_path / "a" / "b" / f"c{i}" / "deep.txt").write_text("") + + (tmp_path / "x").mkdir() + (tmp_path / "y").mkdir() + + limit = 4 + trie = PathTrie(tmp_path, _is_ignored, limit=limit) + paths = trie.get_paths() + + path_strs = [str(p) for p in paths] + # Verify shallow paths are included + assert "a" in path_strs + assert "x" in path_strs + assert "y" in path_strs + assert "a/b" in path_strs + # Verify deep paths are excluded due to limit + assert not any("a/b/c" in p for p in path_strs) + + +# ============================================================================= +# is_directory tests +# ============================================================================= + + 
+def test_is_directory_for_file(tmp_path: Path): + """is_directory returns False for files.""" + (tmp_path / "file.txt").write_text("content") + trie = PathTrie(tmp_path, _is_ignored, limit=100) + trie.get_paths() # Populate + + assert trie.is_directory(PurePath("file.txt")) is False + + +def test_is_directory_for_directory(tmp_path: Path): + """is_directory returns True for directories.""" + (tmp_path / "src").mkdir() + trie = PathTrie(tmp_path, _is_ignored, limit=100) + trie.get_paths() # Populate + + assert trie.is_directory(PurePath("src")) is True + + +def test_is_directory_for_root(tmp_path: Path): + """is_directory returns True for root (empty path).""" + trie = PathTrie(tmp_path, _is_ignored, limit=100) + + assert trie.is_directory(PurePath("")) is True + + +def test_is_directory_for_unknown_path(tmp_path: Path): + """is_directory falls back to filesystem check for unknown paths.""" + trie = PathTrie(tmp_path, _is_ignored, limit=100) + + (tmp_path / "unknown").mkdir() + assert trie.is_directory(PurePath("unknown")) is True + + +# ============================================================================= +# Ignored patterns tests +# ============================================================================= + + +def test_ignored_names_not_collected(tmp_path: Path): + """Ignored names are not added to the trie.""" + (tmp_path / "src").mkdir() + (tmp_path / ".hidden").mkdir() + (tmp_path / "__pycache__").mkdir() + + trie = PathTrie(tmp_path, _is_ignored, limit=100) + paths = trie.get_paths() + + assert PurePath("src") in paths + assert PurePath(".hidden") not in paths + assert PurePath("__pycache__") not in paths + + +def test_ignored_patterns_not_descended(tmp_path: Path): + """Ignored directories are not scanned for children.""" + pkg_name = "elephant" + js_pkg_dir = tmp_path / "node_modules" / pkg_name + js_pkg_dir.mkdir(parents=True) + sample_code = "console.log('Hello')" + (js_pkg_dir / "file.js").write_text(sample_code) + + def 
ignore_node_modules(name: str) -> bool: + return name == "node_modules" + + trie = PathTrie(tmp_path, ignore_node_modules, limit=100) + paths = trie.get_paths() + + assert PurePath(f"node_modules/{pkg_name}") not in paths + assert PurePath(f"node_modules/{pkg_name}/file.js") not in paths