From b0ca6d28de01d43ae845f6602e0be08dfb5d0a21 Mon Sep 17 00:00:00 2001 From: tommylauren Date: Wed, 1 Apr 2026 10:57:06 -0400 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20ScopeBlind=20protect-mcp=20integrat?= =?UTF-8?q?ion=20=E2=80=94=20Cedar=20policy=20enforcement=20and=20verifiab?= =?UTF-8?q?le=20decision=20receipts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `packages/agentmesh-integrations/scopeblind-protect-mcp/` with four components: - CedarPolicyBridge: maps Cedar allow/deny decisions into AGT evaluate() — Cedar deny is authoritative and cannot be overridden by trust scores - ReceiptVerifier: validates Ed25519-signed decision receipt structure, converts to AGT-compatible context - SpendingGate: enforces issuer-blind spending authority with trust-score gating and utilization band checks - scopeblind_context(): builds AGT-compatible context from protect-mcp artifacts Key architectural difference from mcp-trust-proxy: mcp-trust-proxy gates on trust scores (soft signals). protect-mcp gates on Cedar policies (formal, deterministic, auditable). Decision receipts provide cryptographic proof via IETF Internet-Draft draft-farley-acta-signed-receipts. 36 tests covering: Cedar decision parsing, policy bridge authorization, receipt validation, spending gate limits/categories/utilization bands, and AGT context shape compatibility. protect-mcp: https://www.npmjs.com/package/protect-mcp (v0.4.6, MIT) Docs: https://scopeblind.com/docs/protect-mcp Co-Authored-By: Claude Opus 4.6 (1M context) --- .../scopeblind-protect-mcp/LICENSE | 21 + .../scopeblind-protect-mcp/README.md | 118 ++++ .../scopeblind-protect-mcp/SECURITY.md | 13 + .../scopeblind-protect-mcp/pyproject.toml | 30 + .../scopeblind_protect_mcp/__init__.py | 33 ++ .../scopeblind_protect_mcp/adapter.py | 516 ++++++++++++++++++ .../scopeblind-protect-mcp/tests/__init__.py | 2 + .../tests/test_scopeblind_adapter.py | 331 +++++++++++ 8 files changed, 1064 insertions(+) create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/LICENSE create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/README.md create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/SECURITY.md create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/__init__.py create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/tests/__init__.py create mode 100644 packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/LICENSE b/packages/agentmesh-integrations/scopeblind-protect-mcp/LICENSE new file mode 100644 index 00000000..22aed37e --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Microsoft Corporation. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md b/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md new file mode 100644 index 00000000..90a8e0f1 --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md @@ -0,0 +1,118 @@ +# ScopeBlind protect-mcp Integration + +AgentMesh integration for [protect-mcp](https://www.npmjs.com/package/protect-mcp) — Cedar policy enforcement and verifiable decision receipts for MCP tool calls. + +## What protect-mcp does + +protect-mcp is a security gateway that wraps any MCP server with: + +- **Cedar policy evaluation** (AWS Cedar via WASM, sub-ms latency) +- **Ed25519 decision receipts** (signed proof of every allow/deny decision) +- **Issuer-blind verification** (verifier can confirm receipt validity without learning who issued it) +- **Spending authority** (prove an agent's purchase is authorized without revealing org details) + +Published on npm: `npx protect-mcp@latest` | [GitHub](https://github.com/scopeblind/scopeblind-gateway) | [Docs](https://scopeblind.com/docs/protect-mcp) + +## How it complements AGT + +| Layer | AGT | protect-mcp | +|-------|-----|-------------| +| Analysis | MCP Security Scanner (static) | Cedar WASM (runtime) | +| Identity | DID + trust scores | Ed25519 passports + VOPRF | +| Decisions | PolicyEngine evaluate() | Cedar allow/deny + signed receipts | +| Proof | Audit log | Cryptographic receipts (offline-verifiable) | +| Privacy | Trust scores are visible | Issuer-blind (verifier learns nothing about issuer) | + +They compose naturally: Cedar is the hard constraint, AGT trust is the soft signal. + +## Components + +| Component | Purpose | +|-----------|---------| +| `CedarPolicyBridge` | Maps Cedar allow/deny into AGT `evaluate()` — Cedar deny is authoritative | +| `ReceiptVerifier` | Validates receipt structure, extracts AGT-compatible metadata | +| `SpendingGate` | Enforces issuer-blind spending authority with trust-score gating | +| `scopeblind_context()` | Builds AGT-compatible context dict from protect-mcp artifacts | + +## Quick Start + +```python +from scopeblind_protect_mcp import CedarDecision, CedarPolicyBridge, scopeblind_context + +# Cedar denied this tool call (e.g., clinejection policy blocked shell_exec) +decision = CedarDecision( + effect="deny", + tool_name="shell_exec", + policy_ids=["sb-clinejection-004"], +) + +# Bridge into AGT — Cedar deny is authoritative regardless of trust score +bridge = CedarPolicyBridge(trust_floor=300) +result = bridge.evaluate( + cedar_decision=decision, + agent_trust_score=900, # high trust doesn't override Cedar deny + agent_did="did:mesh:agent-1", +) +assert not result["allowed"] # Cedar deny is final + +# Build AGT-compatible context for policy engine +ctx = scopeblind_context(cedar_decision=decision) +# Pass to AGT: policy_engine.evaluate(action="tool_call", context=ctx) +``` + +## Spending Authority + +```python +from scopeblind_protect_mcp import SpendingGate + +gate = SpendingGate( + max_single_amount=5000.0, + high_util_trust_floor=500, + blocked_categories=["gambling"], +) + +# Low-value spend: auto-approved +result = gate.evaluate_spend(amount=50.0, category="cloud_compute", agent_trust_score=300) +assert result["allowed"] + +# High utilization + low trust: denied +result = gate.evaluate_spend( + amount=50.0, + utilization_band="high", + agent_trust_score=200, +) +assert not result["allowed"] +``` + +## Receipt Verification + +```python +from scopeblind_protect_mcp import ReceiptVerifier + +verifier = ReceiptVerifier() + +receipt = { + "type": "scopeblind:decision", + "payload": {"effect": "allow", "tool": "web_search", "timestamp": 1711929600}, + "signature": "base64_ed25519_signature", + "publicKey": "base64_ed25519_public_key", +} + +result = verifier.validate_structure(receipt) +assert result["valid"] + +# Convert to AGT context +ctx = verifier.to_agt_context(receipt) +assert ctx["issuer_blind"] is True +``` + +## Design Principles + +1. **Cedar deny is authoritative.** No trust score, no override. Formal policy beats behavioral signal. +2. **Receipts are issuer-blind.** The verifier confirms validity without learning which organization issued the receipt. This prevents supply-chain surveillance. +3. **Composable, not competing.** protect-mcp handles the tool-call boundary. AGT handles the agent lifecycle. This adapter maps between them. +4. **Offline-verifiable.** Receipts can be verified without contacting the issuer, using `@veritasacta/verify`. + +## Protocol + +protect-mcp receipts follow the [Veritas Acta signed receipt format](https://datatracker.ietf.org/doc/draft-farley-acta-signed-receipts/), an IETF Internet-Draft for portable, verifiable decision artifacts. diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/SECURITY.md b/packages/agentmesh-integrations/scopeblind-protect-mcp/SECURITY.md new file mode 100644 index 00000000..c43ac7b5 --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/SECURITY.md @@ -0,0 +1,13 @@ +# Security + +## Reporting + +Please report security vulnerabilities via https://github.com/microsoft/agent-governance-toolkit/security/advisories + +For ScopeBlind-specific issues: tommy@scopeblind.com + +## Design + +This integration does not handle cryptographic material directly. +Receipt verification and key management are delegated to the `protect-mcp` runtime +and the `@veritasacta/verify` offline verifier. diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml b/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml new file mode 100644 index 00000000..d55a73bc --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml @@ -0,0 +1,30 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "scopeblind-protect-mcp" +version = "0.1.0" +description = "AgentMesh integration for ScopeBlind protect-mcp — Cedar policy enforcement and verifiable decision receipts for MCP tool calls" +readme = "README.md" +license = "MIT" +requires-python = ">=3.9" +authors = [ + { name = "ScopeBlind Pty Ltd" } +] +keywords = ["mcp", "cedar", "policy", "receipts", "agentmesh", "scopeblind", "trust"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", +] +dependencies = [] + +[project.optional-dependencies] +dev = ["pytest>=7.0"] + +[project.urls] +Homepage = "https://github.com/microsoft/agent-governance-toolkit" +Repository = "https://github.com/microsoft/agent-governance-toolkit" +Documentation = "https://scopeblind.com/docs/protect-mcp" diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/__init__.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/__init__.py new file mode 100644 index 00000000..159d3c42 --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/__init__.py @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +""" +ScopeBlind protect-mcp integration for AgentMesh. + +Bridges protect-mcp's Cedar policy enforcement and Ed25519 decision receipts +into AGT's PolicyEngine as verifiable trust signals. + +protect-mcp provides runtime enforcement (evaluate Cedar policies on every +MCP tool call). AGT provides governance infrastructure (trust scoring, +identity, SLOs). This adapter connects them: + +- CedarPolicyBridge: maps Cedar allow/deny decisions into AGT evaluate() +- ReceiptVerifier: validates Ed25519-signed decision receipts offline +- SpendingGate: enforces issuer-blind spending authority checks +- scopeblind_context(): builds AGT-compatible context from protect-mcp artifacts +""" + +from .adapter import ( + CedarDecision, + CedarPolicyBridge, + ReceiptVerifier, + SpendingGate, + scopeblind_context, +) + +__all__ = [ + "CedarDecision", + "CedarPolicyBridge", + "ReceiptVerifier", + "SpendingGate", + "scopeblind_context", +] diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py new file mode 100644 index 00000000..18e88095 --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py @@ -0,0 +1,516 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +""" +ScopeBlind protect-mcp adapter for AgentMesh. + +protect-mcp operates at the MCP transport layer — it intercepts tool calls, +evaluates Cedar policies via WASM, and signs verifiable decision receipts +(Ed25519 + JCS canonicalization). This adapter bridges those runtime artifacts +into AGT's PolicyEngine interface. + +Architecture: + protect-mcp governs the **tool call boundary** (Cedar policy evaluation). + AGT governs the **agent lifecycle** (trust scores, SLOs, circuit breakers). + This adapter maps between the two so they compose rather than compete. + +Key difference from mcp-trust-proxy: + mcp-trust-proxy gates on trust scores (soft signals). + protect-mcp gates on Cedar policies (formal, auditable, deterministic). + Decision receipts provide cryptographic proof of what was decided and why. +""" + +from __future__ import annotations + +import hashlib +import json +import time +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +# --------------------------------------------------------------------------- +# Cedar decision representation +# --------------------------------------------------------------------------- + + +@dataclass +class CedarDecision: + """ + A Cedar policy evaluation result from protect-mcp. + + Cedar evaluates to allow or deny with an optional list of policy IDs + that contributed to the decision. This is deterministic — the same + policy set and context always produces the same decision. + """ + + effect: str # "allow" or "deny" + tool_name: str + policy_ids: List[str] = field(default_factory=list) + diagnostics: Dict[str, Any] = field(default_factory=dict) + evaluated_at: float = field(default_factory=time.time) + + @property + def allowed(self) -> bool: + return self.effect == "allow" + + def to_dict(self) -> Dict[str, Any]: + return { + "effect": self.effect, + "tool": self.tool_name, + "policy_ids": self.policy_ids, + "diagnostics": self.diagnostics, + "evaluated_at": self.evaluated_at, + } + + @classmethod + def from_receipt(cls, receipt: Dict[str, Any]) -> "CedarDecision": + """Parse a CedarDecision from a protect-mcp receipt payload.""" + payload = receipt.get("payload", receipt) + return cls( + effect=payload.get("effect", payload.get("decision", "deny")), + tool_name=payload.get("tool", payload.get("resource", "")), + policy_ids=payload.get("policy_ids", []), + diagnostics=payload.get("diagnostics", {}), + evaluated_at=payload.get("timestamp", time.time()), + ) + + +# --------------------------------------------------------------------------- +# Cedar → AGT policy bridge +# --------------------------------------------------------------------------- + + +class CedarPolicyBridge: + """ + Maps Cedar policy decisions into AGT's PolicyEngine evaluate() interface. + + Cedar is a formal policy language (open-sourced by AWS) that evaluates + allow/deny decisions based on principal, action, resource, and context. + protect-mcp uses Cedar WASM for sub-millisecond evaluation at the MCP + transport layer. + + This bridge lets AGT consume Cedar decisions as hard constraints: + - Cedar deny → AGT deny (non-negotiable) + - Cedar allow → check AGT trust score (soft signal) + + This preserves Cedar's formal guarantees while layering AGT's + behavioral trust on top. + """ + + def __init__( + self, + trust_floor: int = 0, + trust_bonus_per_allow: int = 50, + deny_penalty: int = 200, + require_receipt: bool = False, + ): + self.trust_floor = trust_floor + self.trust_bonus = trust_bonus_per_allow + self.deny_penalty = deny_penalty + self.require_receipt = require_receipt + self._history: List[Dict[str, Any]] = [] + + def evaluate( + self, + cedar_decision: CedarDecision, + agent_trust_score: int = 0, + agent_did: str = "", + receipt: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """ + Evaluate an MCP tool call through both Cedar and AGT lenses. + + Returns an AGT-compatible evaluation result with: + - allowed: final decision (Cedar deny is always final) + - cedar_effect: the Cedar decision + - adjusted_trust: trust score after Cedar signal + - reason: human-readable explanation + """ + result: Dict[str, Any] = { + "tool": cedar_decision.tool_name, + "agent_did": agent_did, + "cedar_effect": cedar_decision.effect, + "policy_ids": cedar_decision.policy_ids, + "timestamp": time.time(), + } + + # Receipt required but not provided + if self.require_receipt and receipt is None: + result["allowed"] = False + result["reason"] = "Decision receipt required but not provided" + result["adjusted_trust"] = max(0, agent_trust_score - self.deny_penalty) + self._history.append(result) + return result + + # Cedar deny is authoritative — not overridable by trust score + if not cedar_decision.allowed: + result["allowed"] = False + result["reason"] = ( + f"Cedar policy deny on '{cedar_decision.tool_name}' " + f"(policies: {cedar_decision.policy_ids})" + ) + result["adjusted_trust"] = max(0, agent_trust_score - self.deny_penalty) + self._history.append(result) + return result + + # Cedar allow — layer AGT trust check + adjusted = min(1000, agent_trust_score + self.trust_bonus) + if adjusted < self.trust_floor: + result["allowed"] = False + result["reason"] = ( + f"Cedar allowed but trust score {adjusted} below floor {self.trust_floor}" + ) + result["adjusted_trust"] = adjusted + self._history.append(result) + return result + + result["allowed"] = True + result["reason"] = "Cedar allow + trust check passed" + result["adjusted_trust"] = adjusted + + if receipt: + result["receipt_hash"] = hashlib.sha256( + json.dumps(receipt, sort_keys=True).encode() + ).hexdigest()[:16] + + self._history.append(result) + return result + + def get_history(self) -> List[Dict[str, Any]]: + return list(self._history) + + def get_stats(self) -> Dict[str, Any]: + total = len(self._history) + allowed = sum(1 for r in self._history if r.get("allowed")) + cedar_denies = sum( + 1 for r in self._history if r.get("cedar_effect") == "deny" + ) + return { + "total_evaluations": total, + "allowed": allowed, + "denied": total - allowed, + "cedar_denies": cedar_denies, + "trust_denies": total - allowed - cedar_denies, + } + + +# --------------------------------------------------------------------------- +# Receipt verification +# --------------------------------------------------------------------------- + + +class ReceiptVerifier: + """ + Validates protect-mcp decision receipts. + + Receipts are Ed25519-signed JSON envelopes following the Veritas Acta + artifact format (JCS canonicalization). Each receipt proves: + - What tool call was evaluated + - What Cedar policies applied + - What the decision was (allow/deny) + - When it happened + - Who signed it (tenant public key) + + Crucially, receipts are **issuer-blind** — the verifier can confirm + validity without learning which organization issued the receipt. + This prevents supply-chain surveillance. + + This class validates receipt structure and extracts AGT-compatible + metadata. Cryptographic verification (Ed25519 signature check) + is delegated to @veritasacta/verify or the protect-mcp runtime. + """ + + REQUIRED_FIELDS = {"type", "payload", "signature", "publicKey"} + VALID_TYPES = { + "scopeblind:decision", + "scopeblind:spending_authority", + "scopeblind:policy_evaluation", + "acta:artifact", + } + + def __init__(self, strict: bool = True): + self.strict = strict + self._verified: List[Dict[str, Any]] = [] + + def validate_structure(self, receipt: Dict[str, Any]) -> Dict[str, Any]: + """ + Validate receipt structure (not cryptographic signature). + + Returns a result dict with: + - valid: bool + - receipt_type: str + - tool: str (extracted from payload) + - decision: str (allow/deny) + - reason: str (if invalid) + """ + # Check required fields + missing = self.REQUIRED_FIELDS - set(receipt.keys()) + if missing: + return { + "valid": False, + "reason": f"Missing required fields: {sorted(missing)}", + } + + receipt_type = receipt.get("type", "") + if self.strict and receipt_type not in self.VALID_TYPES: + return { + "valid": False, + "reason": f"Unknown receipt type: {receipt_type}", + "receipt_type": receipt_type, + } + + payload = receipt.get("payload", {}) + if not isinstance(payload, dict): + return {"valid": False, "reason": "Payload must be an object"} + + result = { + "valid": True, + "receipt_type": receipt_type, + "tool": payload.get("tool", payload.get("resource", "")), + "decision": payload.get("effect", payload.get("decision", "")), + "timestamp": payload.get("timestamp"), + "has_signature": bool(receipt.get("signature")), + "has_public_key": bool(receipt.get("publicKey")), + } + + # Spending authority specific fields + if receipt_type == "scopeblind:spending_authority": + result["amount"] = payload.get("amount") + result["currency"] = payload.get("currency", "USD") + result["utilization_band"] = payload.get("utilization_band") + result["category"] = payload.get("category") + + self._verified.append(result) + return result + + def to_agt_context(self, receipt: Dict[str, Any]) -> Dict[str, Any]: + """Convert a validated receipt to AGT-compatible context.""" + validation = self.validate_structure(receipt) + if not validation.get("valid"): + return {"receipt_valid": False, "reason": validation.get("reason", "")} + + payload = receipt.get("payload", {}) + return { + "receipt_valid": True, + "receipt_type": validation["receipt_type"], + "cedar_effect": validation.get("decision", ""), + "tool": validation.get("tool", ""), + "timestamp": validation.get("timestamp"), + "receipt_hash": hashlib.sha256( + json.dumps(receipt, sort_keys=True).encode() + ).hexdigest()[:16], + "issuer_blind": True, # protect-mcp receipts are always issuer-blind + "payload_fields": sorted(payload.keys()), + } + + def get_verification_log(self) -> List[Dict[str, Any]]: + return list(self._verified) + + +# --------------------------------------------------------------------------- +# Spending authority gate +# --------------------------------------------------------------------------- + + +class SpendingGate: + """ + Enforces spending authority for agent financial operations. + + protect-mcp's spending authority system uses VOPRF (RFC 9497) to + produce issuer-blind spending receipts. The receipt proves: + - The spend is within authorized limits + - The category is permitted + - The utilization band (low/medium/high — not exact budget) + + What it does NOT reveal (by design): + - Organization name or identity + - Total budget ceiling + - Exact remaining budget + - Agent identity or delegation chain + + This gate integrates with AGT to add trust-score gating on top of + the cryptographic spending proof. + """ + + UTILIZATION_BANDS = {"low", "medium", "high", "exceeded"} + + def __init__( + self, + max_single_amount: float = 10000.0, + high_util_trust_floor: int = 500, + blocked_categories: Optional[List[str]] = None, + ): + self.max_single_amount = max_single_amount + self.high_util_trust_floor = high_util_trust_floor + self.blocked_categories = set(blocked_categories or []) + self._decisions: List[Dict[str, Any]] = [] + + def evaluate_spend( + self, + amount: float, + currency: str = "USD", + category: str = "general", + utilization_band: str = "low", + agent_trust_score: int = 0, + agent_did: str = "", + receipt: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + """ + Evaluate a spending request through both spending authority and AGT trust. + + Checks (in order): + 1. Amount within single-transaction limit + 2. Category not blocked + 3. Utilization band + trust score check + 4. Receipt present (if high-value) + """ + result: Dict[str, Any] = { + "amount": amount, + "currency": currency, + "category": category, + "utilization_band": utilization_band, + "agent_did": agent_did, + "agent_trust_score": agent_trust_score, + "timestamp": time.time(), + } + + # 1. Amount limit + if amount > self.max_single_amount: + result["allowed"] = False + result["reason"] = ( + f"Amount {amount} {currency} exceeds single-transaction " + f"limit of {self.max_single_amount} {currency}" + ) + self._decisions.append(result) + return result + + if amount <= 0: + result["allowed"] = False + result["reason"] = "Amount must be positive" + self._decisions.append(result) + return result + + # 2. Category check + if category in self.blocked_categories: + result["allowed"] = False + result["reason"] = f"Category '{category}' is blocked" + self._decisions.append(result) + return result + + # 3. Utilization + trust + if utilization_band not in self.UTILIZATION_BANDS: + result["allowed"] = False + result["reason"] = f"Invalid utilization band: {utilization_band}" + self._decisions.append(result) + return result + + if utilization_band == "exceeded": + result["allowed"] = False + result["reason"] = "Budget utilization exceeded" + self._decisions.append(result) + return result + + if utilization_band == "high" and agent_trust_score < self.high_util_trust_floor: + result["allowed"] = False + result["reason"] = ( + f"High utilization requires trust score >= {self.high_util_trust_floor} " + f"(current: {agent_trust_score})" + ) + self._decisions.append(result) + return result + + # 4. Receipt check for high-value transactions + if amount > 1000 and receipt is None: + result["allowed"] = False + result["reason"] = ( + f"Transactions above 1000 {currency} require a spending authority receipt" + ) + self._decisions.append(result) + return result + + result["allowed"] = True + result["reason"] = "Spending authorized" + if receipt: + result["receipt_hash"] = hashlib.sha256( + json.dumps(receipt, sort_keys=True).encode() + ).hexdigest()[:16] + + self._decisions.append(result) + return result + + def get_decisions(self) -> List[Dict[str, Any]]: + return list(self._decisions) + + def get_stats(self) -> Dict[str, Any]: + total = len(self._decisions) + allowed = sum(1 for d in self._decisions if d.get("allowed")) + total_amount = sum(d.get("amount", 0) for d in self._decisions if d.get("allowed")) + return { + "total_requests": total, + "allowed": allowed, + "denied": total - allowed, + "total_authorized_amount": total_amount, + "blocked_categories": sorted(self.blocked_categories), + } + + +# --------------------------------------------------------------------------- +# AGT context builder +# --------------------------------------------------------------------------- + + +def scopeblind_context( + cedar_decision: Optional[CedarDecision] = None, + receipt: Optional[Dict[str, Any]] = None, + spend_amount: Optional[float] = None, + spend_category: Optional[str] = None, + utilization_band: Optional[str] = None, +) -> Dict[str, Any]: + """ + Build an AGT-compatible context dict from protect-mcp artifacts. + + This context can be passed to AGT's PolicyEngine.evaluate() to + incorporate protect-mcp signals into governance decisions. + + Example: + ctx = scopeblind_context( + cedar_decision=decision, + receipt=receipt, + spend_amount=99.50, + ) + agt_result = policy_engine.evaluate(action="purchase", context=ctx) + """ + ctx: Dict[str, Any] = { + "source": "scopeblind:protect-mcp", + "version": "0.4.6", + } + + if cedar_decision is not None: + ctx["cedar"] = { + "effect": cedar_decision.effect, + "tool": cedar_decision.tool_name, + "policy_ids": cedar_decision.policy_ids, + } + + if receipt is not None: + ctx["receipt"] = { + "present": True, + "type": receipt.get("type", ""), + "has_signature": bool(receipt.get("signature")), + "issuer_blind": True, + } + payload = receipt.get("payload", {}) + if payload: + ctx["receipt"]["tool"] = payload.get("tool", payload.get("resource", "")) + ctx["receipt"]["decision"] = payload.get("effect", payload.get("decision", "")) + else: + ctx["receipt"] = {"present": False} + + if spend_amount is not None: + ctx["spending"] = { + "amount": spend_amount, + "category": spend_category or "general", + "utilization_band": utilization_band or "unknown", + } + + return ctx diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/__init__.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/__init__.py new file mode 100644 index 00000000..59e481eb --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py new file mode 100644 index 00000000..ef8ce751 --- /dev/null +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py @@ -0,0 +1,331 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +""" +Tests for ScopeBlind protect-mcp AgentMesh adapter. + +Covers: +- Cedar decision parsing and representation +- CedarPolicyBridge: Cedar deny is authoritative, trust layering, receipt requirements +- ReceiptVerifier: structure validation, type checking, AGT context conversion +- SpendingGate: amount limits, category blocks, utilization bands, trust floors +- scopeblind_context: AGT-compatible context shape +""" + +import time + +import pytest + +from scopeblind_protect_mcp import ( + CedarDecision, + CedarPolicyBridge, + ReceiptVerifier, + SpendingGate, + scopeblind_context, +) + + +# ---- Fixtures ---- + + +def make_receipt( + effect="allow", + tool="web_search", + receipt_type="scopeblind:decision", + **extra_payload, +): + """Build a minimal valid receipt for testing.""" + payload = {"effect": effect, "tool": tool, "timestamp": time.time()} + payload.update(extra_payload) + return { + "type": receipt_type, + "payload": payload, + "signature": "ed25519_test_sig_placeholder", + "publicKey": "ed25519_test_pk_placeholder", + } + + +def make_spending_receipt(amount=50.0, category="cloud_compute", band="low"): + """Build a spending authority receipt.""" + return make_receipt( + effect="allow", + tool="purchase", + receipt_type="scopeblind:spending_authority", + amount=amount, + currency="USD", + utilization_band=band, + category=category, + ) + + +# ---- CedarDecision ---- + + +class TestCedarDecision: + def test_allow_decision(self): + d = CedarDecision(effect="allow", tool_name="web_search") + assert d.allowed is True + assert d.effect == "allow" + + def test_deny_decision(self): + d = CedarDecision(effect="deny", tool_name="shell_exec", policy_ids=["sb-001"]) + assert d.allowed is False + assert d.policy_ids == ["sb-001"] + + def test_to_dict_shape(self): + d = CedarDecision(effect="allow", tool_name="read_file") + result = d.to_dict() + assert set(result.keys()) == {"effect", "tool", "policy_ids", "diagnostics", "evaluated_at"} + + def test_from_receipt(self): + receipt = make_receipt(effect="deny", tool="shell_exec") + d = CedarDecision.from_receipt(receipt) + assert d.effect == "deny" + assert d.tool_name == "shell_exec" + assert d.allowed is False + + def test_from_receipt_with_policy_ids(self): + receipt = make_receipt(effect="deny", tool="bash", policy_ids=["sb-clinejection-001"]) + d = CedarDecision.from_receipt(receipt) + assert d.policy_ids == ["sb-clinejection-001"] + + +# ---- CedarPolicyBridge ---- + + +class TestCedarPolicyBridge: + def test_cedar_deny_is_authoritative(self): + """Cedar deny must not be overridable by high trust score.""" + bridge = CedarPolicyBridge() + decision = CedarDecision(effect="deny", tool_name="shell_exec", policy_ids=["sb-001"]) + result = bridge.evaluate(decision, agent_trust_score=999, agent_did="did:mesh:agent-1") + assert result["allowed"] is False + assert result["cedar_effect"] == "deny" + + def test_cedar_allow_passes(self): + bridge = CedarPolicyBridge() + decision = CedarDecision(effect="allow", tool_name="web_search") + result = bridge.evaluate(decision, agent_trust_score=500, agent_did="did:mesh:agent-1") + assert result["allowed"] is True + + def test_cedar_allow_with_trust_floor(self): + """Cedar allow but trust too low should deny.""" + bridge = CedarPolicyBridge(trust_floor=600) + decision = CedarDecision(effect="allow", tool_name="web_search") + result = bridge.evaluate(decision, agent_trust_score=100, agent_did="did:mesh:agent-1") + assert result["allowed"] is False + assert "trust score" in result["reason"].lower() + + def test_trust_bonus_applied(self): + bridge = CedarPolicyBridge(trust_bonus_per_allow=75) + decision = CedarDecision(effect="allow", tool_name="read_file") + result = bridge.evaluate(decision, agent_trust_score=400) + assert result["adjusted_trust"] == 475 + + def test_deny_penalty_applied(self): + bridge = CedarPolicyBridge(deny_penalty=300) + decision = CedarDecision(effect="deny", tool_name="shell_exec") + result = bridge.evaluate(decision, agent_trust_score=200) + assert result["adjusted_trust"] == 0 # clamped at 0 + + def test_receipt_required_but_missing(self): + bridge = CedarPolicyBridge(require_receipt=True) + decision = CedarDecision(effect="allow", tool_name="web_search") + result = bridge.evaluate(decision, agent_trust_score=500) + assert result["allowed"] is False + assert "receipt required" in result["reason"].lower() + + def test_receipt_provided_when_required(self): + bridge = CedarPolicyBridge(require_receipt=True) + decision = CedarDecision(effect="allow", tool_name="web_search") + receipt = make_receipt() + result = bridge.evaluate(decision, agent_trust_score=500, receipt=receipt) + assert result["allowed"] is True + assert "receipt_hash" in result + + def test_stats_tracking(self): + bridge = CedarPolicyBridge() + allow = CedarDecision(effect="allow", tool_name="read_file") + deny = CedarDecision(effect="deny", tool_name="shell_exec") + bridge.evaluate(allow, agent_trust_score=500) + bridge.evaluate(deny, agent_trust_score=500) + bridge.evaluate(allow, agent_trust_score=500) + stats = bridge.get_stats() + assert stats["total_evaluations"] == 3 + assert stats["allowed"] == 2 + assert stats["cedar_denies"] == 1 + + def test_trust_capped_at_1000(self): + bridge = CedarPolicyBridge(trust_bonus_per_allow=200) + decision = CedarDecision(effect="allow", tool_name="read_file") + result = bridge.evaluate(decision, agent_trust_score=900) + assert result["adjusted_trust"] == 1000 + + +# ---- ReceiptVerifier ---- + + +class TestReceiptVerifier: + def test_valid_receipt(self): + verifier = ReceiptVerifier() + receipt = make_receipt() + result = verifier.validate_structure(receipt) + assert result["valid"] is True + assert result["receipt_type"] == "scopeblind:decision" + + def test_missing_fields(self): + verifier = ReceiptVerifier() + result = verifier.validate_structure({"type": "scopeblind:decision"}) + assert result["valid"] is False + assert "Missing required fields" in result["reason"] + + def test_unknown_type_strict(self): + verifier = ReceiptVerifier(strict=True) + receipt = make_receipt() + receipt["type"] = "unknown:type" + result = verifier.validate_structure(receipt) + assert result["valid"] is False + + def test_unknown_type_lenient(self): + verifier = ReceiptVerifier(strict=False) + receipt = make_receipt() + receipt["type"] = "custom:type" + result = verifier.validate_structure(receipt) + assert result["valid"] is True + + def test_spending_authority_receipt(self): + verifier = ReceiptVerifier() + receipt = make_spending_receipt(amount=250.0, category="cloud_compute", band="medium") + result = verifier.validate_structure(receipt) + assert result["valid"] is True + assert result["amount"] == 250.0 + assert result["utilization_band"] == "medium" + + def test_to_agt_context(self): + verifier = ReceiptVerifier() + receipt = make_receipt(effect="allow", tool="web_search") + ctx = verifier.to_agt_context(receipt) + assert ctx["receipt_valid"] is True + assert ctx["issuer_blind"] is True + assert ctx["cedar_effect"] == "allow" + assert "receipt_hash" in ctx + + def test_invalid_receipt_agt_context(self): + verifier = ReceiptVerifier() + ctx = verifier.to_agt_context({"broken": True}) + assert ctx["receipt_valid"] is False + + +# ---- SpendingGate ---- + + +class TestSpendingGate: + def test_basic_spend_allowed(self): + gate = SpendingGate() + result = gate.evaluate_spend(amount=50.0, agent_trust_score=500) + assert result["allowed"] is True + + def test_exceeds_single_limit(self): + gate = SpendingGate(max_single_amount=100.0) + result = gate.evaluate_spend(amount=150.0, agent_trust_score=500) + assert result["allowed"] is False + assert "exceeds" in result["reason"].lower() + + def test_negative_amount_rejected(self): + gate = SpendingGate() + result = gate.evaluate_spend(amount=-10.0) + assert result["allowed"] is False + + def test_blocked_category(self): + gate = SpendingGate(blocked_categories=["gambling", "weapons"]) + result = gate.evaluate_spend(amount=50.0, category="gambling", agent_trust_score=999) + assert result["allowed"] is False + assert "blocked" in result["reason"].lower() + + def test_exceeded_utilization_band(self): + gate = SpendingGate() + result = gate.evaluate_spend(amount=10.0, utilization_band="exceeded", agent_trust_score=999) + assert result["allowed"] is False + assert "exceeded" in result["reason"].lower() + + def test_high_utilization_needs_trust(self): + gate = SpendingGate(high_util_trust_floor=500) + result = gate.evaluate_spend( + amount=50.0, utilization_band="high", agent_trust_score=200 + ) + assert result["allowed"] is False + assert "trust score" in result["reason"].lower() + + def test_high_utilization_with_sufficient_trust(self): + gate = SpendingGate(high_util_trust_floor=500) + result = gate.evaluate_spend( + amount=50.0, utilization_band="high", agent_trust_score=700 + ) + assert result["allowed"] is True + + def test_high_value_requires_receipt(self): + gate = SpendingGate() + result = gate.evaluate_spend(amount=2000.0, agent_trust_score=500) + assert result["allowed"] is False + assert "receipt" in result["reason"].lower() + + def test_high_value_with_receipt(self): + gate = SpendingGate() + receipt = make_spending_receipt(amount=2000.0) + result = gate.evaluate_spend(amount=2000.0, agent_trust_score=500, receipt=receipt) + assert result["allowed"] is True + + def test_stats(self): + gate = SpendingGate() + gate.evaluate_spend(amount=50.0, agent_trust_score=500) + gate.evaluate_spend(amount=25.0, agent_trust_score=500) + stats = gate.get_stats() + assert stats["total_requests"] == 2 + assert stats["allowed"] == 2 + assert stats["total_authorized_amount"] == 75.0 + + +# ---- scopeblind_context ---- + + +class TestScopeblindContext: + def test_minimal_context(self): + ctx = scopeblind_context() + assert ctx["source"] == "scopeblind:protect-mcp" + assert ctx["receipt"]["present"] is False + + def test_with_cedar_decision(self): + decision = CedarDecision(effect="allow", tool_name="read_file", policy_ids=["p1"]) + ctx = scopeblind_context(cedar_decision=decision) + assert ctx["cedar"]["effect"] == "allow" + assert ctx["cedar"]["tool"] == "read_file" + assert ctx["cedar"]["policy_ids"] == ["p1"] + + def test_with_receipt(self): + receipt = make_receipt(effect="allow", tool="web_search") + ctx = scopeblind_context(receipt=receipt) + assert ctx["receipt"]["present"] is True + assert ctx["receipt"]["issuer_blind"] is True + assert ctx["receipt"]["type"] == "scopeblind:decision" + + def test_with_spending(self): + ctx = scopeblind_context(spend_amount=99.50, spend_category="cloud", utilization_band="low") + assert ctx["spending"]["amount"] == 99.50 + assert ctx["spending"]["category"] == "cloud" + assert ctx["spending"]["utilization_band"] == "low" + + def test_full_context_shape(self): + """Full context should be a flat dict compatible with AGT evaluate().""" + decision = CedarDecision(effect="allow", tool_name="purchase") + receipt = make_spending_receipt(amount=99.50) + ctx = scopeblind_context( + cedar_decision=decision, + receipt=receipt, + spend_amount=99.50, + spend_category="cloud_compute", + utilization_band="low", + ) + assert "source" in ctx + assert "cedar" in ctx + assert "receipt" in ctx + assert "spending" in ctx + assert ctx["receipt"]["present"] is True From 9a3be8ac8a4054ee3d3760dd0fb332720a1109f6 Mon Sep 17 00:00:00 2001 From: tommylauren Date: Thu, 2 Apr 2026 03:54:00 -0400 Subject: [PATCH 2/4] =?UTF-8?q?fix:=20address=20code=20review=20=E2=80=94?= =?UTF-8?q?=20rename=20validate=5Fstructure,=20add=20thread=20safety,=20ed?= =?UTF-8?q?ge=20case=20tests,=20full=20hash?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.6 (1M context) --- .../scopeblind-protect-mcp/README.md | 2 +- .../scopeblind_protect_mcp/adapter.py | 93 ++++--- .../tests/test_scopeblind_adapter.py | 229 +++++++++++++++++- 3 files changed, 283 insertions(+), 41 deletions(-) diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md b/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md index 90a8e0f1..02baef01 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md @@ -98,7 +98,7 @@ receipt = { "publicKey": "base64_ed25519_public_key", } -result = verifier.validate_structure(receipt) +result = verifier.validate_structure_only(receipt) assert result["valid"] # Convert to AGT context diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py index 18e88095..6ba31c32 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py @@ -23,6 +23,7 @@ import hashlib import json +import threading import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional @@ -109,6 +110,7 @@ def __init__( self.deny_penalty = deny_penalty self.require_receipt = require_receipt self._history: List[Dict[str, Any]] = [] + self._history_lock = threading.Lock() def evaluate( self, @@ -139,7 +141,8 @@ def evaluate( result["allowed"] = False result["reason"] = "Decision receipt required but not provided" result["adjusted_trust"] = max(0, agent_trust_score - self.deny_penalty) - self._history.append(result) + with self._history_lock: + self._history.append(result) return result # Cedar deny is authoritative — not overridable by trust score @@ -150,7 +153,8 @@ def evaluate( f"(policies: {cedar_decision.policy_ids})" ) result["adjusted_trust"] = max(0, agent_trust_score - self.deny_penalty) - self._history.append(result) + with self._history_lock: + self._history.append(result) return result # Cedar allow — layer AGT trust check @@ -161,7 +165,8 @@ def evaluate( f"Cedar allowed but trust score {adjusted} below floor {self.trust_floor}" ) result["adjusted_trust"] = adjusted - self._history.append(result) + with self._history_lock: + self._history.append(result) return result result["allowed"] = True @@ -169,22 +174,25 @@ def evaluate( result["adjusted_trust"] = adjusted if receipt: - result["receipt_hash"] = hashlib.sha256( + result["receipt_ref"] = hashlib.sha256( json.dumps(receipt, sort_keys=True).encode() - ).hexdigest()[:16] + ).hexdigest() - self._history.append(result) + with self._history_lock: + self._history.append(result) return result def get_history(self) -> List[Dict[str, Any]]: - return list(self._history) + with self._history_lock: + return list(self._history) def get_stats(self) -> Dict[str, Any]: - total = len(self._history) - allowed = sum(1 for r in self._history if r.get("allowed")) - cedar_denies = sum( - 1 for r in self._history if r.get("cedar_effect") == "deny" - ) + with self._history_lock: + total = len(self._history) + allowed = sum(1 for r in self._history if r.get("allowed")) + cedar_denies = sum( + 1 for r in self._history if r.get("cedar_effect") == "deny" + ) return { "total_evaluations": total, "allowed": allowed, @@ -231,10 +239,16 @@ class ReceiptVerifier: def __init__(self, strict: bool = True): self.strict = strict self._verified: List[Dict[str, Any]] = [] + self._verified_lock = threading.Lock() - def validate_structure(self, receipt: Dict[str, Any]) -> Dict[str, Any]: + def validate_structure_only(self, receipt: Dict[str, Any]) -> Dict[str, Any]: """ - Validate receipt structure (not cryptographic signature). + Validate receipt structure only — no cryptographic verification. + + Cryptographic signature verification (Ed25519) is delegated to + @veritasacta/verify or the protect-mcp runtime. This method only + checks that required fields are present and the receipt type is + recognized. Returns a result dict with: - valid: bool @@ -280,12 +294,13 @@ def validate_structure(self, receipt: Dict[str, Any]) -> Dict[str, Any]: result["utilization_band"] = payload.get("utilization_band") result["category"] = payload.get("category") - self._verified.append(result) + with self._verified_lock: + self._verified.append(result) return result def to_agt_context(self, receipt: Dict[str, Any]) -> Dict[str, Any]: """Convert a validated receipt to AGT-compatible context.""" - validation = self.validate_structure(receipt) + validation = self.validate_structure_only(receipt) if not validation.get("valid"): return {"receipt_valid": False, "reason": validation.get("reason", "")} @@ -296,15 +311,16 @@ def to_agt_context(self, receipt: Dict[str, Any]) -> Dict[str, Any]: "cedar_effect": validation.get("decision", ""), "tool": validation.get("tool", ""), "timestamp": validation.get("timestamp"), - "receipt_hash": hashlib.sha256( + "receipt_ref": hashlib.sha256( json.dumps(receipt, sort_keys=True).encode() - ).hexdigest()[:16], + ).hexdigest(), "issuer_blind": True, # protect-mcp receipts are always issuer-blind "payload_fields": sorted(payload.keys()), } def get_verification_log(self) -> List[Dict[str, Any]]: - return list(self._verified) + with self._verified_lock: + return list(self._verified) # --------------------------------------------------------------------------- @@ -344,6 +360,7 @@ def __init__( self.high_util_trust_floor = high_util_trust_floor self.blocked_categories = set(blocked_categories or []) self._decisions: List[Dict[str, Any]] = [] + self._decisions_lock = threading.Lock() def evaluate_spend( self, @@ -381,33 +398,38 @@ def evaluate_spend( f"Amount {amount} {currency} exceeds single-transaction " f"limit of {self.max_single_amount} {currency}" ) - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result if amount <= 0: result["allowed"] = False result["reason"] = "Amount must be positive" - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result # 2. Category check if category in self.blocked_categories: result["allowed"] = False result["reason"] = f"Category '{category}' is blocked" - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result # 3. Utilization + trust if utilization_band not in self.UTILIZATION_BANDS: result["allowed"] = False result["reason"] = f"Invalid utilization band: {utilization_band}" - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result if utilization_band == "exceeded": result["allowed"] = False result["reason"] = "Budget utilization exceeded" - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result if utilization_band == "high" and agent_trust_score < self.high_util_trust_floor: @@ -416,7 +438,8 @@ def evaluate_spend( f"High utilization requires trust score >= {self.high_util_trust_floor} " f"(current: {agent_trust_score})" ) - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result # 4. Receipt check for high-value transactions @@ -425,26 +448,30 @@ def evaluate_spend( result["reason"] = ( f"Transactions above 1000 {currency} require a spending authority receipt" ) - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result result["allowed"] = True result["reason"] = "Spending authorized" if receipt: - result["receipt_hash"] = hashlib.sha256( + result["receipt_ref"] = hashlib.sha256( json.dumps(receipt, sort_keys=True).encode() - ).hexdigest()[:16] + ).hexdigest() - self._decisions.append(result) + with self._decisions_lock: + self._decisions.append(result) return result def get_decisions(self) -> List[Dict[str, Any]]: - return list(self._decisions) + with self._decisions_lock: + return list(self._decisions) def get_stats(self) -> Dict[str, Any]: - total = len(self._decisions) - allowed = sum(1 for d in self._decisions if d.get("allowed")) - total_amount = sum(d.get("amount", 0) for d in self._decisions if d.get("allowed")) + with self._decisions_lock: + total = len(self._decisions) + allowed = sum(1 for d in self._decisions if d.get("allowed")) + total_amount = sum(d.get("amount", 0) for d in self._decisions if d.get("allowed")) return { "total_requests": total, "allowed": allowed, diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py index ef8ce751..c231964a 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py @@ -140,7 +140,7 @@ def test_receipt_provided_when_required(self): receipt = make_receipt() result = bridge.evaluate(decision, agent_trust_score=500, receipt=receipt) assert result["allowed"] is True - assert "receipt_hash" in result + assert "receipt_ref" in result def test_stats_tracking(self): bridge = CedarPolicyBridge() @@ -168,13 +168,13 @@ class TestReceiptVerifier: def test_valid_receipt(self): verifier = ReceiptVerifier() receipt = make_receipt() - result = verifier.validate_structure(receipt) + result = verifier.validate_structure_only(receipt) assert result["valid"] is True assert result["receipt_type"] == "scopeblind:decision" def test_missing_fields(self): verifier = ReceiptVerifier() - result = verifier.validate_structure({"type": "scopeblind:decision"}) + result = verifier.validate_structure_only({"type": "scopeblind:decision"}) assert result["valid"] is False assert "Missing required fields" in result["reason"] @@ -182,20 +182,20 @@ def test_unknown_type_strict(self): verifier = ReceiptVerifier(strict=True) receipt = make_receipt() receipt["type"] = "unknown:type" - result = verifier.validate_structure(receipt) + result = verifier.validate_structure_only(receipt) assert result["valid"] is False def test_unknown_type_lenient(self): verifier = ReceiptVerifier(strict=False) receipt = make_receipt() receipt["type"] = "custom:type" - result = verifier.validate_structure(receipt) + result = verifier.validate_structure_only(receipt) assert result["valid"] is True def test_spending_authority_receipt(self): verifier = ReceiptVerifier() receipt = make_spending_receipt(amount=250.0, category="cloud_compute", band="medium") - result = verifier.validate_structure(receipt) + result = verifier.validate_structure_only(receipt) assert result["valid"] is True assert result["amount"] == 250.0 assert result["utilization_band"] == "medium" @@ -207,7 +207,7 @@ def test_to_agt_context(self): assert ctx["receipt_valid"] is True assert ctx["issuer_blind"] is True assert ctx["cedar_effect"] == "allow" - assert "receipt_hash" in ctx + assert "receipt_ref" in ctx def test_invalid_receipt_agt_context(self): verifier = ReceiptVerifier() @@ -329,3 +329,218 @@ def test_full_context_shape(self): assert "receipt" in ctx assert "spending" in ctx assert ctx["receipt"]["present"] is True + + +# ---- Edge case tests ---- + + +class TestEdgeCases: + """Edge cases covering missing/invalid fields, boundary trust scores, + concurrent access, and malformed payloads.""" + + # -- Receipts with missing or invalid fields -- + + def test_receipt_empty_dict(self): + verifier = ReceiptVerifier() + result = verifier.validate_structure_only({}) + assert result["valid"] is False + assert "Missing required fields" in result["reason"] + + def test_receipt_missing_signature(self): + verifier = ReceiptVerifier() + receipt = {"type": "scopeblind:decision", "payload": {}, "publicKey": "pk"} + result = verifier.validate_structure_only(receipt) + assert result["valid"] is False + assert "signature" in str(result["reason"]) + + def test_receipt_missing_public_key(self): + verifier = ReceiptVerifier() + receipt = {"type": "scopeblind:decision", "payload": {}, "signature": "sig"} + result = verifier.validate_structure_only(receipt) + assert result["valid"] is False + assert "publicKey" in str(result["reason"]) + + def test_receipt_payload_not_dict(self): + verifier = ReceiptVerifier() + receipt = { + "type": "scopeblind:decision", + "payload": "not_a_dict", + "signature": "sig", + "publicKey": "pk", + } + result = verifier.validate_structure_only(receipt) + assert result["valid"] is False + assert "object" in result["reason"].lower() + + def test_receipt_payload_is_list(self): + verifier = ReceiptVerifier() + receipt = { + "type": "scopeblind:decision", + "payload": [1, 2, 3], + "signature": "sig", + "publicKey": "pk", + } + result = verifier.validate_structure_only(receipt) + assert result["valid"] is False + + def test_receipt_null_signature(self): + """Signature present but empty string — should still pass structure check.""" + verifier = ReceiptVerifier() + receipt = { + "type": "scopeblind:decision", + "payload": {"effect": "allow", "tool": "test"}, + "signature": "", + "publicKey": "pk", + } + result = verifier.validate_structure_only(receipt) + assert result["valid"] is True + assert result["has_signature"] is False + + def test_to_agt_context_malformed_receipt(self): + verifier = ReceiptVerifier() + ctx = verifier.to_agt_context({"type": "bad"}) + assert ctx["receipt_valid"] is False + + # -- Trust scores at boundaries -- + + def test_trust_score_zero(self): + bridge = CedarPolicyBridge(trust_floor=0) + decision = CedarDecision(effect="allow", tool_name="read_file") + result = bridge.evaluate(decision, agent_trust_score=0) + assert result["allowed"] is True + assert result["adjusted_trust"] == 50 # default bonus + + def test_trust_score_1000(self): + bridge = CedarPolicyBridge(trust_bonus_per_allow=50) + decision = CedarDecision(effect="allow", tool_name="read_file") + result = bridge.evaluate(decision, agent_trust_score=1000) + assert result["adjusted_trust"] == 1000 # capped + + def test_trust_score_zero_with_deny(self): + bridge = CedarPolicyBridge(deny_penalty=100) + decision = CedarDecision(effect="deny", tool_name="shell_exec") + result = bridge.evaluate(decision, agent_trust_score=0) + assert result["adjusted_trust"] == 0 # clamped, no negative + + def test_trust_score_at_floor_boundary(self): + """Trust score exactly at floor should pass.""" + bridge = CedarPolicyBridge(trust_floor=550, trust_bonus_per_allow=50) + decision = CedarDecision(effect="allow", tool_name="read_file") + result = bridge.evaluate(decision, agent_trust_score=500) + assert result["adjusted_trust"] == 550 + assert result["allowed"] is True + + def test_trust_score_one_below_floor(self): + """Trust score one below floor should deny.""" + bridge = CedarPolicyBridge(trust_floor=551, trust_bonus_per_allow=50) + decision = CedarDecision(effect="allow", tool_name="read_file") + result = bridge.evaluate(decision, agent_trust_score=500) + assert result["adjusted_trust"] == 550 + assert result["allowed"] is False + + # -- Concurrent evaluations (thread safety) -- + + def test_concurrent_bridge_evaluations(self): + """Multiple threads appending to bridge history should not lose entries.""" + import threading + + bridge = CedarPolicyBridge() + decision = CedarDecision(effect="allow", tool_name="read_file") + n_threads = 10 + n_per_thread = 50 + barrier = threading.Barrier(n_threads) + + def worker(): + barrier.wait() + for _ in range(n_per_thread): + bridge.evaluate(decision, agent_trust_score=500) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(bridge.get_history()) == n_threads * n_per_thread + + def test_concurrent_receipt_verification(self): + """Multiple threads validating receipts should not lose entries.""" + import threading + + verifier = ReceiptVerifier() + receipt = make_receipt() + n_threads = 10 + n_per_thread = 50 + barrier = threading.Barrier(n_threads) + + def worker(): + barrier.wait() + for _ in range(n_per_thread): + verifier.validate_structure_only(receipt) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(verifier.get_verification_log()) == n_threads * n_per_thread + + def test_concurrent_spending_decisions(self): + """Multiple threads evaluating spends should not lose entries.""" + import threading + + gate = SpendingGate() + n_threads = 10 + n_per_thread = 50 + barrier = threading.Barrier(n_threads) + + def worker(): + barrier.wait() + for _ in range(n_per_thread): + gate.evaluate_spend(amount=10.0, agent_trust_score=500) + + threads = [threading.Thread(target=worker) for _ in range(n_threads)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(gate.get_decisions()) == n_threads * n_per_thread + + # -- Malformed receipt payloads -- + + def test_cedar_decision_from_receipt_missing_payload(self): + """from_receipt with no payload key should use top-level dict.""" + d = CedarDecision.from_receipt({"effect": "deny", "tool": "bash"}) + assert d.effect == "deny" + assert d.tool_name == "bash" + + def test_cedar_decision_from_receipt_empty(self): + """from_receipt with empty dict should default to deny.""" + d = CedarDecision.from_receipt({}) + assert d.effect == "deny" + assert d.tool_name == "" + + def test_spending_gate_zero_amount(self): + gate = SpendingGate() + result = gate.evaluate_spend(amount=0.0, agent_trust_score=500) + assert result["allowed"] is False + assert "positive" in result["reason"].lower() + + def test_spending_gate_invalid_utilization_band(self): + gate = SpendingGate() + result = gate.evaluate_spend( + amount=10.0, utilization_band="invalid_band", agent_trust_score=500 + ) + assert result["allowed"] is False + assert "invalid utilization band" in result["reason"].lower() + + def test_receipt_ref_is_full_sha256(self): + """receipt_ref should be a full 64-char hex SHA-256, not truncated.""" + bridge = CedarPolicyBridge() + decision = CedarDecision(effect="allow", tool_name="web_search") + receipt = make_receipt() + result = bridge.evaluate(decision, agent_trust_score=500, receipt=receipt) + assert "receipt_ref" in result + assert len(result["receipt_ref"]) == 64 # full SHA-256 hex From 6e538d448d61a6a2c4d93b6d35e7d3b4394b7523 Mon Sep 17 00:00:00 2001 From: tommylauren Date: Thu, 2 Apr 2026 22:25:40 -0400 Subject: [PATCH 3/4] fix: address imran-siddique review - empty sig, bounded lists, links, version - Empty signature/publicKey now returns valid:False (security fix) - All internal lists bounded to MAX_LOG=10000 (prevents memory leak) - README links use correct case: ScopeBlind/scopeblind-gateway - pyproject.toml adds Tom Farley as individual author - Version reference updated from 0.4.6 to 0.5.2 - 56 tests passing --- .../scopeblind-protect-mcp/README.md | 2 +- .../scopeblind-protect-mcp/pyproject.toml | 1 + .../scopeblind_protect_mcp/adapter.py | 77 +++++++++++++------ .../tests/test_scopeblind_adapter.py | 6 +- 4 files changed, 58 insertions(+), 28 deletions(-) diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md b/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md index 02baef01..734ef95d 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/README.md @@ -11,7 +11,7 @@ protect-mcp is a security gateway that wraps any MCP server with: - **Issuer-blind verification** (verifier can confirm receipt validity without learning who issued it) - **Spending authority** (prove an agent's purchase is authorized without revealing org details) -Published on npm: `npx protect-mcp@latest` | [GitHub](https://github.com/scopeblind/scopeblind-gateway) | [Docs](https://scopeblind.com/docs/protect-mcp) +Published on npm: `npx protect-mcp@latest` | [GitHub](https://github.com/ScopeBlind/scopeblind-gateway) | [Docs](https://scopeblind.com/docs/protect-mcp) ## How it complements AGT diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml b/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml index d55a73bc..6d3d44bc 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml @@ -10,6 +10,7 @@ readme = "README.md" license = "MIT" requires-python = ">=3.9" authors = [ + { name = "Tom Farley" }, { name = "ScopeBlind Pty Ltd" } ] keywords = ["mcp", "cedar", "policy", "receipts", "agentmesh", "scopeblind", "trust"] diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py index 6ba31c32..adc65ea1 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py @@ -98,17 +98,21 @@ class CedarPolicyBridge: behavioral trust on top. """ + MAX_HISTORY = 10000 # Prevent unbounded memory growth + def __init__( self, trust_floor: int = 0, trust_bonus_per_allow: int = 50, deny_penalty: int = 200, require_receipt: bool = False, + max_history: int = MAX_HISTORY, ): self.trust_floor = trust_floor self.trust_bonus = trust_bonus_per_allow self.deny_penalty = deny_penalty self.require_receipt = require_receipt + self._max_history = max_history self._history: List[Dict[str, Any]] = [] self._history_lock = threading.Lock() @@ -141,8 +145,7 @@ def evaluate( result["allowed"] = False result["reason"] = "Decision receipt required but not provided" result["adjusted_trust"] = max(0, agent_trust_score - self.deny_penalty) - with self._history_lock: - self._history.append(result) + self._record(result) return result # Cedar deny is authoritative — not overridable by trust score @@ -153,8 +156,7 @@ def evaluate( f"(policies: {cedar_decision.policy_ids})" ) result["adjusted_trust"] = max(0, agent_trust_score - self.deny_penalty) - with self._history_lock: - self._history.append(result) + self._record(result) return result # Cedar allow — layer AGT trust check @@ -165,8 +167,7 @@ def evaluate( f"Cedar allowed but trust score {adjusted} below floor {self.trust_floor}" ) result["adjusted_trust"] = adjusted - with self._history_lock: - self._history.append(result) + self._record(result) return result result["allowed"] = True @@ -182,6 +183,13 @@ def evaluate( self._history.append(result) return result + def _record(self, entry: Dict[str, Any]) -> None: + """Append to history with bounded size to prevent memory leaks.""" + with self._history_lock: + self._history.append(entry) + if len(self._history) > self._max_history: + self._history = self._history[-self._max_history:] + def get_history(self) -> List[Dict[str, Any]]: with self._history_lock: return list(self._history) @@ -236,8 +244,11 @@ class ReceiptVerifier: "acta:artifact", } - def __init__(self, strict: bool = True): + MAX_LOG = 10000 # Prevent unbounded memory growth + + def __init__(self, strict: bool = True, max_log: int = MAX_LOG): self.strict = strict + self._max_log = max_log self._verified: List[Dict[str, Any]] = [] self._verified_lock = threading.Lock() @@ -277,14 +288,26 @@ def validate_structure_only(self, receipt: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(payload, dict): return {"valid": False, "reason": "Payload must be an object"} + # Empty signature/publicKey should not pass as valid + sig = receipt.get("signature", "") + pk = receipt.get("publicKey", "") + if not sig or not pk: + return { + "valid": False, + "reason": "Empty signature or publicKey (cryptographic fields must be non-empty)", + "receipt_type": receipt_type, + "has_signature": bool(sig), + "has_public_key": bool(pk), + } + result = { "valid": True, "receipt_type": receipt_type, "tool": payload.get("tool", payload.get("resource", "")), "decision": payload.get("effect", payload.get("decision", "")), "timestamp": payload.get("timestamp"), - "has_signature": bool(receipt.get("signature")), - "has_public_key": bool(receipt.get("publicKey")), + "has_signature": True, + "has_public_key": True, } # Spending authority specific fields @@ -296,6 +319,8 @@ def validate_structure_only(self, receipt: Dict[str, Any]) -> Dict[str, Any]: with self._verified_lock: self._verified.append(result) + if len(self._verified) > self._max_log: + self._verified = self._verified[-self._max_log:] return result def to_agt_context(self, receipt: Dict[str, Any]) -> Dict[str, Any]: @@ -350,15 +375,19 @@ class SpendingGate: UTILIZATION_BANDS = {"low", "medium", "high", "exceeded"} + MAX_LOG = 10000 # Prevent unbounded memory growth + def __init__( self, max_single_amount: float = 10000.0, high_util_trust_floor: int = 500, blocked_categories: Optional[List[str]] = None, + max_log: int = MAX_LOG, ): self.max_single_amount = max_single_amount self.high_util_trust_floor = high_util_trust_floor self.blocked_categories = set(blocked_categories or []) + self._max_log = max_log self._decisions: List[Dict[str, Any]] = [] self._decisions_lock = threading.Lock() @@ -398,38 +427,33 @@ def evaluate_spend( f"Amount {amount} {currency} exceeds single-transaction " f"limit of {self.max_single_amount} {currency}" ) - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result if amount <= 0: result["allowed"] = False result["reason"] = "Amount must be positive" - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result # 2. Category check if category in self.blocked_categories: result["allowed"] = False result["reason"] = f"Category '{category}' is blocked" - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result # 3. Utilization + trust if utilization_band not in self.UTILIZATION_BANDS: result["allowed"] = False result["reason"] = f"Invalid utilization band: {utilization_band}" - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result if utilization_band == "exceeded": result["allowed"] = False result["reason"] = "Budget utilization exceeded" - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result if utilization_band == "high" and agent_trust_score < self.high_util_trust_floor: @@ -438,8 +462,7 @@ def evaluate_spend( f"High utilization requires trust score >= {self.high_util_trust_floor} " f"(current: {agent_trust_score})" ) - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result # 4. Receipt check for high-value transactions @@ -448,8 +471,7 @@ def evaluate_spend( result["reason"] = ( f"Transactions above 1000 {currency} require a spending authority receipt" ) - with self._decisions_lock: - self._decisions.append(result) + self._record(result) return result result["allowed"] = True @@ -463,6 +485,13 @@ def evaluate_spend( self._decisions.append(result) return result + def _record(self, entry: Dict[str, Any]) -> None: + """Append to decisions with bounded size to prevent memory leaks.""" + with self._decisions_lock: + self._decisions.append(entry) + if len(self._decisions) > self._max_log: + self._decisions = self._decisions[-self._max_log:] + def get_decisions(self) -> List[Dict[str, Any]]: with self._decisions_lock: return list(self._decisions) @@ -509,7 +538,7 @@ def scopeblind_context( """ ctx: Dict[str, Any] = { "source": "scopeblind:protect-mcp", - "version": "0.4.6", + "version": "0.5.2", } if cedar_decision is not None: diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py index c231964a..bdf7b33a 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py @@ -383,8 +383,8 @@ def test_receipt_payload_is_list(self): result = verifier.validate_structure_only(receipt) assert result["valid"] is False - def test_receipt_null_signature(self): - """Signature present but empty string — should still pass structure check.""" + def test_receipt_empty_signature_rejected(self): + """Empty signature string should be rejected as invalid.""" verifier = ReceiptVerifier() receipt = { "type": "scopeblind:decision", @@ -393,7 +393,7 @@ def test_receipt_null_signature(self): "publicKey": "pk", } result = verifier.validate_structure_only(receipt) - assert result["valid"] is True + assert result["valid"] is False assert result["has_signature"] is False def test_to_agt_context_malformed_receipt(self): From 42c280a76d645e761e6c052d63da81916f34bdfd Mon Sep 17 00:00:00 2001 From: tommylauren Date: Thu, 2 Apr 2026 22:57:22 -0400 Subject: [PATCH 4/4] feat: Ed25519 key validation + replay protection for receipts - Public key format validation: rejects keys that aren't valid Ed25519 (64 hex chars or base64url). Catches malformed/injected keys early. - Replay protection: bounded OrderedDict tracks seen receipt hashes. Same receipt submitted twice returns valid:False with replay:True. Configurable: replay_protection=True (default), max_seen_receipts=50000. Evicts oldest entries when window is full. - Both features are on by default, configurable per-instance. - 8 new tests (64 total, all passing): key format validation (too short, non-hex, valid hex, valid base64url), replay detection, replay disabled, bounded window eviction, different receipts not flagged as replay. v0.1.1 --- .../scopeblind-protect-mcp/pyproject.toml | 2 +- .../scopeblind_protect_mcp/adapter.py | 54 ++++++++- .../tests/test_scopeblind_adapter.py | 114 ++++++++++++++++-- 3 files changed, 156 insertions(+), 14 deletions(-) diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml b/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml index 6d3d44bc..b7d2ae97 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "scopeblind-protect-mcp" -version = "0.1.0" +version = "0.1.1" description = "AgentMesh integration for ScopeBlind protect-mcp — Cedar policy enforcement and verifiable decision receipts for MCP tool calls" readme = "README.md" license = "MIT" diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py index adc65ea1..39f13c45 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/scopeblind_protect_mcp/adapter.py @@ -23,10 +23,12 @@ import hashlib import json +import re import threading import time +from collections import OrderedDict from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Set # --------------------------------------------------------------------------- @@ -245,12 +247,29 @@ class ReceiptVerifier: } MAX_LOG = 10000 # Prevent unbounded memory growth + MAX_SEEN_RECEIPTS = 50000 # Replay protection window - def __init__(self, strict: bool = True, max_log: int = MAX_LOG): + # Ed25519 public key: 64 hex chars (32 bytes) + _ED25519_PK_PATTERN = re.compile(r"^[0-9a-fA-F]{64}$") + # Also accept base64url-encoded keys (43-44 chars) + _ED25519_PK_B64_PATTERN = re.compile(r"^[A-Za-z0-9_-]{43,44}=?$") + + def __init__( + self, + strict: bool = True, + max_log: int = MAX_LOG, + replay_protection: bool = True, + max_seen_receipts: int = MAX_SEEN_RECEIPTS, + ): self.strict = strict + self.replay_protection = replay_protection self._max_log = max_log + self._max_seen = max_seen_receipts self._verified: List[Dict[str, Any]] = [] self._verified_lock = threading.Lock() + # Replay protection: bounded ordered set of seen receipt hashes + self._seen_receipts: OrderedDict[str, None] = OrderedDict() + self._seen_lock = threading.Lock() def validate_structure_only(self, receipt: Dict[str, Any]) -> Dict[str, Any]: """ @@ -300,6 +319,37 @@ def validate_structure_only(self, receipt: Dict[str, Any]) -> Dict[str, Any]: "has_public_key": bool(pk), } + # Validate Ed25519 public key format (32 bytes = 64 hex chars, or base64url) + if not (self._ED25519_PK_PATTERN.match(pk) or self._ED25519_PK_B64_PATTERN.match(pk)): + return { + "valid": False, + "reason": ( + f"Invalid publicKey format: expected 64 hex chars (Ed25519) " + f"or base64url, got {len(pk)} chars" + ), + "receipt_type": receipt_type, + "has_signature": True, + "has_public_key": True, + } + + # Replay protection: reject previously seen receipts + if self.replay_protection: + receipt_hash = hashlib.sha256( + json.dumps(receipt, sort_keys=True).encode() + ).hexdigest() + with self._seen_lock: + if receipt_hash in self._seen_receipts: + return { + "valid": False, + "reason": "Replay detected: this receipt has already been validated", + "receipt_type": receipt_type, + "replay": True, + } + self._seen_receipts[receipt_hash] = None + # Evict oldest entries when the window is full + while len(self._seen_receipts) > self._max_seen: + self._seen_receipts.popitem(last=False) + result = { "valid": True, "receipt_type": receipt_type, diff --git a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py index bdf7b33a..4681c493 100644 --- a/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py +++ b/packages/agentmesh-integrations/scopeblind-protect-mcp/tests/test_scopeblind_adapter.py @@ -27,20 +27,24 @@ # ---- Fixtures ---- +_receipt_counter = 0 + def make_receipt( effect="allow", tool="web_search", receipt_type="scopeblind:decision", **extra_payload, ): - """Build a minimal valid receipt for testing.""" - payload = {"effect": effect, "tool": tool, "timestamp": time.time()} + """Build a minimal valid receipt for testing. Each call produces a unique receipt.""" + global _receipt_counter + _receipt_counter += 1 + payload = {"effect": effect, "tool": tool, "timestamp": time.time(), "nonce": _receipt_counter} payload.update(extra_payload) return { "type": receipt_type, "payload": payload, - "signature": "ed25519_test_sig_placeholder", - "publicKey": "ed25519_test_pk_placeholder", + "signature": "a" * 128, + "publicKey": "b" * 64, } @@ -464,21 +468,22 @@ def worker(): assert len(bridge.get_history()) == n_threads * n_per_thread def test_concurrent_receipt_verification(self): - """Multiple threads validating receipts should not lose entries.""" + """Multiple threads validating unique receipts should not lose entries.""" import threading - verifier = ReceiptVerifier() - receipt = make_receipt() + verifier = ReceiptVerifier(replay_protection=False) n_threads = 10 n_per_thread = 50 + # Each thread gets its own unique receipts + all_receipts = [[make_receipt(tool=f"t{t}_{i}") for i in range(n_per_thread)] for t in range(n_threads)] barrier = threading.Barrier(n_threads) - def worker(): + def worker(thread_receipts): barrier.wait() - for _ in range(n_per_thread): - verifier.validate_structure_only(receipt) + for r in thread_receipts: + verifier.validate_structure_only(r) - threads = [threading.Thread(target=worker) for _ in range(n_threads)] + threads = [threading.Thread(target=worker, args=(all_receipts[t],)) for t in range(n_threads)] for t in threads: t.start() for t in threads: @@ -544,3 +549,90 @@ def test_receipt_ref_is_full_sha256(self): result = bridge.evaluate(decision, agent_trust_score=500, receipt=receipt) assert "receipt_ref" in result assert len(result["receipt_ref"]) == 64 # full SHA-256 hex + + # -- Ed25519 public key format validation -- + + def test_invalid_public_key_too_short(self): + """Public key with wrong length should be rejected.""" + verifier = ReceiptVerifier() + receipt = { + "type": "scopeblind:decision", + "payload": {"effect": "allow", "tool": "test"}, + "signature": "a" * 128, + "publicKey": "abcd1234", # 8 chars, not 64 + } + result = verifier.validate_structure_only(receipt) + assert result["valid"] is False + assert "Invalid publicKey format" in result["reason"] + + def test_invalid_public_key_non_hex(self): + """Public key with non-hex characters should be rejected.""" + verifier = ReceiptVerifier() + receipt = { + "type": "scopeblind:decision", + "payload": {"effect": "allow", "tool": "test"}, + "signature": "a" * 128, + "publicKey": "g" * 64, # 'g' is not hex + } + result = verifier.validate_structure_only(receipt) + assert result["valid"] is False + assert "Invalid publicKey format" in result["reason"] + + def test_valid_public_key_hex(self): + """Valid 64-char hex public key should pass.""" + verifier = ReceiptVerifier() + receipt = make_receipt() + result = verifier.validate_structure_only(receipt) + assert result["valid"] is True + + def test_valid_public_key_base64url(self): + """Valid base64url-encoded public key should pass.""" + verifier = ReceiptVerifier() + receipt = { + "type": "scopeblind:decision", + "payload": {"effect": "allow", "tool": "test", "nonce": 99999}, + "signature": "a" * 128, + "publicKey": "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=", # 44 chars base64 + } + result = verifier.validate_structure_only(receipt) + assert result["valid"] is True + + # -- Replay protection -- + + def test_replay_detected(self): + """Same receipt submitted twice should be rejected the second time.""" + verifier = ReceiptVerifier(replay_protection=True) + receipt = make_receipt(tool="replay_test") + result1 = verifier.validate_structure_only(receipt) + assert result1["valid"] is True + result2 = verifier.validate_structure_only(receipt) + assert result2["valid"] is False + assert result2.get("replay") is True + assert "Replay detected" in result2["reason"] + + def test_replay_protection_disabled(self): + """With replay_protection=False, same receipt should pass twice.""" + verifier = ReceiptVerifier(replay_protection=False) + receipt = make_receipt(tool="no_replay_check") + result1 = verifier.validate_structure_only(receipt) + assert result1["valid"] is True + result2 = verifier.validate_structure_only(receipt) + assert result2["valid"] is True + + def test_replay_window_bounded(self): + """Replay window should evict old entries when full.""" + verifier = ReceiptVerifier(replay_protection=True, max_seen_receipts=5) + receipts = [make_receipt(tool=f"tool_{i}") for i in range(10)] + for r in receipts: + verifier.validate_structure_only(r) + # First receipt should have been evicted from the window + result = verifier.validate_structure_only(receipts[0]) + assert result["valid"] is True # Not in window anymore + + def test_different_receipts_not_replay(self): + """Two different receipts should both pass.""" + verifier = ReceiptVerifier(replay_protection=True) + r1 = make_receipt(tool="tool_a") + r2 = make_receipt(tool="tool_b") + assert verifier.validate_structure_only(r1)["valid"] is True + assert verifier.validate_structure_only(r2)["valid"] is True