Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python-dsl/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,6 @@ ENV/
# OS
.DS_Store
Thumbs.db

# Mypy
.mypy_cache/
83 changes: 74 additions & 9 deletions python-dsl/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Pathfinder Python DSL
# Code-Pathfinder Python DSL

Python DSL for defining security patterns in code-pathfinder.

Expand All @@ -11,7 +11,7 @@ pip install codepathfinder
## Quick Start

```python
from pathfinder import rule, calls, variable
from codepathfinder import rule, calls, variable

@rule(id="code-injection", severity="critical", cwe="CWE-94")
def detect_eval():
Expand All @@ -31,7 +31,7 @@ def detect_user_input():
Matches function/method calls.

```python
from pathfinder import calls
from codepathfinder import calls

# Exact match
calls("eval")
Expand All @@ -49,7 +49,7 @@ calls("*.execute") # Matches cursor.execute, conn.execute, etc.
Matches variable references.

```python
from pathfinder import variable
from codepathfinder import variable

# Exact match
variable("user_input")
Expand All @@ -59,12 +59,77 @@ variable("user_*") # Matches user_input, user_data, etc.
variable("*_id") # Matches user_id, post_id, etc.
```

## Dataflow Analysis

### `flows(from_sources, to_sinks, sanitized_by=None, propagates_through=None, scope="global")`

Tracks tainted data flow from sources to sinks for OWASP Top 10 vulnerability detection.

```python
from codepathfinder import flows, calls, propagates

# SQL Injection
flows(
from_sources=calls("request.GET", "request.POST"),
to_sinks=calls("execute", "executemany"),
sanitized_by=calls("quote_sql"),
propagates_through=[
propagates.assignment(),
propagates.function_args(),
],
scope="global"
)

# Command Injection
flows(
from_sources=calls("request.POST"),
to_sinks=calls("os.system", "subprocess.call"),
sanitized_by=calls("shlex.quote"),
propagates_through=[
propagates.assignment(),
propagates.function_args(),
propagates.function_returns(),
]
)

# Path Traversal
flows(
from_sources=calls("request.GET"),
to_sinks=calls("open", "os.path.join"),
sanitized_by=calls("os.path.abspath"),
propagates_through=[propagates.assignment()],
scope="local"
)
```

**Parameters:**
- `from_sources`: Source matcher(s) where taint originates (e.g., user input)
- `to_sinks`: Sink matcher(s) for dangerous functions
- `sanitized_by` (optional): Sanitizer matcher(s) that neutralize taint
- `propagates_through` (optional): List of propagation primitives. Must be given explicitly; when omitted, no propagation is applied
- `scope`: `"local"` (intra-procedural) or `"global"` (inter-procedural, default)

### Propagation Primitives

Propagation primitives define HOW taint flows through code:

```python
from codepathfinder import propagates

# Phase 1 (Available Now):
propagates.assignment() # x = tainted
propagates.function_args() # func(tainted)
propagates.function_returns() # return tainted
```

**Important:** Propagation is EXPLICIT - you must specify which primitives to enable. No defaults are applied.

## Rule Decorator

The `@rule` decorator marks functions as security rules with metadata.

```python
from pathfinder import rule, calls
from codepathfinder import rule, calls

@rule(
id="sqli-001",
Expand All @@ -90,7 +155,7 @@ The function docstring becomes the rule description.
Rules serialize to JSON Intermediate Representation (IR) for the Go executor:

```python
from pathfinder import rule, calls
from codepathfinder import rule, calls
import json

@rule(id="test", severity="high")
Expand Down Expand Up @@ -132,13 +197,13 @@ pip install -e ".[dev]"
pytest

# Format code
black pathfinder/ tests/
black codepathfinder/ tests/

# Lint
ruff check pathfinder/ tests/
ruff check codepathfinder/ tests/

# Type check
mypy pathfinder/
mypy codepathfinder/
```

## Requirements
Expand Down
32 changes: 32 additions & 0 deletions python-dsl/codepathfinder/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
codepathfinder - Python DSL for static analysis security patterns

Re-exports the public DSL entry points (``calls``, ``variable``, ``rule``,
``flows``, ``propagates``) together with the package ``__version__``.

Examples:
    Basic matchers:
        >>> from codepathfinder import calls, variable
        >>> calls("eval")
        >>> variable("user_input")

    Rule definition:
        >>> from codepathfinder import rule, calls
        >>> @rule(id="test", severity="high")
        ... def detect_eval():
        ...     return calls("eval")

    Dataflow analysis:
        >>> from codepathfinder import flows, calls, propagates
        >>> flows(
        ...     from_sources=calls("request.GET"),
        ...     to_sinks=calls("execute"),
        ...     propagates_through=[propagates.assignment()]
        ... )
"""

__version__ = "1.0.0"

from .matchers import calls, variable
from .decorators import rule
from .dataflow import flows
from .propagation import propagates

# Names exported by ``from codepathfinder import *``.
__all__ = ["calls", "variable", "rule", "flows", "propagates", "__version__"]
190 changes: 190 additions & 0 deletions python-dsl/codepathfinder/dataflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""
Dataflow matcher for taint analysis.

The flows() function is the core of OWASP Top 10 pattern detection.
It describes how tainted data flows from sources to sinks.
"""

from typing import List, Optional, Union
from .matchers import CallMatcher
from .propagation import PropagationPrimitive, create_propagation_list
from .ir import IRType


class DataflowMatcher:
    """
    Matches tainted data flows from sources to sinks.

    This is the primary matcher for security vulnerabilities like:
    - SQL Injection (A03:2021)
    - Command Injection (A03:2021)
    - SSRF (A10:2021)
    - Path Traversal (A01:2021)
    - Insecure Deserialization (A08:2021)

    Attributes:
        sources: List of matchers for taint sources (e.g., user input)
        sinks: List of matchers for dangerous sinks (e.g., eval, execute)
        sanitizers: List of matchers for sanitizer functions (may be empty)
        propagates_through: List of propagation primitives; empty unless the
            caller supplies them explicitly
        scope: "local" (same function) or "global" (cross-function)
    """

    def __init__(
        self,
        from_sources: Union[CallMatcher, List[CallMatcher]],
        to_sinks: Union[CallMatcher, List[CallMatcher]],
        sanitized_by: Optional[Union[CallMatcher, List[CallMatcher]]] = None,
        propagates_through: Optional[List[PropagationPrimitive]] = None,
        scope: str = "global",
    ):
        """
        Normalize and validate the arguments of a dataflow pattern.

        Every matcher/primitive argument may be a single object or a
        list/tuple of objects; each is stored as a fresh list so that later
        mutation of the caller's container cannot alter this matcher.

        Args:
            from_sources: Source matcher(s) - where taint originates
            to_sinks: Sink matcher(s) - dangerous functions
            sanitized_by: Optional sanitizer matcher(s)
            propagates_through: EXPLICIT propagation primitive(s)
                (default: None = no propagation!)
            scope: "local" (intra-procedural) or "global" (inter-procedural)

        Raises:
            ValueError: If no source is given, no sink is given, or scope is
                neither "local" nor "global".

        Examples:
            # SQL Injection
            flows(
                from_sources=calls("request.GET", "request.POST"),
                to_sinks=calls("execute", "executemany"),
                sanitized_by=calls("quote_sql"),
                propagates_through=[
                    propagates.assignment(),
                    propagates.function_args(),
                ],
                scope="global"
            )
        """
        # Validation order (sources, sinks, scope) decides which error a
        # caller sees first when several arguments are invalid.
        sources = self._as_list(from_sources)
        if not sources:
            raise ValueError("flows() requires at least one source")
        self.sources = sources

        sinks = self._as_list(to_sinks)
        if not sinks:
            raise ValueError("flows() requires at least one sink")
        self.sinks = sinks

        # Sanitizers are optional; None becomes an empty list.
        self.sanitizers = self._as_list(sanitized_by)

        # NO DEFAULT propagation: None becomes an empty list, so the
        # developer must list the primitives they want.
        self.propagates_through = self._as_list(propagates_through)

        if scope not in ("local", "global"):
            raise ValueError(f"scope must be 'local' or 'global', got '{scope}'")
        self.scope = scope

    @staticmethod
    def _as_list(value: object) -> list:
        """Return ``value`` as a new list (None -> [], list/tuple -> copy, other -> [value])."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            # Copy so the matcher never aliases a container owned by the caller.
            return list(value)
        return [value]

    def to_ir(self) -> dict:
        """
        Serialize to JSON IR for Go executor.

        Returns:
            {
                "type": "dataflow",
                "sources": [
                    {"type": "call_matcher", "patterns": ["request.GET"], ...}
                ],
                "sinks": [
                    {"type": "call_matcher", "patterns": ["execute"], ...}
                ],
                "sanitizers": [
                    {"type": "call_matcher", "patterns": ["quote_sql"], ...}
                ],
                "propagation": [
                    {"type": "assignment", "metadata": {}},
                    {"type": "function_args", "metadata": {}}
                ],
                "scope": "global"
            }
        """
        return {
            "type": IRType.DATAFLOW.value,
            "sources": [src.to_ir() for src in self.sources],
            "sinks": [sink.to_ir() for sink in self.sinks],
            "sanitizers": [san.to_ir() for san in self.sanitizers],
            "propagation": create_propagation_list(self.propagates_through),
            "scope": self.scope,
        }

    def __repr__(self) -> str:
        """Return a compact summary with source/sink/propagation counts and scope."""
        src_count = len(self.sources)
        sink_count = len(self.sinks)
        prop_count = len(self.propagates_through)
        return (
            f"flows(sources={src_count}, sinks={sink_count}, "
            f"propagation={prop_count}, scope='{self.scope}')"
        )


# Public API
def flows(
    from_sources: Union[CallMatcher, List[CallMatcher]],
    to_sinks: Union[CallMatcher, List[CallMatcher]],
    sanitized_by: Optional[Union[CallMatcher, List[CallMatcher]]] = None,
    propagates_through: Optional[List[PropagationPrimitive]] = None,
    scope: str = "global",
) -> DataflowMatcher:
    """
    Build a DataflowMatcher describing a source-to-sink taint flow.

    Public entry point of the dataflow DSL; every argument is forwarded
    unchanged to the DataflowMatcher constructor, which performs validation.

    Args:
        from_sources: Matcher(s) for where taint originates (e.g., user input)
        to_sinks: Matcher(s) for dangerous functions consuming tainted data
        sanitized_by: Optional matcher(s) for functions that neutralize taint
        propagates_through: Propagation primitives describing HOW taint
            flows; must be listed explicitly
        scope: "local" or "global" analysis

    Returns:
        The constructed DataflowMatcher.

    Examples:
        >>> from codepathfinder import flows, calls, propagates
        >>>
        >>> # SQL Injection
        >>> flows(
        ...     from_sources=calls("request.GET"),
        ...     to_sinks=calls("execute"),
        ...     propagates_through=[propagates.assignment()]
        ... )
        >>>
        >>> # Command Injection with sanitization
        >>> flows(
        ...     from_sources=calls("request.POST"),
        ...     to_sinks=calls("os.system", "subprocess.call"),
        ...     sanitized_by=calls("shlex.quote"),
        ...     propagates_through=[
        ...         propagates.assignment(),
        ...         propagates.function_args()
        ...     ],
        ...     scope="global"
        ... )
    """
    # Positional order mirrors the DataflowMatcher constructor signature.
    matcher = DataflowMatcher(
        from_sources,
        to_sinks,
        sanitized_by,
        propagates_through,
        scope,
    )
    return matcher
16 changes: 16 additions & 0 deletions python-dsl/pathfinder/ir.py → python-dsl/codepathfinder/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,20 @@ def validate_ir(ir: Dict[str, Any]) -> bool:
and isinstance(ir["wildcard"], bool)
)

if ir_type == IRType.DATAFLOW:
return (
"sources" in ir
and isinstance(ir["sources"], list)
and len(ir["sources"]) > 0
and "sinks" in ir
and isinstance(ir["sinks"], list)
and len(ir["sinks"]) > 0
and "sanitizers" in ir
and isinstance(ir["sanitizers"], list)
and "propagation" in ir
and isinstance(ir["propagation"], list)
and "scope" in ir
and ir["scope"] in ["local", "global"]
)

return True
Loading
Loading