Skip to content
244 changes: 198 additions & 46 deletions mcpunk/file_breakdown.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
import logging
import math
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from threading import Lock, Timer
from typing import Literal

from git import Repo
from pydantic import (
BaseModel,
)
from watchdog.events import (
DirCreatedEvent,
DirDeletedEvent,
DirModifiedEvent,
FileCreatedEvent,
FileDeletedEvent,
FileModifiedEvent,
FileSystemEventHandler,
)
from watchdog.observers import Observer

from mcpunk.file_chunk import Chunk, ChunkCategory
from mcpunk.file_chunkers import (
Expand All @@ -27,8 +42,122 @@
logger = logging.getLogger(__name__)


class NoGitRepoError(Exception):
pass
class _ProjectFileHandler(FileSystemEventHandler):
    """Watchdog event handler that batches filesystem events for a Project.

    Events are not processed immediately: paths are accumulated into a pending
    set and flushed on a timer every ``file_watch_refresh_freq_seconds``. This
    batches bursts of changes (e.g. a branch switch touching many files) into
    a single refresh.
    """

    def __init__(
        self,
        project: "Project",
        file_watch_refresh_freq_seconds: float = 0.1,
    ) -> None:
        self.project = project
        # Held for the whole of each refresh so refreshes never overlap.
        self._project_lock = Lock()

        # Paths reported by watchdog that have not yet been (re)analysed.
        # Guarded by its own lock since watchdog callbacks come from
        # a different thread than the refresh timer.
        self._paths_pending_refresh: set[Path] = set()
        self._paths_pending_refresh_lock = Lock()

        self._refresh_freq_sec = file_watch_refresh_freq_seconds
        self._timer: Timer | None = None
        self._schedule_refresh()

    def _schedule_refresh(self) -> None:
        # threading.Timer objects are single-shot, so a fresh one is created
        # each cycle; any previously scheduled timer is cancelled first.
        if self._timer is not None:
            self._timer.cancel()
        self._timer = Timer(self._refresh_freq_sec, self._refresh_paths)
        self._timer.daemon = True
        self._timer.start()

    def _refresh_paths(self) -> None:
        """Flush the pending-path set into the project.

        Deleted paths are dropped from the project's file map, directories are
        skipped, and the remaining files are re-analysed. Always reschedules
        itself (via ``finally``), even if the refresh raised.
        """
        with self._project_lock:
            try:
                # Snapshot and clear the pending set under its lock so watchdog
                # threads can keep adding paths while we refresh this batch.
                with self._paths_pending_refresh_lock:
                    paths_pending_refresh = self._paths_pending_refresh.copy()
                    self._paths_pending_refresh.clear()
                if paths_pending_refresh:
                    logger.info(f"Refreshing {len(paths_pending_refresh)} paths")
                    _paths_fmt = "\n\t".join(str(x) for x in paths_pending_refresh)
                    logger.debug(f"Refreshing\n\t{_paths_fmt}")

                # Paths that no longer exist were deleted - remove them from
                # the file map rather than trying to re-analyse them.
                paths_to_delete = {x for x in paths_pending_refresh if not x.exists()}
                for p in paths_to_delete:
                    if p.absolute() in self.project.file_map:
                        del self.project.file_map[p.absolute()]

                # Directories themselves are never analysed; their contained
                # files arrive as their own events.
                dir_paths = {x for x in paths_pending_refresh if x.exists() and x.is_dir()}

                paths_to_really_refresh = paths_pending_refresh - dir_paths - paths_to_delete
                self.project.load_files(list(paths_to_really_refresh))
            except Exception:
                logger.exception("Error refreshing paths")
            finally:
                self._schedule_refresh()

    def _path_event(
        self,
        path: Path,
        *,
        action: Literal["modified", "created", "deleted"],
    ) -> None:
        """Record a single watchdog event, queueing the path for refresh."""
        path = path.absolute()

        logger.debug(f"watchdog says {action}: {path}")
        if self._should_process(path):
            logger.debug(f"New path pending refresh: {path}")
            with self._paths_pending_refresh_lock:
                self._paths_pending_refresh.add(path)
        else:
            logger.debug(f"Ignoring {action} for {path}")

    def _should_process(self, path: Path) -> bool:
        """Return True if `path` should be (re)analysed.

        Existing directories, anything under ``.git``, and git-ignored paths
        are skipped. When the project has no git repo, every path is processed.
        """
        if self.project.git_repo is None:
            return True

        assert path.is_absolute()

        # We don't want to exclude non-existent files, as they may have been deleted.

        if path.exists() and path.is_dir():
            return False

        # Special case this, as we def don't want it and it seems that
        # `git check-ignore` doesn't consider it as ignored.
        if path.is_relative_to(self.project.root / ".git"):
            return False

        try:
            rel_path = str(path.relative_to(self.project.root))
            t1 = time.monotonic()
            # `git check-ignore` prints the path (non-empty output) when the
            # path is ignored, and prints nothing when it is not ignored.
            check_ignore_res: str = self.project.git_repo.git.execute(  # type: ignore[call-overload]
                ["git", "check-ignore", str(rel_path)],
                with_exceptions=False,
            )
            logger.debug(f"git check-ignore took {(time.monotonic() - t1) * 1000:.4f}ms")
            return check_ignore_res == ""
        except Exception:
            # On any failure err on the side of skipping the path.
            logger.exception(f"Error checking git ignore for {path}")
            return False

    def on_modified(self, event: FileModifiedEvent | DirModifiedEvent) -> None:
        self._path_event(
            Path(self._to_str(event.src_path)),
            action="modified",
        )

    def on_created(self, event: FileCreatedEvent | DirCreatedEvent) -> None:
        self._path_event(
            Path(self._to_str(event.src_path)),
            action="created",
        )

    def on_deleted(self, event: FileDeletedEvent | DirDeletedEvent) -> None:
        self._path_event(
            Path(self._to_str(event.src_path)),
            action="deleted",
        )

    @staticmethod
    def _to_str(s: str | bytes) -> str:
        # Watchdog may report event paths as bytes depending on platform.
        if isinstance(s, bytes):
            return s.decode("utf-8")
        return s


class File(BaseModel):
Expand Down Expand Up @@ -65,58 +194,87 @@ def chunks_of_type(self, chunk_type: ChunkCategory) -> list[Chunk]:
return [c for c in self.chunks if c.category == chunk_type]


class Project(BaseModel):
root: Path
files: list[File]
class Project:
    def __init__(
        self,
        *,
        root: Path,
        files_per_parallel_worker: int = 100,
        file_watch_refresh_freq_seconds: float = 0.1,
    ) -> None:
        """Analyse all files under `root` and start watching for changes.

        Args:
            root: Project root directory; expanded and made absolute.
            files_per_parallel_worker: Roughly how many files each parallel
                worker handles when analysing files in bulk.
            file_watch_refresh_freq_seconds: How often the file watcher flushes
                pending filesystem changes into the file map.
        """
        self.root = root.expanduser().absolute()
        self.files_per_parallel_worker = files_per_parallel_worker
        # Maps absolute file path -> analysed File.
        self.file_map: dict[Path, File] = {}

        git_repo: Repo | None
        if (root / ".git").exists():
            git_repo = Repo(root / ".git")
        else:
            git_repo = None
        self.git_repo = git_repo

        self._init_from_root_dir(root)

        # NOTE: files modified between the initial scan above and
        # observer.start() below may be missed.

        self.observer = Observer()
        self.observer.schedule(
            event_handler=_ProjectFileHandler(
                self,
                file_watch_refresh_freq_seconds=file_watch_refresh_freq_seconds,
            ),
            path=str(self.root),
            recursive=True,
        )
        self.observer.start()

@property
def git_repo(self) -> Repo:
return _git_repo(self.root)
def files(self) -> list[File]:
return list(self.file_map.values())

@classmethod
def from_files(cls, root: Path, files: list[Path], max_workers: int | None = None) -> "Project":
files_analysed: list[File] = []
def load_files(self, files: list[Path]) -> None:
# How many workers to use?
_cpu_count = os.cpu_count() or 1
n_workers = math.floor(len(files) / self.files_per_parallel_worker)
n_workers = min(n_workers, _cpu_count // 2) # Avoid maxing out the system
n_workers = max(n_workers, 1)

with ProcessPoolExecutor(max_workers=max_workers) as executor:
# Submit all file analysis tasks
future_to_file = {
executor.submit(_analyze_file, file_path): file_path for file_path in files
}
files_analysed: list[File]
if n_workers == 1:
files_analysed_maybe_none = [_analyze_file(file_path) for file_path in files]
files_analysed = [x for x in files_analysed_maybe_none if x is not None]
else:
logger.info(f"Using {n_workers} workers to process {len(files)} files")
files_analysed = []
with ProcessPoolExecutor(max_workers=n_workers) as executor:
future_to_file = {
executor.submit(_analyze_file, file_path): file_path for file_path in files
}

for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
if result is not None:
files_analysed.append(result)
except Exception:
logger.exception(f"File {file_path} generated an exception")
for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
if result is not None:
files_analysed.append(result)
except Exception:
logger.exception(f"File {file_path} generated an exception")

project = Project(
root=root,
files=files_analysed,
)
return project
for file in files_analysed:
self.file_map[file.abs_path] = file

@classmethod
def from_root_dir(cls, root: Path, max_workers: int | None = None) -> "Project":
def _init_from_root_dir(self, root: Path) -> None:
if not root.exists():
raise ValueError(f"Root directory {root} does not exist")

repo: Repo | None
try:
repo = _git_repo(root)
except NoGitRepoError:
repo = None

files: list[Path] = []
if repo is not None:
rel_paths = repo.git.ls_files().splitlines()
if self.git_repo is not None:
rel_paths = self.git_repo.git.ls_files().splitlines()
files.extend(root / rel_path for rel_path in rel_paths)
else:
# Exclude specific top-level directories
# TODO: make this configurable
ignore_dirs = {".venv", "build", ".git", "__pycache__"} # customize this set
ignore_dirs = {".venv", "build", ".git", "__pycache__"}

for path in root.iterdir():
if path.is_dir() and path.name not in ignore_dirs:
Expand All @@ -126,7 +284,7 @@ def from_root_dir(cls, root: Path, max_workers: int | None = None) -> "Project":
files.extend(root.glob("*"))

files = [file for file in files if file.is_file()]
return Project.from_files(root, files, max_workers=max_workers)
self.load_files(files)


def _analyze_file(file_path: Path) -> File | None:
Expand All @@ -142,9 +300,3 @@ def _analyze_file(file_path: Path) -> File | None:
except Exception:
logger.exception(f"Error processing file {file_path}")
return None


def _git_repo(root: Path) -> Repo:
if not (root / ".git").exists():
raise NoGitRepoError(f"No git repo found at {root}")
return Repo(root / ".git")
6 changes: 6 additions & 0 deletions mcpunk/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ class Settings(BaseSettings):
# will become available again for pickup.
task_queue_visibility_timeout_seconds: int = 300

# How long to wait between refreshing files modified on disk. This allows files
# to queue up and be refreshed in parallel if many are modified (e.g. switching
# branches), and it generally also avoids churn when e.g. an IDE creates temporary
# files during save (though this is not a guarantee).
file_watch_refresh_freq_seconds: float = 0.1

@property
def task_queue_visibility_timeout(self) -> timedelta:
return timedelta(seconds=self.task_queue_visibility_timeout_seconds)
Expand Down
10 changes: 9 additions & 1 deletion mcpunk/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from git import Repo
from pydantic import (
BaseModel,
ConfigDict,
Field,
model_validator,
)
Expand Down Expand Up @@ -52,6 +53,8 @@ class ToolProject(BaseModel):

chunk_project: FileBreakdownProject

model_config = ConfigDict(arbitrary_types_allowed=True)

@property
def root(self) -> pathlib.Path:
return self.chunk_project.root
Expand Down Expand Up @@ -252,7 +255,12 @@ def configure_project(
path = root_path.expanduser().absolute()
if project_name in PROJECTS:
raise ValueError(f"Project {project_name} already exists")
project = ToolProject(chunk_project=FileBreakdownProject.from_root_dir(path))
project = ToolProject(
chunk_project=FileBreakdownProject(
root=path,
file_watch_refresh_freq_seconds=deps.settings().file_watch_refresh_freq_seconds,
),
)
PROJECTS[project_name] = project
return MCPToolOutput(
text=(
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ dependencies = [
"sqlalchemy~=2.0.36",
"beautifulsoup4~=4.12.3",
"uvicorn>=0.34.0,<1.0.0",
"watchdog~=6.0.0",
]

[project.scripts]
Expand Down
33 changes: 32 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import hashlib
from collections.abc import Generator
from pathlib import Path

Expand All @@ -9,8 +10,9 @@


@pytest.fixture(scope="function", autouse=True)
def fresh_db(tmp_path: Path) -> Generator[None, None, None]:
def fresh_db(tmp_path_factory: pytest.TempPathFactory) -> Generator[None, None, None]:
"""Fiddle settings such that we have a fresh database"""
tmp_path = tmp_path_factory.mktemp("test")
settings = Settings(db_path=tmp_path / "test.db")
assert not settings.db_path.absolute().exists()
with deps.override(settings_partial=settings):
Expand All @@ -23,6 +25,35 @@ def fiddle_settings() -> Generator[None, None, None]:
"""Fiddle misc settings for consistency in testing."""
settings = Settings(
include_chars_in_response=False,
file_watch_refresh_freq_seconds=0.0,
)
with deps.override(settings_partial=settings):
yield


@pytest.fixture
def test_id(request: pytest.FixtureRequest) -> str:
    """Return a short, (probably) unique identifier for the current test.

    Combines a truncated test name with an md5 digest of the full node id so
    the result stays under 61 characters while remaining unique per test.
    """
    digest = hashlib.md5(request.node.nodeid.encode()).hexdigest()  # noqa: S324
    assert len(digest) == 32
    max_name_len = 60 - len(digest) - 1
    return f"{request.node.name[:max_name_len]}_{digest}"


class FileSet:
    """Test helper for laying out files beneath a root directory."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def add_file(self, fname: str, src: str) -> None:
        """Write `src` to root/fname, creating parent directories as needed."""
        target = self.root / fname
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(src)


@pytest.fixture
def basic_file_set(tmp_path_factory: pytest.TempPathFactory) -> FileSet:
    """A basic file set with two files."""
    # tmp_path_factory (rather than tmp_path) keeps this independent of the
    # per-test tmp_path fixture used elsewhere in conftest.
    tmp_path = tmp_path_factory.mktemp("test")
    fs = FileSet(tmp_path)
    fs.add_file("a.py", "a=1")
    fs.add_file("b.py", "b=2\ndef f(a: int=1):\n    return a")
    return fs
Loading