diff --git a/python-dsl/.gitignore b/python-dsl/.gitignore index 3ab8e431..dc7c2a30 100644 --- a/python-dsl/.gitignore +++ b/python-dsl/.gitignore @@ -42,3 +42,6 @@ ENV/ # OS .DS_Store Thumbs.db + +# Mypy +.mypy_cache/ diff --git a/python-dsl/README.md b/python-dsl/README.md index 55cee452..1a481d91 100644 --- a/python-dsl/README.md +++ b/python-dsl/README.md @@ -1,4 +1,4 @@ -# Pathfinder Python DSL +# Code-Pathfinder Python DSL Python DSL for defining security patterns in code-pathfinder. @@ -11,7 +11,7 @@ pip install codepathfinder ## Quick Start ```python -from pathfinder import rule, calls, variable +from codepathfinder import rule, calls, variable @rule(id="code-injection", severity="critical", cwe="CWE-94") def detect_eval(): @@ -31,7 +31,7 @@ def detect_user_input(): Matches function/method calls. ```python -from pathfinder import calls +from codepathfinder import calls # Exact match calls("eval") @@ -49,7 +49,7 @@ calls("*.execute") # Matches cursor.execute, conn.execute, etc. Matches variable references. ```python -from pathfinder import variable +from codepathfinder import variable # Exact match variable("user_input") @@ -59,12 +59,77 @@ variable("user_*") # Matches user_input, user_data, etc. variable("*_id") # Matches user_id, post_id, etc. ``` +## Dataflow Analysis + +### `flows(from_sources, to_sinks, sanitized_by=None, propagates_through=None, scope="global")` + +Tracks tainted data flow from sources to sinks for OWASP Top 10 vulnerability detection. 
+ +```python +from codepathfinder import flows, calls, propagates + +# SQL Injection +flows( + from_sources=calls("request.GET", "request.POST"), + to_sinks=calls("execute", "executemany"), + sanitized_by=calls("quote_sql"), + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + ], + scope="global" +) + +# Command Injection +flows( + from_sources=calls("request.POST"), + to_sinks=calls("os.system", "subprocess.call"), + sanitized_by=calls("shlex.quote"), + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + propagates.function_returns(), + ] +) + +# Path Traversal +flows( + from_sources=calls("request.GET"), + to_sinks=calls("open", "os.path.join"), + sanitized_by=calls("os.path.abspath"), + propagates_through=[propagates.assignment()], + scope="local" +) +``` + +**Parameters:** +- `from_sources`: Source matcher(s) where taint originates (e.g., user input) +- `to_sinks`: Sink matcher(s) for dangerous functions +- `sanitized_by` (optional): Sanitizer matcher(s) that neutralize taint +- `propagates_through` (optional): List of propagation primitives (EXPLICIT!) +- `scope`: `"local"` (intra-procedural) or `"global"` (inter-procedural, default) + +### Propagation Primitives + +Propagation primitives define HOW taint flows through code: + +```python +from codepathfinder import propagates + +# Phase 1 (Available Now): +propagates.assignment() # x = tainted +propagates.function_args() # func(tainted) +propagates.function_returns() # return tainted +``` + +**Important:** Propagation is EXPLICIT - you must specify which primitives to enable. No defaults are applied. + ## Rule Decorator The `@rule` decorator marks functions as security rules with metadata. ```python -from pathfinder import rule, calls +from codepathfinder import rule, calls @rule( id="sqli-001", @@ -90,7 +155,7 @@ The function docstring becomes the rule description. 
Rules serialize to JSON Intermediate Representation (IR) for the Go executor: ```python -from pathfinder import rule, calls +from codepathfinder import rule, calls import json @rule(id="test", severity="high") @@ -132,13 +197,13 @@ pip install -e ".[dev]" pytest # Format code -black pathfinder/ tests/ +black codepathfinder/ tests/ # Lint -ruff check pathfinder/ tests/ +ruff check codepathfinder/ tests/ # Type check -mypy pathfinder/ +mypy codepathfinder/ ``` ## Requirements diff --git a/python-dsl/codepathfinder/__init__.py b/python-dsl/codepathfinder/__init__.py new file mode 100644 index 00000000..96a6e41c --- /dev/null +++ b/python-dsl/codepathfinder/__init__.py @@ -0,0 +1,32 @@ +""" +codepathfinder - Python DSL for static analysis security patterns + +Examples: + Basic matchers: + >>> from codepathfinder import calls, variable + >>> calls("eval") + >>> variable("user_input") + + Rule definition: + >>> from codepathfinder import rule, calls + >>> @rule(id="test", severity="high") + >>> def detect_eval(): + >>> return calls("eval") + + Dataflow analysis: + >>> from codepathfinder import flows, calls, propagates + >>> flows( + ... from_sources=calls("request.GET"), + ... to_sinks=calls("execute"), + ... propagates_through=[propagates.assignment()] + ... ) +""" + +__version__ = "1.0.0" + +from .matchers import calls, variable +from .decorators import rule +from .dataflow import flows +from .propagation import propagates + +__all__ = ["calls", "variable", "rule", "flows", "propagates", "__version__"] diff --git a/python-dsl/codepathfinder/dataflow.py b/python-dsl/codepathfinder/dataflow.py new file mode 100644 index 00000000..4a5d18da --- /dev/null +++ b/python-dsl/codepathfinder/dataflow.py @@ -0,0 +1,190 @@ +""" +Dataflow matcher for taint analysis. + +The flows() function is the core of OWASP Top 10 pattern detection. +It describes how tainted data flows from sources to sinks. 
from typing import Any, List, Optional, Sequence, Union

from .ir import IRType
from .matchers import CallMatcher, VariableMatcher
from .propagation import PropagationPrimitive, create_propagation_list

# A single source/sink/sanitizer matcher, or a sequence of them.
_Matcher = Union[CallMatcher, VariableMatcher]
_MatcherArg = Union[_Matcher, Sequence[_Matcher]]


def _as_list(value: Any) -> List[Any]:
    """
    Normalize ``None``, a single DSL object, or an iterable into a NEW list.

    Args:
        value: ``None``, one matcher / propagation primitive, or an iterable
            of them.

    Returns:
        A fresh list (never the caller's own list object).

    Raises:
        TypeError: If ``value`` is neither a DSL object nor iterable.
    """
    if value is None:
        return []
    # Matchers and propagation primitives are serialized through to_ir();
    # anything exposing it is ONE object, not a collection to unpack.
    if hasattr(value, "to_ir"):
        return [value]
    try:
        return list(value)
    except TypeError:
        raise TypeError(
            "expected a matcher/primitive or an iterable of them, "
            f"got {type(value).__name__}"
        ) from None


class DataflowMatcher:
    """
    Matches tainted data flows from sources to sinks.

    This is the primary matcher for security vulnerabilities like:
    - SQL Injection (A03:2021)
    - Command Injection (A03:2021)
    - SSRF (A10:2021)
    - Path Traversal (A01:2021)
    - Insecure Deserialization (A08:2021)

    Attributes:
        sources: List of matchers for taint sources (e.g., user input)
        sinks: List of matchers for dangerous sinks (e.g., eval, execute)
        sanitizers: List of matchers for sanitizer functions (may be empty)
        propagates_through: List of propagation primitives (EXPLICIT!)
        scope: "local" (same function) or "global" (cross-function)
    """

    def __init__(
        self,
        from_sources: _MatcherArg,
        to_sinks: _MatcherArg,
        sanitized_by: Optional[_MatcherArg] = None,
        propagates_through: Optional[
            Union[PropagationPrimitive, Sequence[PropagationPrimitive]]
        ] = None,
        scope: str = "global",
    ):
        """
        Args:
            from_sources: Source matcher(s) - where taint originates
            to_sinks: Sink matcher(s) - dangerous functions
            sanitized_by: Optional sanitizer matcher(s)
            propagates_through: EXPLICIT propagation primitive(s)
                               (default: None = no propagation!)
            scope: "local" (intra-procedural) or "global" (inter-procedural)

        Every argument accepts either a single object or an iterable of
        objects; each is stored as a fresh list so later mutation of the
        caller's list does not change this matcher.

        Raises:
            ValueError: If sources or sinks are empty, or scope is invalid.
            TypeError: If an argument is neither a DSL object nor iterable.

        Examples:
            # SQL Injection
            flows(
                from_sources=calls("request.GET", "request.POST"),
                to_sinks=calls("execute", "executemany"),
                sanitized_by=calls("quote_sql"),
                propagates_through=[
                    propagates.assignment(),
                    propagates.function_args(),
                ],
                scope="global"
            )
        """
        # Sources and sinks are mandatory: a flow needs both ends.
        sources = _as_list(from_sources)
        if not sources:
            raise ValueError("flows() requires at least one source")
        self.sources = sources

        sinks = _as_list(to_sinks)
        if not sinks:
            raise ValueError("flows() requires at least one sink")
        self.sinks = sinks

        # Sanitizers are optional; None means "no sanitizers".
        self.sanitizers = _as_list(sanitized_by)

        # Propagation is EXPLICIT: None means an empty list, never a default set.
        self.propagates_through = _as_list(propagates_through)

        if scope not in ("local", "global"):
            raise ValueError(f"scope must be 'local' or 'global', got '{scope}'")
        self.scope = scope

    def to_ir(self) -> dict:
        """
        Serialize to JSON IR for Go executor.

        Returns:
            {
                "type": "dataflow",
                "sources": [
                    {"type": "call_matcher", "patterns": ["request.GET"], ...}
                ],
                "sinks": [
                    {"type": "call_matcher", "patterns": ["execute"], ...}
                ],
                "sanitizers": [
                    {"type": "call_matcher", "patterns": ["quote_sql"], ...}
                ],
                "propagation": [
                    {"type": "assignment", "metadata": {}},
                    {"type": "function_args", "metadata": {}}
                ],
                "scope": "global"
            }
        """
        return {
            "type": IRType.DATAFLOW.value,
            "sources": [src.to_ir() for src in self.sources],
            "sinks": [sink.to_ir() for sink in self.sinks],
            "sanitizers": [san.to_ir() for san in self.sanitizers],
            "propagation": create_propagation_list(self.propagates_through),
            "scope": self.scope,
        }

    def __repr__(self) -> str:
        """Return a compact summary with source/sink/propagation counts."""
        src_count = len(self.sources)
        sink_count = len(self.sinks)
        prop_count = len(self.propagates_through)
        return (
            f"flows(sources={src_count}, sinks={sink_count}, "
            f"propagation={prop_count}, scope='{self.scope}')"
        )


# Public API
def flows(
    from_sources: _MatcherArg,
    to_sinks: _MatcherArg,
    sanitized_by: Optional[_MatcherArg] = None,
    propagates_through: Optional[
        Union[PropagationPrimitive, Sequence[PropagationPrimitive]]
    ] = None,
    scope: str = "global",
) -> DataflowMatcher:
    """
    Create a dataflow matcher for taint analysis.

    This is the PRIMARY matcher for OWASP Top 10 vulnerabilities. It is a
    thin wrapper that forwards every argument to DataflowMatcher.

    Args:
        from_sources: Where taint originates (e.g., user input)
        to_sinks: Dangerous functions that consume tainted data
        sanitized_by: Optional functions that neutralize taint
        propagates_through: HOW taint flows (MUST be explicit!)
        scope: "local" or "global" analysis

    Returns:
        DataflowMatcher instance

    Raises:
        ValueError: If sources or sinks are empty, or scope is invalid.
        TypeError: If an argument is neither a DSL object nor iterable.

    Examples:
        >>> from codepathfinder import flows, calls, propagates
        >>>
        >>> # SQL Injection
        >>> flows(
        ...     from_sources=calls("request.GET"),
        ...     to_sinks=calls("execute"),
        ...     propagates_through=[propagates.assignment()]
        ... )
        >>>
        >>> # Command Injection with sanitization
        >>> flows(
        ...     from_sources=calls("request.POST"),
        ...     to_sinks=calls("os.system", "subprocess.call"),
        ...     sanitized_by=calls("shlex.quote"),
        ...     propagates_through=[
        ...         propagates.assignment(),
        ...         propagates.function_args()
        ...     ],
        ...     scope="global"
        ... )
    """
    return DataflowMatcher(
        from_sources=from_sources,
        to_sinks=to_sinks,
        sanitized_by=sanitized_by,
        propagates_through=propagates_through,
        scope=scope,
    )
# Taint propagation primitives for dataflow analysis.
#
# These primitives define HOW taint propagates through code constructs.
# Developers specify which primitives to enable via the propagates_through
# parameter.

from enum import Enum
from typing import Any, Dict, Iterable, List, Optional


class PropagationType(Enum):
    """
    Enum of all propagation primitive types.

    Phase 1 (MVP - This PR):
        ASSIGNMENT, FUNCTION_ARGS, FUNCTION_RETURNS

    Phase 2 (MVP - Future PR):
        STRING_CONCAT, STRING_FORMAT

    Phase 3-6 (Post-MVP):
        Collections, control flow, OOP, advanced
    """

    # ===== PHASE 1: BARE MINIMUM (MVP) =====
    ASSIGNMENT = "assignment"
    FUNCTION_ARGS = "function_args"
    FUNCTION_RETURNS = "function_returns"

    # ===== PHASE 2: STRING OPERATIONS (MVP - Future PR) =====
    STRING_CONCAT = "string_concat"
    STRING_FORMAT = "string_format"

    # ===== PHASE 3: COLLECTIONS (POST-MVP) =====
    LIST_APPEND = "list_append"
    LIST_EXTEND = "list_extend"
    DICT_VALUES = "dict_values"
    DICT_UPDATE = "dict_update"
    SET_ADD = "set_add"

    # ===== PHASE 4: CONTROL FLOW (POST-MVP) =====
    IF_CONDITION = "if_condition"
    FOR_ITERATION = "for_iteration"
    WHILE_CONDITION = "while_condition"
    SWITCH_CASE = "switch_case"

    # ===== PHASE 5: OOP (POST-MVP) =====
    ATTRIBUTE_ASSIGNMENT = "attribute_assignment"
    METHOD_CALL = "method_call"
    CONSTRUCTOR = "constructor"

    # ===== PHASE 6: ADVANCED (POST-MVP) =====
    COMPREHENSION = "comprehension"
    LAMBDA_CAPTURE = "lambda_capture"
    YIELD_STMT = "yield_stmt"


class PropagationPrimitive:
    """
    Base class for propagation primitives.

    Each primitive describes ONE way taint can flow through code. It is a
    plain value object: a PropagationType plus an optional metadata dict.
    """

    def __init__(
        self, prim_type: PropagationType, metadata: Optional[Dict[str, Any]] = None
    ):
        """
        Args:
            prim_type: The type of propagation. A PropagationType member, or
                its string value (e.g. "assignment"), which is coerced.
            metadata: Optional additional configuration. Shallow-copied so
                later changes to the caller's dict do not affect this object.

        Raises:
            ValueError: If prim_type is not a valid PropagationType.
        """
        # Enum lookup by value is a no-op for real members and fails fast for
        # anything else, instead of failing later inside to_ir()/__repr__().
        self.type = PropagationType(prim_type)
        self.metadata = dict(metadata) if metadata else {}

    def to_ir(self) -> Dict[str, Any]:
        """
        Serialize to JSON IR.

        Returns:
            {
                "type": "assignment",
                "metadata": {}
            }

            The metadata dict is a shallow copy, so editing the returned IR
            does not modify this primitive.
        """
        return {
            "type": self.type.value,
            "metadata": dict(self.metadata),
        }

    def __repr__(self) -> str:
        return f"propagates.{self.type.value}()"


class propagates:
    """
    Namespace for taint propagation primitives.

    Usage:
        propagates.assignment()
        propagates.function_args()
        propagates.function_returns()

    Each factory only builds the IR tag for a primitive; the actual pattern
    matching is performed by whatever consumes the IR.
    """

    # ===== PHASE 1: BARE MINIMUM (MVP - THIS PR) =====

    @staticmethod
    def assignment() -> PropagationPrimitive:
        """
        Taint propagates through variable assignment.

        Intended patterns:
            x = tainted                    # Simple assignment
            a = b = tainted                # Chained assignment
            x, y = tainted, safe           # Tuple unpacking (x is tainted)

        Examples:
            user_input = request.GET.get("id")  # source
            query = user_input                   # PROPAGATES via assignment
            cursor.execute(query)                # sink

        Returns:
            PropagationPrimitive for assignment
        """
        return PropagationPrimitive(PropagationType.ASSIGNMENT)

    @staticmethod
    def function_args() -> PropagationPrimitive:
        """
        Taint propagates through function arguments.

        Intended patterns:
            func(tainted)                  # Positional argument
            func(arg=tainted)              # Keyword argument
            func(*tainted)                 # Args unpacking
            func(**tainted)                # Kwargs unpacking

        Needed for inter-procedural analysis.

        Examples:
            user_input = request.GET.get("id")  # source
            process_data(user_input)             # PROPAGATES via function_args
            def process_data(data):
                execute(data)                    # sink (data is tainted)

        Returns:
            PropagationPrimitive for function arguments
        """
        return PropagationPrimitive(PropagationType.FUNCTION_ARGS)

    @staticmethod
    def function_returns() -> PropagationPrimitive:
        """
        Taint propagates through return values.

        Intended patterns:
            return tainted                      # Direct return
            return tainted if cond else safe    # Conditional return
            return [tainted, safe]              # Return list containing tainted

        Needed for functions that pass tainted data back to their caller.

        Examples:
            def get_user_id():
                user_input = request.GET.get("id")  # source
                return user_input                    # PROPAGATES via return

            query = get_user_id()                    # query is now tainted
            execute(query)                           # sink

        Returns:
            PropagationPrimitive for function returns
        """
        return PropagationPrimitive(PropagationType.FUNCTION_RETURNS)

    # ===== PHASE 2: STRING OPERATIONS (MVP - Future PR) =====
    # Will be implemented in PR #4
    # string_concat(), string_format()

    # ===== PHASE 3-6: POST-MVP =====
    # Will be implemented in post-MVP PRs


def create_propagation_list(
    primitives: Iterable[PropagationPrimitive],
) -> List[Dict[str, Any]]:
    """
    Convert propagation primitives to JSON IR, preserving their order.

    Args:
        primitives: Iterable of PropagationPrimitive objects

    Returns:
        List of JSON IR dictionaries, one per primitive

    Example:
        >>> prims = [propagates.assignment(), propagates.function_args()]
        >>> create_propagation_list(prims)
        [
            {"type": "assignment", "metadata": {}},
            {"type": "function_args", "metadata": {}}
        ]
    """
    return [prim.to_ir() for prim in primitives]
"rule", "__version__"] diff --git a/python-dsl/pyproject.toml b/python-dsl/pyproject.toml index b95af6f3..397222cb 100644 --- a/python-dsl/pyproject.toml +++ b/python-dsl/pyproject.toml @@ -14,10 +14,10 @@ license = {text = "MIT"} testpaths = ["tests"] python_files = "test_*.py" python_functions = "test_*" -addopts = "--cov=pathfinder --cov-report=term-missing --cov-report=html" +addopts = "--cov=codepathfinder --cov-report=term-missing --cov-report=html" [tool.coverage.run] -source = ["pathfinder"] +source = ["codepathfinder"] omit = ["tests/*", "setup.py"] [tool.coverage.report] @@ -38,6 +38,6 @@ line-length = 88 target-version = "py38" [tool.mypy] -python_version = "3.8" +python_version = "3.9" warn_return_any = true warn_unused_configs = true diff --git a/python-dsl/setup.py b/python-dsl/setup.py index efd3c6c2..650725d6 100644 --- a/python-dsl/setup.py +++ b/python-dsl/setup.py @@ -1,11 +1,11 @@ -"""Setup script for pathfinder Python DSL.""" +"""Setup script for codepathfinder Python DSL.""" from setuptools import setup, find_packages from pathlib import Path # Read version from __init__.py version = {} -with open("pathfinder/__init__.py") as f: +with open("codepathfinder/__init__.py") as f: for line in f: if line.startswith("__version__"): exec(line, version) diff --git a/python-dsl/tests/test_dataflow.py b/python-dsl/tests/test_dataflow.py new file mode 100644 index 00000000..0143c7aa --- /dev/null +++ b/python-dsl/tests/test_dataflow.py @@ -0,0 +1,399 @@ +""" +Tests for dataflow matcher and flows() function. 
+""" + +import pytest +from codepathfinder.dataflow import DataflowMatcher, flows +from codepathfinder.matchers import calls, variable +from codepathfinder.propagation import propagates +from codepathfinder.ir import IRType + + +class TestDataflowMatcherInit: + """Tests for DataflowMatcher initialization.""" + + def test_create_with_single_source_and_sink(self): + """Can create matcher with single source and sink.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + assert len(matcher.sources) == 1 + assert len(matcher.sinks) == 1 + assert matcher.sanitizers == [] + assert matcher.propagates_through == [] + assert matcher.scope == "global" + + def test_create_with_multiple_sources(self): + """Can create matcher with multiple sources.""" + matcher = DataflowMatcher( + from_sources=[calls("request.GET"), calls("request.POST")], + to_sinks=calls("execute"), + ) + assert len(matcher.sources) == 2 + + def test_create_with_multiple_sinks(self): + """Can create matcher with multiple sinks.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=[calls("execute"), calls("executemany")], + ) + assert len(matcher.sinks) == 2 + + def test_create_with_sanitizers(self): + """Can create matcher with sanitizers.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + sanitized_by=calls("quote_sql"), + ) + assert len(matcher.sanitizers) == 1 + + def test_create_with_multiple_sanitizers(self): + """Can create matcher with multiple sanitizers.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + sanitized_by=[calls("quote_sql"), calls("escape_sql")], + ) + assert len(matcher.sanitizers) == 2 + + def test_create_with_propagation(self): + """Can create matcher with propagation primitives.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + 
propagates_through=[propagates.assignment()], + ) + assert len(matcher.propagates_through) == 1 + + def test_create_with_multiple_propagation(self): + """Can create matcher with multiple propagation primitives.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + propagates.function_returns(), + ], + ) + assert len(matcher.propagates_through) == 3 + + def test_create_with_local_scope(self): + """Can create matcher with local scope.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + scope="local", + ) + assert matcher.scope == "local" + + def test_empty_sources_raises_error(self): + """Empty sources raises ValueError.""" + with pytest.raises(ValueError, match="requires at least one source"): + DataflowMatcher(from_sources=[], to_sinks=calls("execute")) + + def test_empty_sinks_raises_error(self): + """Empty sinks raises ValueError.""" + with pytest.raises(ValueError, match="requires at least one sink"): + DataflowMatcher(from_sources=calls("request.GET"), to_sinks=[]) + + def test_invalid_scope_raises_error(self): + """Invalid scope raises ValueError.""" + with pytest.raises(ValueError, match="scope must be"): + DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + scope="invalid", + ) + + +class TestDataflowMatcherToIR: + """Tests for DataflowMatcher.to_ir() serialization.""" + + def test_minimal_ir(self): + """Minimal matcher serializes correctly.""" + matcher = DataflowMatcher( + from_sources=calls("source"), + to_sinks=calls("sink"), + ) + ir = matcher.to_ir() + assert ir["type"] == IRType.DATAFLOW.value + assert len(ir["sources"]) == 1 + assert len(ir["sinks"]) == 1 + assert ir["sanitizers"] == [] + assert ir["propagation"] == [] + assert ir["scope"] == "global" + + def test_full_ir_with_all_fields(self): + """Full matcher with all fields serializes 
correctly.""" + matcher = DataflowMatcher( + from_sources=[calls("request.GET"), calls("request.POST")], + to_sinks=[calls("execute"), calls("executemany")], + sanitized_by=[calls("quote_sql")], + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + ], + scope="local", + ) + ir = matcher.to_ir() + assert ir["type"] == IRType.DATAFLOW.value + assert len(ir["sources"]) == 2 + assert len(ir["sinks"]) == 2 + assert len(ir["sanitizers"]) == 1 + assert len(ir["propagation"]) == 2 + assert ir["scope"] == "local" + + def test_sources_ir_structure(self): + """Sources serialize to correct IR structure.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + ir = matcher.to_ir() + source_ir = ir["sources"][0] + assert source_ir["type"] == "call_matcher" + assert "request.GET" in source_ir["patterns"] + + def test_sinks_ir_structure(self): + """Sinks serialize to correct IR structure.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + ir = matcher.to_ir() + sink_ir = ir["sinks"][0] + assert sink_ir["type"] == "call_matcher" + assert "execute" in sink_ir["patterns"] + + def test_sanitizers_ir_structure(self): + """Sanitizers serialize to correct IR structure.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + sanitized_by=calls("quote_sql"), + ) + ir = matcher.to_ir() + sanitizer_ir = ir["sanitizers"][0] + assert sanitizer_ir["type"] == "call_matcher" + assert "quote_sql" in sanitizer_ir["patterns"] + + def test_propagation_ir_structure(self): + """Propagation primitives serialize to correct IR structure.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + propagates_through=[propagates.assignment()], + ) + ir = matcher.to_ir() + prop_ir = ir["propagation"][0] + assert prop_ir["type"] == "assignment" + assert prop_ir["metadata"] == {} + + +class 
TestDataflowMatcherRepr: + """Tests for DataflowMatcher.__repr__().""" + + def test_repr_format(self): + """__repr__ returns readable string.""" + matcher = DataflowMatcher( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + propagates_through=[propagates.assignment()], + ) + repr_str = repr(matcher) + assert "flows" in repr_str + assert "sources=1" in repr_str + assert "sinks=1" in repr_str + assert "propagation=1" in repr_str + assert "scope='global'" in repr_str + + def test_repr_counts_multiple(self): + """__repr__ counts multiple sources/sinks/propagation.""" + matcher = DataflowMatcher( + from_sources=[calls("a"), calls("b")], + to_sinks=[calls("x"), calls("y"), calls("z")], + propagates_through=[propagates.assignment(), propagates.function_args()], + ) + repr_str = repr(matcher) + assert "sources=2" in repr_str + assert "sinks=3" in repr_str + assert "propagation=2" in repr_str + + +class TestFlowsFunction: + """Tests for flows() public API function.""" + + def test_flows_returns_dataflow_matcher(self): + """flows() returns DataflowMatcher instance.""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + assert isinstance(matcher, DataflowMatcher) + + def test_flows_with_all_parameters(self): + """flows() accepts all parameters.""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + sanitized_by=calls("quote_sql"), + propagates_through=[propagates.assignment()], + scope="local", + ) + assert len(matcher.sources) == 1 + assert len(matcher.sinks) == 1 + assert len(matcher.sanitizers) == 1 + assert len(matcher.propagates_through) == 1 + assert matcher.scope == "local" + + def test_flows_default_scope_is_global(self): + """flows() defaults to global scope.""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + assert matcher.scope == "global" + + def test_flows_default_propagation_is_empty(self): + """flows() defaults to empty 
propagation list (EXPLICIT!).""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + assert matcher.propagates_through == [] + + def test_flows_default_sanitizers_is_empty(self): + """flows() defaults to empty sanitizers list.""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + ) + assert matcher.sanitizers == [] + + +class TestDataflowIntegration: + """Integration tests for realistic OWASP Top 10 patterns.""" + + def test_sql_injection_pattern(self): + """SQL injection pattern with typical configuration.""" + matcher = flows( + from_sources=calls("request.GET", "request.POST"), + to_sinks=calls("execute", "executemany"), + sanitized_by=calls("quote_sql"), + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + ], + scope="global", + ) + ir = matcher.to_ir() + assert ir["type"] == "dataflow" + assert len(ir["sources"]) == 1 # Single calls() with 2 patterns + assert len(ir["sinks"]) == 1 # Single calls() with 2 patterns + assert len(ir["propagation"]) == 2 + + def test_command_injection_pattern(self): + """Command injection pattern with typical configuration.""" + matcher = flows( + from_sources=calls("request.POST"), + to_sinks=calls("os.system", "subprocess.call"), + sanitized_by=calls("shlex.quote"), + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + propagates.function_returns(), + ], + scope="global", + ) + ir = matcher.to_ir() + assert ir["type"] == "dataflow" + assert len(ir["propagation"]) == 3 + + def test_path_traversal_pattern(self): + """Path traversal pattern with typical configuration.""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("open", "os.path.join"), + sanitized_by=calls("os.path.abspath"), + propagates_through=[propagates.assignment()], + scope="local", + ) + ir = matcher.to_ir() + assert ir["scope"] == "local" + assert len(ir["propagation"]) == 1 + + def test_ssrf_pattern(self): + 
"""SSRF pattern with typical configuration.""" + matcher = flows( + from_sources=calls("request.GET", "request.POST"), + to_sinks=calls("requests.get", "urllib.request.urlopen"), + propagates_through=[ + propagates.assignment(), + propagates.function_args(), + ], + scope="global", + ) + ir = matcher.to_ir() + assert len(ir["sanitizers"]) == 0 # No sanitizers for SSRF + + def test_insecure_deserialization_pattern(self): + """Insecure deserialization pattern.""" + matcher = flows( + from_sources=calls("request.POST"), + to_sinks=calls("pickle.loads", "yaml.load"), + propagates_through=[propagates.assignment()], + scope="local", + ) + ir = matcher.to_ir() + assert ir["type"] == "dataflow" + + +class TestDataflowEdgeCases: + """Edge cases and error handling tests.""" + + def test_mixed_matcher_types_sources(self): + """Can mix CallMatcher and VariableMatcher as sources.""" + matcher = flows( + from_sources=[calls("request.GET"), variable("user_*")], + to_sinks=calls("execute"), + ) + assert len(matcher.sources) == 2 + + def test_single_propagation_primitive(self): + """Can specify single propagation primitive.""" + matcher = flows( + from_sources=calls("source"), + to_sinks=calls("sink"), + propagates_through=[propagates.assignment()], + ) + assert len(matcher.propagates_through) == 1 + + def test_no_propagation_valid(self): + """No propagation is valid (explicit choice).""" + matcher = flows( + from_sources=calls("source"), + to_sinks=calls("sink"), + propagates_through=[], + ) + assert matcher.propagates_through == [] + + def test_global_scope_string_validation(self): + """'global' scope string is accepted.""" + matcher = flows( + from_sources=calls("source"), + to_sinks=calls("sink"), + scope="global", + ) + assert matcher.scope == "global" + + def test_local_scope_string_validation(self): + """'local' scope string is accepted.""" + matcher = flows( + from_sources=calls("source"), + to_sinks=calls("sink"), + scope="local", + ) + assert matcher.scope == "local" diff 
--git a/python-dsl/tests/test_decorators.py b/python-dsl/tests/test_decorators.py index c4653099..0b5d5ffc 100644 --- a/python-dsl/tests/test_decorators.py +++ b/python-dsl/tests/test_decorators.py @@ -1,7 +1,7 @@ """Tests for pathfinder.decorators module.""" -from pathfinder import rule, calls, variable -from pathfinder.decorators import Rule +from codepathfinder import rule, calls, variable +from codepathfinder.decorators import Rule class TestRuleDecorator: diff --git a/python-dsl/tests/test_ir.py b/python-dsl/tests/test_ir.py index ebd562f5..731a94d4 100644 --- a/python-dsl/tests/test_ir.py +++ b/python-dsl/tests/test_ir.py @@ -1,8 +1,8 @@ """Tests for pathfinder.ir module.""" import pytest -from pathfinder.ir import IRType, serialize_ir, validate_ir -from pathfinder import calls, variable +from codepathfinder.ir import IRType, serialize_ir, validate_ir +from codepathfinder import calls, variable, flows, propagates class TestIRType: @@ -90,3 +90,107 @@ def test_variable_matcher_empty_pattern(self): """Test variable_matcher IR with empty pattern is invalid.""" ir = {"type": "variable_matcher", "pattern": "", "wildcard": False} assert validate_ir(ir) is False + + def test_valid_dataflow_ir(self): + """Test validating valid dataflow IR.""" + ir = { + "type": "dataflow", + "sources": [ + {"type": "call_matcher", "patterns": ["source"], "wildcard": False} + ], + "sinks": [ + {"type": "call_matcher", "patterns": ["sink"], "wildcard": False} + ], + "sanitizers": [], + "propagation": [], + "scope": "global", + } + assert validate_ir(ir) is True + + def test_dataflow_missing_sources(self): + """Test dataflow IR without 'sources' is invalid.""" + ir = { + "type": "dataflow", + "sinks": [{"type": "call_matcher"}], + "sanitizers": [], + "propagation": [], + "scope": "global", + } + assert validate_ir(ir) is False + + def test_dataflow_empty_sources(self): + """Test dataflow IR with empty sources list is invalid.""" + ir = { + "type": "dataflow", + "sources": [], + "sinks": 
[{"type": "call_matcher"}], + "sanitizers": [], + "propagation": [], + "scope": "global", + } + assert validate_ir(ir) is False + + def test_dataflow_missing_sinks(self): + """Test dataflow IR without 'sinks' is invalid.""" + ir = { + "type": "dataflow", + "sources": [{"type": "call_matcher"}], + "sanitizers": [], + "propagation": [], + "scope": "global", + } + assert validate_ir(ir) is False + + def test_dataflow_empty_sinks(self): + """Test dataflow IR with empty sinks list is invalid.""" + ir = { + "type": "dataflow", + "sources": [{"type": "call_matcher"}], + "sinks": [], + "sanitizers": [], + "propagation": [], + "scope": "global", + } + assert validate_ir(ir) is False + + def test_dataflow_invalid_scope(self): + """Test dataflow IR with invalid scope is invalid.""" + ir = { + "type": "dataflow", + "sources": [{"type": "call_matcher"}], + "sinks": [{"type": "call_matcher"}], + "sanitizers": [], + "propagation": [], + "scope": "invalid", + } + assert validate_ir(ir) is False + + def test_dataflow_local_scope_valid(self): + """Test dataflow IR with local scope is valid.""" + ir = { + "type": "dataflow", + "sources": [ + {"type": "call_matcher", "patterns": ["source"], "wildcard": False} + ], + "sinks": [ + {"type": "call_matcher", "patterns": ["sink"], "wildcard": False} + ], + "sanitizers": [], + "propagation": [], + "scope": "local", + } + assert validate_ir(ir) is True + + def test_serialize_dataflow_matcher(self): + """Test serializing DataflowMatcher.""" + matcher = flows( + from_sources=calls("request.GET"), + to_sinks=calls("execute"), + propagates_through=[propagates.assignment()], + ) + ir = serialize_ir(matcher) + + assert ir["type"] == "dataflow" + assert len(ir["sources"]) == 1 + assert len(ir["sinks"]) == 1 + assert len(ir["propagation"]) == 1 diff --git a/python-dsl/tests/test_matchers.py b/python-dsl/tests/test_matchers.py index 07e08d5d..db30738e 100644 --- a/python-dsl/tests/test_matchers.py +++ b/python-dsl/tests/test_matchers.py @@ -1,8 +1,8 
@@ """Tests for pathfinder.matchers module.""" import pytest -from pathfinder import calls, variable -from pathfinder.matchers import CallMatcher, VariableMatcher +from codepathfinder import calls, variable +from codepathfinder.matchers import CallMatcher, VariableMatcher class TestCallMatcher: diff --git a/python-dsl/tests/test_propagation.py b/python-dsl/tests/test_propagation.py new file mode 100644 index 00000000..c4c71240 --- /dev/null +++ b/python-dsl/tests/test_propagation.py @@ -0,0 +1,174 @@ +""" +Tests for taint propagation primitives. +""" + +from codepathfinder.propagation import ( + PropagationType, + PropagationPrimitive, + propagates, + create_propagation_list, +) + + +class TestPropagationType: + """Tests for PropagationType enum.""" + + def test_phase1_types_exist(self): + """Phase 1 propagation types are defined.""" + assert PropagationType.ASSIGNMENT.value == "assignment" + assert PropagationType.FUNCTION_ARGS.value == "function_args" + assert PropagationType.FUNCTION_RETURNS.value == "function_returns" + + def test_phase2_types_exist(self): + """Phase 2 propagation types are defined (not implemented yet).""" + assert PropagationType.STRING_CONCAT.value == "string_concat" + assert PropagationType.STRING_FORMAT.value == "string_format" + + def test_all_enum_values_unique(self): + """All enum values are unique.""" + values = [t.value for t in PropagationType] + assert len(values) == len(set(values)) + + +class TestPropagationPrimitive: + """Tests for PropagationPrimitive base class.""" + + def test_create_primitive_without_metadata(self): + """Can create primitive without metadata.""" + prim = PropagationPrimitive(PropagationType.ASSIGNMENT) + assert prim.type == PropagationType.ASSIGNMENT + assert prim.metadata == {} + + def test_create_primitive_with_metadata(self): + """Can create primitive with metadata.""" + metadata = {"key": "value"} + prim = PropagationPrimitive(PropagationType.ASSIGNMENT, metadata) + assert prim.type == 
PropagationType.ASSIGNMENT + assert prim.metadata == metadata + + def test_to_ir_without_metadata(self): + """to_ir() returns correct JSON IR without metadata.""" + prim = PropagationPrimitive(PropagationType.ASSIGNMENT) + ir = prim.to_ir() + assert ir == {"type": "assignment", "metadata": {}} + + def test_to_ir_with_metadata(self): + """to_ir() returns correct JSON IR with metadata.""" + metadata = {"foo": "bar", "baz": 42} + prim = PropagationPrimitive(PropagationType.FUNCTION_ARGS, metadata) + ir = prim.to_ir() + assert ir == {"type": "function_args", "metadata": metadata} + + def test_repr(self): + """__repr__ returns readable string.""" + prim = PropagationPrimitive(PropagationType.ASSIGNMENT) + assert repr(prim) == "propagates.assignment()" + + +class TestPropagatesNamespace: + """Tests for propagates namespace (Phase 1 methods).""" + + def test_assignment_returns_primitive(self): + """propagates.assignment() returns PropagationPrimitive.""" + prim = propagates.assignment() + assert isinstance(prim, PropagationPrimitive) + assert prim.type == PropagationType.ASSIGNMENT + + def test_assignment_ir(self): + """propagates.assignment() serializes correctly.""" + prim = propagates.assignment() + ir = prim.to_ir() + assert ir == {"type": "assignment", "metadata": {}} + + def test_function_args_returns_primitive(self): + """propagates.function_args() returns PropagationPrimitive.""" + prim = propagates.function_args() + assert isinstance(prim, PropagationPrimitive) + assert prim.type == PropagationType.FUNCTION_ARGS + + def test_function_args_ir(self): + """propagates.function_args() serializes correctly.""" + prim = propagates.function_args() + ir = prim.to_ir() + assert ir == {"type": "function_args", "metadata": {}} + + def test_function_returns_returns_primitive(self): + """propagates.function_returns() returns PropagationPrimitive.""" + prim = propagates.function_returns() + assert isinstance(prim, PropagationPrimitive) + assert prim.type == 
PropagationType.FUNCTION_RETURNS + + def test_function_returns_ir(self): + """propagates.function_returns() serializes correctly.""" + prim = propagates.function_returns() + ir = prim.to_ir() + assert ir == {"type": "function_returns", "metadata": {}} + + +class TestCreatePropagationList: + """Tests for create_propagation_list helper.""" + + def test_empty_list(self): + """Empty list returns empty JSON IR list.""" + ir_list = create_propagation_list([]) + assert ir_list == [] + + def test_single_primitive(self): + """Single primitive returns single JSON IR dict.""" + prims = [propagates.assignment()] + ir_list = create_propagation_list(prims) + assert len(ir_list) == 1 + assert ir_list[0] == {"type": "assignment", "metadata": {}} + + def test_multiple_primitives(self): + """Multiple primitives return multiple JSON IR dicts.""" + prims = [ + propagates.assignment(), + propagates.function_args(), + propagates.function_returns(), + ] + ir_list = create_propagation_list(prims) + assert len(ir_list) == 3 + assert ir_list[0] == {"type": "assignment", "metadata": {}} + assert ir_list[1] == {"type": "function_args", "metadata": {}} + assert ir_list[2] == {"type": "function_returns", "metadata": {}} + + def test_preserves_order(self): + """Primitive order is preserved in JSON IR.""" + prims = [ + propagates.function_returns(), + propagates.assignment(), + propagates.function_args(), + ] + ir_list = create_propagation_list(prims) + assert ir_list[0]["type"] == "function_returns" + assert ir_list[1]["type"] == "assignment" + assert ir_list[2]["type"] == "function_args" + + +class TestPropagationIntegration: + """Integration tests for propagation primitives.""" + + def test_typical_sql_injection_propagation(self): + """Typical SQL injection uses assignment + function_args.""" + prims = [propagates.assignment(), propagates.function_args()] + ir_list = create_propagation_list(prims) + assert len(ir_list) == 2 + assert all(isinstance(ir, dict) for ir in ir_list) + + def 
test_typical_command_injection_propagation(self): + """Typical command injection uses all three Phase 1 primitives.""" + prims = [ + propagates.assignment(), + propagates.function_args(), + propagates.function_returns(), + ] + ir_list = create_propagation_list(prims) + assert len(ir_list) == 3 + + def test_minimal_propagation(self): + """Can use just assignment for intra-procedural analysis.""" + prims = [propagates.assignment()] + ir_list = create_propagation_list(prims) + assert len(ir_list) == 1 + assert ir_list[0]["type"] == "assignment" diff --git a/sourcecode-parser/build.gradle b/sourcecode-parser/build.gradle index f1820d13..439befa3 100644 --- a/sourcecode-parser/build.gradle +++ b/sourcecode-parser/build.gradle @@ -118,12 +118,12 @@ task createTag(dependsOn: tagExists) { // Python DSL tasks task testPython(type: Exec) { workingDir '../python-dsl' - commandLine 'pytest', '--cov=pathfinder', '--cov-report=term-missing', '--cov-fail-under=95' + commandLine 'pytest', '--cov=codepathfinder', '--cov-report=term-missing', '--cov-fail-under=95' } task lintPython(type: Exec) { workingDir '../python-dsl' - commandLine 'sh', '-c', 'black --check pathfinder/ tests/ && ruff check pathfinder/ tests/ && mypy pathfinder/' + commandLine 'sh', '-c', 'black --check codepathfinder/ tests/ && ruff check codepathfinder/ tests/ && mypy codepathfinder/' } task buildPython(type: Exec, dependsOn: ['testPython', 'lintPython']) {