Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -272,6 +273,53 @@ func TestPrepareExecution_AdditionalCoverage(t *testing.T) {
assert.Contains(t, string(plan.requestBody), `"actor_id":"actor-1"`)
})

t.Run("blocks calls to pending_approval agent with 503 (TC-034)", func(t *testing.T) {
pendingAgent := &types.AgentNode{
ID: "node-revoked",
BaseURL: "https://agent.example.com",
Version: "v1",
HealthStatus: types.HealthStatusActive,
LifecycleStatus: types.AgentStatusPendingApproval,
Reasoners: []types.ReasonerDefinition{{ID: "reasoner-a"}},
}
store := newTestExecutionStorage(pendingAgent)
controller := newExecutionController(store, nil, nil, time.Second, "")

rec := httptest.NewRecorder()
ctx, _ := gin.CreateTestContext(rec)
ctx.Params = gin.Params{{Key: "target", Value: "node-revoked.reasoner-a"}}
ctx.Request = httptest.NewRequest(http.MethodPost, "/execute/node-revoked.reasoner-a", strings.NewReader(`{"input":{}}`))
ctx.Request.Header.Set("Content-Type", "application/json")

_, err := controller.prepareExecution(context.Background(), ctx)
require.Error(t, err)

var pe *executionPreconditionError
require.ErrorAs(t, err, &pe)
assert.Equal(t, http.StatusServiceUnavailable, pe.HTTPStatusCode())
assert.Equal(t, ErrorCategoryAgentError, pe.Category())
assert.Equal(t, "agent_pending_approval", pe.ErrorCode())
assert.Contains(t, pe.Error(), "node-revoked")
assert.Contains(t, pe.Error(), "awaiting tag approval")

// Verify the wire-level response contract matches the sibling handlers
// (reasoners/skills/permission middleware): stable code in `error`,
// human text in `message`, 503 Service Unavailable.
writeExecutionError(ctx, err)
assert.Equal(t, http.StatusServiceUnavailable, rec.Code)
var body map[string]interface{}
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &body))
assert.Equal(t, "agent_pending_approval", body["error"])
assert.Equal(t, "agent_error", body["error_category"])
require.Contains(t, body, "message")
assert.Contains(t, body["message"], "awaiting tag approval")

// No execution record should have been persisted before the guard fired.
store.mu.Lock()
defer store.mu.Unlock()
assert.Empty(t, store.executionRecords, "no execution record should be created for a blocked call")
})

t.Run("returns error for invalid target and invalid body", func(t *testing.T) {
store := newTestExecutionStorage(&types.AgentNode{
ID: "node-1",
Expand Down
24 changes: 22 additions & 2 deletions control-plane/internal/handlers/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,18 @@ func (c *executionController) prepareExecution(ctx context.Context, ginCtx *gin.
}
}

// Block calls to agents that are pending approval (e.g. tags revoked).
// Matches the contract used by reasoners.go / skills.go / permission
// middleware: stable machine code in `error`, friendly text in `message`.
if agent.LifecycleStatus == types.AgentStatusPendingApproval {
return nil, &executionPreconditionError{
code: http.StatusServiceUnavailable,
message: fmt.Sprintf("agent node '%s' is awaiting tag approval and cannot execute", target.NodeID),
category: ErrorCategoryAgentError,
errorCode: "agent_pending_approval",
}
}

if agent.DeploymentType == "" && agent.Metadata.Custom != nil {
if v, ok := agent.Metadata.Custom["serverless"]; ok && fmt.Sprint(v) == "true" {
agent.DeploymentType = "serverless"
Expand Down Expand Up @@ -1956,10 +1968,18 @@ func writeExecutionError(ctx *gin.Context, err error) {

var pe *executionPreconditionError
if errors.As(err, &pe) {
ctx.JSON(pe.HTTPStatusCode(), gin.H{
body := gin.H{
"error": pe.Error(),
"error_category": string(pe.Category()),
})
}
// When a stable machine code is set, promote it to `error` and move
// the human-readable text to `message` — matching the contract used
// by reasoners.go / skills.go / permission middleware.
if code := pe.ErrorCode(); code != "" {
body["error"] = code
body["message"] = pe.Error()
}
ctx.JSON(pe.HTTPStatusCode(), body)
return
}

Expand Down
32 changes: 32 additions & 0 deletions control-plane/internal/handlers/execute_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,38 @@ func TestExecuteAsyncHandler_InvalidJSON(t *testing.T) {
require.Equal(t, http.StatusBadRequest, resp.Code)
}

func TestExecuteHandler_PendingApprovalAgent(t *testing.T) {
gin.SetMode(gin.TestMode)

agent := &types.AgentNode{
ID: "node-1",
BaseURL: "http://agent.example",
Reasoners: []types.ReasonerDefinition{{ID: "reasoner-a"}},
LifecycleStatus: types.AgentStatusPendingApproval,
}

store := newTestExecutionStorage(agent)
payloads := services.NewFilePayloadStore(t.TempDir())

router := gin.New()
router.POST("/api/v1/execute/:target", ExecuteHandler(store, payloads, nil, 90*time.Second, ""))

req := httptest.NewRequest(http.MethodPost, "/api/v1/execute/node-1.reasoner-a", strings.NewReader(`{"input":{"foo":"bar"}}`))
req.Header.Set("Content-Type", "application/json")
resp := httptest.NewRecorder()

router.ServeHTTP(resp, req)

require.Equal(t, http.StatusServiceUnavailable, resp.Code)

// Response contract (matches reasoners.go / skills.go / permission middleware):
// { "error": "agent_pending_approval", "message": "<human text>", "error_category": "agent_error" }
var payload map[string]string
require.NoError(t, json.Unmarshal(resp.Body.Bytes(), &payload))
require.Equal(t, "agent_pending_approval", payload["error"])
require.Contains(t, payload["message"], "awaiting tag approval")
}

func TestGetExecutionStatusHandler_ReturnsResult(t *testing.T) {
gin.SetMode(gin.TestMode)

Expand Down
19 changes: 16 additions & 3 deletions control-plane/internal/handlers/execution_guards.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,17 @@ const (
)

// executionPreconditionError carries both an HTTP status code and message.
//
// When errorCode is non-empty, the JSON renderer will emit the stable machine
// code in the top-level "error" field and move the human-readable message to
// "message" — matching the contract already used by sibling handlers
// (reasoners, skills, permission middleware) for conditions like
// agent_pending_approval.
type executionPreconditionError struct {
code int
message string
category ErrorCategory
code int
message string
category ErrorCategory
errorCode string
}

func (e *executionPreconditionError) Error() string {
Expand All @@ -153,6 +160,12 @@ func (e *executionPreconditionError) Category() ErrorCategory {
return e.category
}

// ErrorCode returns the stable machine-readable error code (if set).
// Clients should key on this rather than the human-readable message.
func (e *executionPreconditionError) ErrorCode() string {
return e.errorCode
}

// PublishExecutionLog publishes a structured log event for an execution.
// These events are streamed to SSE clients watching the execution.
func PublishExecutionLog(executionID, workflowID, agentNodeID, level, message string, metadata map[string]interface{}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
import { cn } from "@/lib/utils";
import {
fetchNodeLogsText,
NodeLogsError,
parseNodeLogsNDJSON,
streamNodeLogsEntries,
type NodeLogEntry,
Expand Down Expand Up @@ -210,7 +211,28 @@ export function NodeProcessLogsPanel({
const parsed = parseNodeLogsNDJSON(text);
setEntries(parsed.slice(-MAX_BUFFER));
} catch (e) {
setStreamError(e instanceof Error ? e.message : "Failed to load logs");
// Treat "no base_url" (agent never ran, no upstream URL yet) and 404
// (node has no logs endpoint) as expected empty states — not errors.
// Branch on the structured fields of NodeLogsError rather than
// string-matching the human message, which is brittle to backend
// phrasing changes.
if (
e instanceof NodeLogsError &&
(e.status === 404 || e.code === "agent_unreachable")
) {
// Surface to devtools so developers debugging the panel can still
// see the swallowed error; do not raise a destructive UI alert.
if (import.meta.env?.DEV) {
console.debug(
`[NodeProcessLogsPanel] expected empty state for node ${nodeId}:`,
e.status,
e.code ?? e.message,
);
}
setEntries([]);
} else {
setStreamError(e instanceof Error ? e.message : "Failed to load logs");
}
} finally {
setLoadingTail(false);
}
Expand Down
27 changes: 23 additions & 4 deletions control-plane/web/client/src/services/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,13 +363,34 @@ function nodeLogsAuthHeaders(): HeadersInit {
return h;
}

async function nodeLogsHttpError(response: Response): Promise<Error> {
/**
* Typed error thrown by node-logs fetch/stream helpers.
*
* Carries the HTTP status code and (when available) the stable machine code
* from the response body so callers can branch on structured fields instead
* of string-matching the human message.
*/
export class NodeLogsError extends Error {
readonly status: number;
readonly code?: string;

constructor(message: string, status: number, code?: string) {
super(message);
this.name = "NodeLogsError";
this.status = status;
this.code = code;
}
}

async function nodeLogsHttpError(response: Response): Promise<NodeLogsError> {
let msg = `HTTP ${response.status}`;
let code: string | undefined;
try {
const j = (await response.json()) as {
message?: string;
error?: string;
};
if (j.error) code = String(j.error);
if (j.message) msg = j.message;
else if (j.error) msg = String(j.error);
} catch {
Expand All @@ -380,9 +401,7 @@ async function nodeLogsHttpError(response: Response): Promise<Error> {
/* ignore */
}
}
const err = new Error(msg);
err.name = "NodeLogsError";
return err;
return new NodeLogsError(msg, response.status, code);
}

export function parseNodeLogsNDJSON(text: string): NodeLogEntry[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,30 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";

import { NodeProcessLogsPanel } from "@/components/nodes/NodeProcessLogsPanel";

const state = vi.hoisted(() => ({
fetchNodeLogsText: vi.fn(),
parseNodeLogsNDJSON: vi.fn(),
streamNodeLogsEntries: vi.fn(),
}));
const state = vi.hoisted(() => {
class MockNodeLogsError extends Error {
readonly status: number;
readonly code?: string;
constructor(message: string, status: number, code?: string) {
super(message);
this.name = "NodeLogsError";
this.status = status;
this.code = code;
}
}
return {
fetchNodeLogsText: vi.fn(),
parseNodeLogsNDJSON: vi.fn(),
streamNodeLogsEntries: vi.fn(),
NodeLogsError: MockNodeLogsError,
};
});

vi.mock("@/services/api", () => ({
fetchNodeLogsText: (...args: unknown[]) => state.fetchNodeLogsText(...args),
parseNodeLogsNDJSON: (...args: unknown[]) => state.parseNodeLogsNDJSON(...args),
streamNodeLogsEntries: (...args: unknown[]) => state.streamNodeLogsEntries(...args),
NodeLogsError: state.NodeLogsError,
}));

vi.mock("@/components/ui/alert", () => ({
Expand Down Expand Up @@ -248,4 +262,31 @@ describe("NodeProcessLogsPanel coverage paths", () => {
await user.click(screen.getAllByRole("button", { name: "Live" })[0]);
expect(await screen.findByText("stream interrupted")).toBeInTheDocument();
});

it("treats 404 NodeLogsError as empty state, not an error (TC-035)", async () => {
state.fetchNodeLogsText.mockRejectedValueOnce(
new state.NodeLogsError("HTTP 404", 404),
);
state.parseNodeLogsNDJSON.mockReturnValue([]);

render(<NodeProcessLogsPanel nodeId="node-404" />);

// Friendly empty state, not the destructive "Logs unavailable" alert.
expect(await screen.findByText(/No log lines yet/i)).toBeInTheDocument();
expect(screen.queryByText("Logs unavailable")).not.toBeInTheDocument();
});

it("treats agent_unreachable (no base_url) as empty state (TC-035)", async () => {
state.fetchNodeLogsText.mockRejectedValueOnce(
new state.NodeLogsError("node has no base_url", 502, "agent_unreachable"),
);
state.parseNodeLogsNDJSON.mockReturnValue([]);

render(<NodeProcessLogsPanel nodeId="node-no-baseurl" />);

expect(await screen.findByText(/No log lines yet/i)).toBeInTheDocument();
expect(screen.queryByText("Logs unavailable")).not.toBeInTheDocument();
// The raw backend message must not leak into the UI.
expect(screen.queryByText("node has no base_url")).not.toBeInTheDocument();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,36 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { NodeProcessLogsPanel } from "@/components/nodes/NodeProcessLogsPanel";
import type { NodeLogEntry } from "@/services/api";

const state = vi.hoisted(() => ({
fetchNodeLogsText: vi.fn(),
parseNodeLogsNDJSON: vi.fn(),
streamNodeLogsEntries: vi.fn(),
}));
const state = vi.hoisted(() => {
// NodeProcessLogsPanel does `e instanceof NodeLogsError` in its error
// branch. Without exporting a NodeLogsError constructor from the mock,
// the reference is undefined at runtime and `instanceof` throws a
// TypeError — which swallows the destructive-alert render and fails
// unrelated assertions in this suite. Provide a minimal shape-compatible
// mock class.
class MockNodeLogsError extends Error {
readonly status: number;
readonly code?: string;
constructor(message: string, status: number, code?: string) {
super(message);
this.name = "NodeLogsError";
this.status = status;
this.code = code;
}
}
return {
fetchNodeLogsText: vi.fn(),
parseNodeLogsNDJSON: vi.fn(),
streamNodeLogsEntries: vi.fn(),
NodeLogsError: MockNodeLogsError,
};
});

vi.mock("@/services/api", () => ({
fetchNodeLogsText: (...args: unknown[]) => state.fetchNodeLogsText(...args),
parseNodeLogsNDJSON: (...args: unknown[]) => state.parseNodeLogsNDJSON(...args),
streamNodeLogsEntries: (...args: unknown[]) => state.streamNodeLogsEntries(...args),
NodeLogsError: state.NodeLogsError,
}));

vi.mock("@/components/ui/alert", () => ({
Expand Down
Loading