diff --git a/marimo/_utils/files.py b/marimo/_utils/files.py index 55e205c0ba0..9f3c7db7d5e 100644 --- a/marimo/_utils/files.py +++ b/marimo/_utils/files.py @@ -38,6 +38,19 @@ async def async_get_files(folder: str) -> AsyncGenerator[Path, None]: yield file_path +def _get_root(pattern: str) -> str: + sep = os.sep + root = "." + parts = pattern.split(sep) + for i, part in enumerate(parts): + if "*" in part or "?" in part: + root = sep.join(parts[:i]) if i > 0 else "." + break + elif os.path.isdir(sep.join(parts[: i + 1])): + root = sep.join(parts[: i + 1]) + return root + + def expand_file_patterns(file_patterns: tuple[str, ...]) -> list[Path]: """Expand file patterns to actual file paths. @@ -57,14 +70,7 @@ def expand_file_patterns(file_patterns: tuple[str, ...]) -> list[Path]: # Handle glob patterns by walking from root and filtering if "**" in pattern or "*" in pattern or "?" in pattern: # Extract root directory to walk from - root = "." - parts = pattern.split("/") - for i, part in enumerate(parts): - if "*" in part or "?" in part: - root = "/".join(parts[:i]) if i > 0 else "." - break - elif os.path.isdir("/".join(parts[: i + 1])): - root = "/".join(parts[: i + 1]) + root = _get_root(pattern) # Get all files from root and filter by pattern if os.path.isdir(root): @@ -111,14 +117,7 @@ async def async_expand_file_patterns( # Handle glob patterns by walking from root and filtering if "**" in pattern or "*" in pattern or "?" in pattern: # Extract root directory to walk from - root = "." - parts = pattern.split("/") - for i, part in enumerate(parts): - if "*" in part or "?" in part: - root = "/".join(parts[:i]) if i > 0 else "." - break - elif os.path.isdir("/".join(parts[: i + 1])): - root = "/".join(parts[: i + 1]) + root = _get_root(pattern) # Get all files from root and filter by pattern if os.path.isdir(root): diff --git a/tests/_lint/test_run_check.py b/tests/_lint/test_run_check.py index 4c1049d5077..a08907523aa 100644 --- a/tests/_lint/test_run_check.py +++ b/tests/_lint/test_run_check.py @@ -1,14 +1,10 @@ # Copyright 2025 Marimo. All rights reserved. """Unit tests for the run_check CLI integration.""" -import tempfile from pathlib import Path -import pytest - from marimo._lint import FileStatus, Linter, run_check from marimo._lint.diagnostic import Diagnostic, Severity -from marimo._utils.platform import is_windows class TestRunCheck: @@ -22,36 +18,33 @@ def test_run_check_with_empty_files(self): assert len(result.files) == 0 assert result.errored is False - def test_run_check_with_unsupported_files(self): + def test_run_check_with_unsupported_files(self, tmpdir): """Test run_check skips unsupported file types.""" - with tempfile.TemporaryDirectory() as tmpdir: - # Create a non-notebook file - txt_file = Path(tmpdir) / "test.txt" - txt_file.write_text("This is not a notebook") + # Create a non-notebook file + txt_file = Path(tmpdir) / "test.txt" + txt_file.write_text("This is not a notebook") - result = run_check((str(txt_file),)) + result = run_check((str(txt_file),)) - assert len(result.files) == 1 - assert result.files[0].skipped is True - assert "not a notebook file" in result.files[0].message + assert len(result.files) == 1 + assert result.files[0].skipped is True + assert "not a notebook file" in result.files[0].message - def test_run_check_with_empty_py_file(self): + def test_run_check_with_empty_py_file(self, tmpdir): """Test run_check with an empty Python file.""" - with tempfile.TemporaryDirectory() as tmpdir: - py_file = Path(tmpdir) / "empty.py" - py_file.write_text("") + py_file = Path(tmpdir) / "empty.py" + py_file.write_text("") - result = run_check((str(py_file),)) + result = run_check((str(py_file),)) - assert len(result.files) == 1 - assert result.files[0].skipped is True - assert "empty file" in result.files[0].message + assert len(result.files) == 1 + assert result.files[0].skipped is True + assert "empty file" in result.files[0].message - def test_run_check_with_valid_notebook(self): + def test_run_check_with_valid_notebook(self, tmpdir): """Test run_check with a valid marimo notebook.""" - with tempfile.TemporaryDirectory() as tmpdir: - notebook_file = Path(tmpdir) / "notebook.py" - notebook_content = """import marimo + notebook_file = Path(tmpdir) / "notebook.py" + notebook_content = """import marimo __generated_with = "0.15.0" app = marimo.App() @@ -61,65 +54,58 @@ def __(): x = 1 return (x,) """ - notebook_file.write_text(notebook_content) + notebook_file.write_text(notebook_content) - result = run_check((str(notebook_file),)) + result = run_check((str(notebook_file),)) - assert len(result.files) == 1 - assert result.files[0].skipped is False - assert result.files[0].failed is False - assert isinstance(result.files[0].diagnostics, list) + assert len(result.files) == 1 + assert result.files[0].skipped is False + assert result.files[0].failed is False + assert isinstance(result.files[0].diagnostics, list) - def test_run_check_with_syntax_error(self): + def test_run_check_with_syntax_error(self, tmpdir): """Test run_check with a file containing syntax errors.""" - with tempfile.TemporaryDirectory() as tmpdir: - bad_file = Path(tmpdir) / "bad.py" - bad_file.write_text( - "import marimo\napp = marimo.App(\ndef broken(:\n pass" - ) - - result = run_check((str(bad_file),)) - - assert len(result.files) == 1 - assert result.files[0].failed is True - assert "Failed to parse" in result.files[0].message - assert len(result.files[0].details) > 0 - - @pytest.mark.skipif( - is_windows(), reason=r"Skip glob patterns due to path issues" - ) - def test_run_check_with_glob_patterns(self): + bad_file = Path(tmpdir) / "bad.py" + bad_file.write_text( + "import marimo\napp = marimo.App(\ndef broken(:\n pass" + ) + + result = run_check((str(bad_file),)) + + assert len(result.files) == 1 + assert result.files[0].failed is True + assert "Failed to parse" in result.files[0].message + assert len(result.files[0].details) > 0 + + def test_run_check_with_glob_patterns(self, tmpdir): """Test run_check with glob patterns.""" - with tempfile.TemporaryDirectory() as tmpdir: - # Create multiple files - py_file = Path(tmpdir) / "test.py" - py_file.write_text("# empty notebook") - - md_file = Path(tmpdir) / "test.md" - md_file.write_text("# Empty markdown") - - txt_file = Path(tmpdir) / "test.txt" - txt_file.write_text("ignored") - - # Use glob pattern - pattern = str(Path(tmpdir) / "*") - result = run_check((pattern,)) - - # Should find py and md files, skip txt - assert len(result.files) == 3 - py_result = next(f for f in result.files if f.file.endswith(".py")) - md_result = next(f for f in result.files if f.file.endswith(".md")) - txt_result = next( - f for f in result.files if f.file.endswith(".txt") - ) - - # py files with simple comments are not valid notebooks, so they fail parsing - assert py_result.failed is True # Not a valid notebook - # md files with simple content might be processed differently - assert ( - md_result.failed is False or md_result.skipped is True - ) # Markdown files may be handled differently - assert txt_result.skipped is True # Not a notebook + # Create multiple files + py_file = Path(tmpdir) / "test.py" + py_file.write_text("# empty notebook") + + md_file = Path(tmpdir) / "test.md" + md_file.write_text("# Empty markdown") + + txt_file = Path(tmpdir) / "test.txt" + txt_file.write_text("ignored") + + # Use glob pattern + pattern = str(Path(tmpdir) / "*") + result = run_check((pattern,)) + + # Should find py and md files, skip txt + assert len(result.files) == 3 + py_result = next(f for f in result.files if f.file.endswith(".py")) + md_result = next(f for f in result.files if f.file.endswith(".md")) + txt_result = next(f for f in result.files if f.file.endswith(".txt")) + + # py files with simple comments are not valid notebooks, so they fail parsing + assert py_result.failed is True # Not a valid notebook + # md files with simple content might be processed differently + assert ( + md_result.failed is False or md_result.skipped is True + ) # Markdown files may be handled differently + assert txt_result.skipped is True # Not a notebook class TestFileStatus: @@ -154,129 +140,126 @@ def test_file_status_with_diagnostics(self): assert len(status.diagnostics) == 1 assert status.diagnostics[0].code == "MB001" - async def test_file_status_fix_no_fixable_diagnostics(self): + async def test_file_status_fix_no_fixable_diagnostics(self, tmpdir): """Test Linter.fix() with no fixable diagnostics.""" - with tempfile.TemporaryDirectory() as tmpdir: - test_file = Path(tmpdir) / "test.py" - original_content = "# Original content" - test_file.write_text(original_content) + test_file = Path(tmpdir) / "test.py" + original_content = "# Original content" + test_file.write_text(original_content) - diagnostic = Diagnostic( - message="Test error", - cell_id=None, - line=1, - column=1, - code="MB001", - name="test-error", - severity=Severity.BREAKING, - fixable=False, # Not fixable - ) - - # FileStatus needs notebook and contents for fix to work - status = FileStatus( - file=str(test_file), - diagnostics=[diagnostic], - notebook=None, # No notebook means fix returns False - contents=original_content, - ) - - # Since no notebook, fix should return False (no changes) - linter = Linter() - result = await linter.fix(status) - assert result is False + diagnostic = Diagnostic( + message="Test error", + cell_id=None, + line=1, + column=1, + code="MB001", + name="test-error", + severity=Severity.BREAKING, + fixable=False, # Not fixable + ) + + # FileStatus needs notebook and contents for fix to work + status = FileStatus( + file=str(test_file), + diagnostics=[diagnostic], + notebook=None, # No notebook means fix returns False + contents=original_content, + ) - # File should remain unchanged - assert test_file.read_text() == original_content + # Since no notebook, fix should return False (no changes) + linter = Linter() + result = await linter.fix(status) + assert result is False - async def test_file_status_fix_with_fixable_diagnostics(self): + # File should remain unchanged + assert test_file.read_text() == original_content + + async def test_file_status_fix_with_fixable_diagnostics(self, tmpdir): """Test Linter.fix() with fixable diagnostics (but no notebook).""" - with tempfile.TemporaryDirectory() as tmpdir: - test_file = Path(tmpdir) / "test.py" - original_content = "# Original content" - test_file.write_text(original_content) + test_file = Path(tmpdir) / "test.py" + original_content = "# Original content" + test_file.write_text(original_content) + + diagnostic = Diagnostic( + message="Formatting error", + cell_id=None, + line=1, + column=1, + code="MF001", + name="formatting-error", + severity=Severity.FORMATTING, + fixable=True, # Fixable + ) + + # FileStatus needs notebook and contents for fix to work + status = FileStatus( + file=str(test_file), + diagnostics=[diagnostic], + notebook=None, # No notebook means fix returns False + contents=original_content, + ) + + # Since no notebook, fix should return False (no changes) + linter = Linter() + result = await linter.fix(status) + assert result is False + + # File should remain unchanged + assert test_file.read_text() == original_content + + async def test_file_status_fix_with_multiple_diagnostics(self, tmpdir): + """Test Linter.fix() with multiple diagnostics (but no notebook).""" + test_file = Path(tmpdir) / "test.py" + original_content = "# Original content" + test_file.write_text(original_content) - diagnostic = Diagnostic( - message="Formatting error", + diagnostics = [ + Diagnostic( + message="Formatting", cell_id=None, line=1, column=1, code="MF001", name="formatting-error", severity=Severity.FORMATTING, - fixable=True, # Fixable - ) - - # FileStatus needs notebook and contents for fix to work - status = FileStatus( - file=str(test_file), - diagnostics=[diagnostic], - notebook=None, # No notebook means fix returns False - contents=original_content, - ) - - # Since no notebook, fix should return False (no changes) - linter = Linter() - result = await linter.fix(status) - assert result is False - - # File should remain unchanged - assert test_file.read_text() == original_content + fixable=True, + ), + Diagnostic( + message="Breaking", + cell_id=None, + line=2, + column=1, + code="MB001", + name="breaking-error", + severity=Severity.BREAKING, + fixable=True, + ), + Diagnostic( + message="Runtime", + cell_id=None, + line=3, + column=1, + code="MR001", + name="runtime-error", + severity=Severity.RUNTIME, + fixable=True, + ), + ] + + # FileStatus needs notebook and contents for fix to work + status = FileStatus( + file=str(test_file), + diagnostics=diagnostics, + notebook=None, # No notebook means fix returns False + contents=original_content, + ) - async def test_file_status_fix_with_multiple_diagnostics(self): - """Test Linter.fix() with multiple diagnostics (but no notebook).""" - with tempfile.TemporaryDirectory() as tmpdir: - test_file = Path(tmpdir) / "test.py" - original_content = "# Original content" - test_file.write_text(original_content) - - diagnostics = [ - Diagnostic( - message="Formatting", - cell_id=None, - line=1, - column=1, - code="MF001", - name="formatting-error", - severity=Severity.FORMATTING, - fixable=True, - ), - Diagnostic( - message="Breaking", - cell_id=None, - line=2, - column=1, - code="MB001", - name="breaking-error", - severity=Severity.BREAKING, - fixable=True, - ), - Diagnostic( - message="Runtime", - cell_id=None, - line=3, - column=1, - code="MR001", - name="runtime-error", - severity=Severity.RUNTIME, - fixable=True, - ), - ] - - # FileStatus needs notebook and contents for fix to work - status = FileStatus( - file=str(test_file), - diagnostics=diagnostics, - notebook=None, # No notebook means fix returns False - contents=original_content, - ) - - # Since no notebook, fix should return False (no changes) - linter = Linter() - result = await linter.fix(status) - assert result is False + # Since no notebook, fix should return False (no changes) + linter = Linter() + result = await linter.fix(status) + assert result is False - # File should remain unchanged - assert test_file.read_text() == original_content + # File should remain unchanged + assert test_file.read_text() == original_content class TestLinter: @@ -306,13 +289,12 @@ def test_check_result_with_files(self): class TestIntegration: """Integration tests combining multiple components.""" - def test_full_workflow_with_notebook_violations(self): + def test_full_workflow_with_notebook_violations(self, tmpdir): """Test the full workflow with a notebook that has violations.""" - with tempfile.TemporaryDirectory() as tmpdir: - notebook_file = Path(tmpdir) / "bad_notebook.py" + notebook_file = Path(tmpdir) / "bad_notebook.py" - # Create a notebook with violations (missing __generated_with) - notebook_content = """import marimo + # Create a notebook with violations (missing __generated_with) + notebook_content = """import marimo app = marimo.App() @@ -324,50 +306,46 @@ def __(): y = 2 return (y,) """ - notebook_file.write_text(notebook_content) + notebook_file.write_text(notebook_content) - result = run_check((str(notebook_file),)) + result = run_check((str(notebook_file),)) - assert len(result.files) == 1 - file_status = result.files[0] + assert len(result.files) == 1 + file_status = result.files[0] - assert not file_status.skipped - assert not file_status.failed - assert len(file_status.diagnostics) > 0 + assert not file_status.skipped + assert not file_status.failed + assert len(file_status.diagnostics) > 0 - # Should have formatting violations - assert any( - d.severity == Severity.FORMATTING - for d in file_status.diagnostics - ) + # Should have formatting violations + assert any( + d.severity == Severity.FORMATTING for d in file_status.diagnostics + ) - # Test fixing (if notebook is available) - if file_status.notebook is not None: - # Use Linter.fix() method - import asyncio + # Test fixing (if notebook is available) + if file_status.notebook is not None: + # Use Linter.fix() method + import asyncio - linter = Linter() - result = asyncio.run(linter.fix(file_status)) - assert isinstance(result, bool) + linter = Linter() + result = asyncio.run(linter.fix(file_status)) + assert isinstance(result, bool) - def test_error_handling_in_run_check(self): + def test_error_handling_in_run_check(self, tmpdir): """Test error handling in run_check.""" # Test by providing invalid content that will cause parsing to fail - import tempfile - - with tempfile.TemporaryDirectory() as tmpdir: - test_file = Path(tmpdir) / "test.py" - # Write invalid Python content that will cause an exception - test_file.write_text( - "import marimo\napp = marimo.App()\ndef broken(:\n pass" - ) + test_file = Path(tmpdir) / "test.py" + # Write invalid Python content that will cause an exception + test_file.write_text( + "import marimo\napp = marimo.App()\ndef broken(:\n pass" + ) - result = run_check((str(test_file),)) + result = run_check((str(test_file),)) - assert len(result.files) == 1 - assert result.files[0].failed is True - # Note: errored might not be True as we changed error handling - assert "Failed to parse" in result.files[0].message + assert len(result.files) == 1 + assert result.files[0].failed is True + # Note: errored might not be True as we changed error handling + assert "Failed to parse" in result.files[0].message def test_run_check_with_nonexistent_file_pattern(self): """Test run_check with a specific nonexistent file."""