Skip to content
218 changes: 186 additions & 32 deletions environments/templar/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""

import ast
import asyncio
import gc
import hashlib
import importlib.util
Expand All @@ -25,6 +26,7 @@
import sys
import time
import traceback
import uuid as _uuid
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
Expand All @@ -33,6 +35,7 @@
import torch
import torch.nn.functional as F
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

from crusades.core.security_defs import (
Expand Down Expand Up @@ -2922,6 +2925,13 @@ def _timer_divergence(a: float, b: float) -> float:

app = FastAPI(title="Templar MFU Evaluation", version="2.0.0")

# ---------------------------------------------------------------------------
# In-memory jobs table for async evaluation
# ---------------------------------------------------------------------------
# Maps job_id -> {"status": "pending"|"done"|"failed", "result": dict|None}
_jobs: dict[str, dict] = {}
_jobs_lock = asyncio.Lock()

# Global actor instance (reused for efficiency)
_actor: Actor | None = None

Expand Down Expand Up @@ -2989,12 +2999,82 @@ async def health():
}


_current_torchrun: asyncio.subprocess.Process | None = None


def _get_descendant_pids(pid: int) -> list[int]:
"""Recursively collect all descendant PIDs via /proc before killing."""
descendants: list[int] = []
try:
with open(f"/proc/{pid}/task/{pid}/children") as f:
child_pids = [int(p) for p in f.read().split()]
for cpid in child_pids:
descendants.append(cpid)
descendants.extend(_get_descendant_pids(cpid))
except (FileNotFoundError, PermissionError, ProcessLookupError, ValueError, OSError):
pass
return descendants


def _kill_torchrun_group(proc: asyncio.subprocess.Process) -> None:
"""SIGKILL a torchrun process, its process group, AND all descendants.

torchrun's elastic agent may spawn workers in a different process group
than the launcher, so os.killpg alone is insufficient. We walk
/proc/<pid>/children first (before any kill) to collect every descendant,
then kill the process group *and* each descendant individually.
"""
import signal

if proc.returncode is not None:
return

pid = proc.pid

desc_pids = _get_descendant_pids(pid)

try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGKILL)
logger.warning(f"Killed torchrun process group (pgid={pgid})")
except (ProcessLookupError, PermissionError, OSError):
pass

killed_extra = 0
for dpid in desc_pids:
try:
os.kill(dpid, signal.SIGKILL)
killed_extra += 1
except (ProcessLookupError, PermissionError, OSError):
pass
if killed_extra:
logger.warning(f"Also killed {killed_extra} descendant processes of torchrun (pid={pid})")

try:
proc.kill()
except (ProcessLookupError, OSError):
pass


async def _evaluate_via_torchrun(request: EvaluateRequest) -> dict:
"""Spawn torchrun for multi-GPU Basilica evaluation (uvicorn is single-process)."""
global _current_torchrun
import asyncio as _aio
import json as _json
import tempfile

if _current_torchrun is not None and _current_torchrun.returncode is None:
logger.warning(
f"Killing stale torchrun (pid={_current_torchrun.pid}) before new evaluation"
)
_kill_torchrun_group(_current_torchrun)
try:
await _aio.wait_for(_current_torchrun.wait(), timeout=10)
except TimeoutError:
logger.warning("Stale torchrun launcher did not exit within 10s after kill")
await _aio.sleep(30)
_current_torchrun = None
Comment on lines +3061 to +3076
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's find the file and check its existence
find . -name "env.py" -path "*/templar/*" 2>/dev/null | head -20

Repository: one-covenant/crusades

Length of output: 94


🏁 Script executed:

#!/bin/bash
# Get line count to understand the file structure
wc -l environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 97


🏁 Script executed:

#!/bin/bash
# Read the code at the specified lines (3015-3029)
sed -n '3010,3035p' environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 952


🏁 Script executed:

#!/bin/bash
# Look for the _evaluate_via_torchrun function definition
grep -n "_evaluate_via_torchrun" environments/templar/env.py | head -10

Repository: one-covenant/crusades

Length of output: 197


🏁 Script executed:

#!/bin/bash
# Check the lines mentioned as "Also applies to: 3087-3087, 3118-3133"
sed -n '3082,3092p' environments/templar/env.py
sed -n '3113,3138p' environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1245


🏁 Script executed:

#!/bin/bash
# Search for any locking mechanisms or async patterns related to torchrun
grep -n "torchrun\|_current_torchrun\|asyncio\.Lock\|Lock\|mutex\|Semaphore" environments/templar/env.py | head -30

Repository: one-covenant/crusades

Length of output: 1693


🏁 Script executed:

#!/antml/bash
# Look for the /evaluate endpoint definition to check if it's single-flight
grep -n "evaluate\|/evaluate" environments/templar/env.py | grep -i "def\|route\|endpoint" | head -20

Repository: one-covenant/crusades

Length of output: 343


🏁 Script executed:

#!/bin/bash
# Read the /evaluate endpoint definition
sed -n '3154,3170p' environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 751


🏁 Script executed:

#!/bin/bash
# Check for any concurrency control like locks or semaphores
grep -n "Lock\|Semaphore\|single.*flight\|concurrent" environments/templar/env.py | head -20

Repository: one-covenant/crusades

Length of output: 47


🏁 Script executed:

#!/bin/bash
# Look for uvicorn or FastAPI configuration to understand concurrency
grep -n "uvicorn\|FastAPI\|app\|Depends" environments/templar/env.py | head -30

Repository: one-covenant/crusades

Length of output: 2201


🏁 Script executed:

#!/bin/bash
# Check task_id usage to understand request tracking
sed -n '2939,2970p' environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1009


🏁 Script executed:

#!/antml/bash
# Look at the full exception handling in _evaluate_via_torchrun
sed -n '3100,3145p' environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1603


🏁 Script executed:

#!/bin/bash
# Check the finally block to see how cleanup is handled
sed -n '3140,3155p' environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 546


🏁 Script executed:

#!/bin/bash
# Check if there are any other endpoints or handlers that might serialize requests
grep -B5 -A5 "num_gpus.*>" environments/templar/env.py | head -40

Repository: one-covenant/crusades

Length of output: 1143


Fix race condition in multi-GPU cleanup: track process identity per request.

The global _current_torchrun can be overwritten by concurrent requests, causing one request's exception handler to kill another request's subprocess. Since the /evaluate endpoint is async with no concurrency control, overlapping multi-GPU calls will interleave: Request A sets _current_torchrun = proc_A, Request B overwrites it with _current_torchrun = proc_B, and if Request A times out or errors, it kills proc_B instead. The unconditional stale-process kill at lines 3020–3024 also aborts unrelated jobs without task identity verification.

Use a local proc variable to track the process started by each request. Kill only the local proc in exception handlers (lines 3119–3120, 3132–3133), and only clear the global if it still points to the same process in the finally block:

Fix outline
async def _evaluate_via_torchrun(request: EvaluateRequest) -> dict:
    global _current_torchrun
+   proc: asyncio.subprocess.Process | None = None
    import asyncio as _aio
    import json as _json
    import tempfile

    if _current_torchrun is not None and _current_torchrun.returncode is None:
        logger.warning(
            f"Killing stale torchrun (pid={_current_torchrun.pid}) before new evaluation"
        )
        _kill_torchrun_group(_current_torchrun)
        try:
            await _aio.wait_for(_current_torchrun.wait(), timeout=5)
        except TimeoutError:
            pass
    _current_torchrun = None

    # ... setup code ...

    try:
        proc = await _aio.create_subprocess_exec(...)
        _current_torchrun = proc
        # ... rest of execution ...
    except TimeoutError:
-       if _current_torchrun is not None:
-           _kill_torchrun_group(_current_torchrun)
+       if proc is not None:
+           _kill_torchrun_group(proc)
        return { ... }
    except Exception as e:
-       if _current_torchrun is not None:
-           _kill_torchrun_group(_current_torchrun)
+       if proc is not None:
+           _kill_torchrun_group(proc)
        return { ... }
    finally:
+       if _current_torchrun is proc:
+           _current_torchrun = None
         for p in (params_path, script_path):
             if p:
                 try:
                     os.unlink(p)
                 except OSError:
                     pass
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@environments/templar/env.py` around lines 3015 - 3029, The race is caused by
using the global _current_torchrun directly per request; change the per-request
logic in the evaluate flow to assign the spawned process to a local variable
(e.g., proc) and use that local proc for all per-request exception/timeout
cleanup (call _kill_torchrun_group(proc) and await proc.wait()) so a failing
request only kills its own subprocess; when setting/clearing the global
_current_torchrun keep it as a best-effort shared pointer but only set it to
None in the finally block if _current_torchrun is still the same process
instance (compare identity with proc) to avoid clearing or killing unrelated
jobs, and update the exception handlers that currently reference
_current_torchrun (lines with _kill_torchrun_group and await ...wait()) to use
the local proc instead.


params_path = None
script_path = None
try:
Expand Down Expand Up @@ -3039,14 +3119,19 @@ async def main():
f.write(eval_script)
script_path = f.name

master_port = 29500 + random.randint(0, 10000)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "env.py" -path "*/templar/*" | head -20

Repository: one-covenant/crusades

Length of output: 94


🏁 Script executed:

sed -n '3070,3080p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 457


🏁 Script executed:

sed -n '3040,3090p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1904


🏁 Script executed:

sed -n '3000,3045p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1553


🏁 Script executed:

grep -n "_reset_torch_state" ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 170


🏁 Script executed:

sed -n '271,310p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1641


🏁 Script executed:

sed -n '310,330p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 716


🏁 Script executed:

sed -n '3000,3015p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 617


🏁 Script executed:

sed -n '2240,2260p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 821


🏁 Script executed:

grep -n "def _evaluate" ./environments/templar/env.py | head -10

Repository: one-covenant/crusades

Length of output: 137


🏁 Script executed:

grep -n "def _evaluate" ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 137


🏁 Script executed:

grep -n "def evaluate" ./environments/templar/env.py | head -5

Repository: one-covenant/crusades

Length of output: 164


🏁 Script executed:

sed -n '3154,3180p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1344


🏁 Script executed:

sed -n '1692,1750p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 2571


🏁 Script executed:

head -1 ./environments/templar/env.py && grep -n "import random" ./environments/templar/env.py | head -10

Repository: one-covenant/crusades

Length of output: 85


🏁 Script executed:

sed -n '2240,2280p' ./environments/templar/env.py

Repository: one-covenant/crusades

Length of output: 1689


Use a cryptographically trusted entropy source for master_port selection.

The single-GPU evaluation path executes untrusted miner code in-process. If untrusted code patches random.randint via frame globals manipulation, and _reset_torch_state() does not restore the random module (confirmed—it only resets torch/time functions), then subsequent multi-GPU evaluations will use the poisoned random.randint to select master_port. This allows attackers to force repeated or predictable port values, defeating the retry mitigation.

Capture random.SystemRandom().randrange at module import time to secure the entropy source before untrusted code execution:

Fix
_secure_randrange = random.SystemRandom().randrange
-        master_port = 29500 + random.randint(0, 10000)
+        master_port = 29500 + _secure_randrange(10001)
🧰 Tools
🪛 Ruff (0.15.6)

[error] 3075-3075: Standard pseudo-random generators are not suitable for cryptographic purposes

(S311)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@environments/templar/env.py` at line 3075, The master_port selection
currently uses random.randint which can be poisoned by untrusted in-process
code; capture a cryptographically secure randrange at module import time (e.g.
assign random.SystemRandom().randrange to a module-level name like
_secure_randrange) and use that secure function where master_port is computed
(the site setting master_port = 29500 + random.randint(0, 10000)); ensure the
secure capture happens before any untrusted execution and keep usage consistent
(replace random.randint calls for master_port selection with _secure_randrange).

proc = await _aio.create_subprocess_exec(
"torchrun",
"--nproc_per_node",
str(request.num_gpus),
"--master_port",
str(master_port),
script_path,
stdout=_aio.subprocess.PIPE,
stderr=_aio.subprocess.STDOUT,
start_new_session=True,
)
_current_torchrun = proc

collected_lines: list[str] = []

Expand Down Expand Up @@ -3078,6 +3163,8 @@ async def _read_and_tee():
"wall_time_seconds": 0.0,
}
except TimeoutError:
if _current_torchrun is not None:
_kill_torchrun_group(_current_torchrun)
return {
"task_id": request.task_id,
"success": False,
Expand All @@ -3089,6 +3176,8 @@ async def _read_and_tee():
"wall_time_seconds": 0.0,
}
except Exception as e:
if _current_torchrun is not None:
_kill_torchrun_group(_current_torchrun)
return {
"task_id": request.task_id,
"success": False,
Expand All @@ -3108,49 +3197,114 @@ async def _read_and_tee():
pass


@app.post("/evaluate", response_model=EvaluateResponse)
async def evaluate(request: EvaluateRequest) -> EvaluateResponse:
"""Evaluate miner's code. Spawns torchrun when num_gpus > 1."""
async def _run_evaluation(request: EvaluateRequest) -> dict:
"""Run the actual evaluation (sync helper used by background task)."""
if request.num_gpus > 1:
result = await _evaluate_via_torchrun(request)
else:
actor = get_actor()
result = await actor.evaluate(
task_id=request.task_id,
seed=request.seed,
model_url=request.model_url,
data_url=request.data_url,
steps=request.steps,
batch_size=request.batch_size,
timeout=request.timeout,
sequence_length=request.sequence_length,
data_samples=request.data_samples,
code=request.code,
max_loss_difference=request.max_loss_difference,
use_random_init=request.use_random_init,
min_trainable_params_ratio=request.min_trainable_params_ratio,
min_params_changed_ratio=request.min_params_changed_ratio,
weight_relative_error_max=request.weight_relative_error_max,
timer_divergence_threshold=request.timer_divergence_threshold,
gpu_peak_tflops=request.gpu_peak_tflops,
max_plausible_mfu=request.max_plausible_mfu,
min_mfu=request.min_mfu,
require_cuda_timing=True,
num_gpus=request.num_gpus,
)
return await _evaluate_via_torchrun(request)

actor = get_actor()
return await actor.evaluate(
task_id=request.task_id,
seed=request.seed,
model_url=request.model_url,
data_url=request.data_url,
steps=request.steps,
batch_size=request.batch_size,
timeout=request.timeout,
sequence_length=request.sequence_length,
data_samples=request.data_samples,
code=request.code,
max_loss_difference=request.max_loss_difference,
use_random_init=request.use_random_init,
min_trainable_params_ratio=request.min_trainable_params_ratio,
min_params_changed_ratio=request.min_params_changed_ratio,
weight_relative_error_max=request.weight_relative_error_max,
timer_divergence_threshold=request.timer_divergence_threshold,
gpu_peak_tflops=request.gpu_peak_tflops,
max_plausible_mfu=request.max_plausible_mfu,
min_mfu=request.min_mfu,
require_cuda_timing=True,
num_gpus=request.num_gpus,
)
Comment on lines +3205 to +3228
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

asyncio.create_task() does not offload the 1-GPU evaluation path.

_run_evaluation() still executes Actor.evaluate() on the FastAPI event-loop thread. Because Actor.evaluate() in this file has no suspension points, the default num_gpus == 1 path will block /eval-status until the run finishes, so the runner’s short poll requests can still hit timeouts and recreate the same 502 failure mode this PR is trying to remove.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@environments/templar/env.py` around lines 3205 - 3228, The call to
actor.evaluate in _run_evaluation runs synchronously on the FastAPI event loop
(Actor.evaluate has no awaits), blocking the loop for the 1-GPU path; offload
the CPU/GPU-bound work to a background thread or executor instead of calling
actor.evaluate directly. Replace the direct await actor.evaluate(...) in
_run_evaluation with an offload using asyncio.to_thread or loop.run_in_executor
(e.g., create a task that runs asyncio.to_thread(actor.evaluate, task_id=...,
seed=..., model_url=..., ...)) so the evaluation executes off the event loop;
keep the same arguments and ensure the created task is scheduled
(asyncio.create_task) so the endpoint and /eval-status polling remain
responsive.



async def _evaluation_background(job_id: str, request: EvaluateRequest) -> None:
"""Background coroutine: runs evaluation and stores result in _jobs."""
try:
result = await _run_evaluation(request)
async with _jobs_lock:
_jobs[job_id] = {"status": "done", "result": result}
logger.info(f"[JOB {job_id}] Evaluation finished successfully")
except Exception as exc:
error_result = {
"task_id": request.task_id,
"success": False,
"error": f"Background evaluation failed: {exc}",
"seed": request.seed,
"mfu": 0.0,
"tps": 0.0,
"total_tokens": 0,
"wall_time_seconds": 0.0,
}
async with _jobs_lock:
_jobs[job_id] = {"status": "failed", "result": error_result}
logger.error(f"[JOB {job_id}] Evaluation failed: {exc}")


@app.post("/evaluate")
async def evaluate(request: EvaluateRequest):
"""Accept evaluation request, start in background, return job_id immediately.

Returns HTTP 202 with {"job_id": "..."} so the caller can poll
GET /eval-status/{job_id} for results. This avoids proxy timeouts
on long-running evaluations.
"""
job_id = _uuid.uuid4().hex
async with _jobs_lock:
_jobs[job_id] = {"status": "pending", "result": None}
asyncio.create_task(_evaluation_background(job_id, request))
logger.info(
f"[JOB {job_id}] Evaluation accepted (task_id={request.task_id}, "
f"num_gpus={request.num_gpus})"
)
return JSONResponse(status_code=202, content={"job_id": job_id})


@app.get("/eval-status/{job_id}")
async def eval_status(job_id: str):
"""Poll for evaluation result.

Returns:
- 200 {"status": "pending"} while evaluation is running
- 200 {"status": "done", "result": {...}} when evaluation is complete
- 200 {"status": "failed", "result": {...}} on evaluation error
- 404 {"error": "unknown job_id"} if job_id is not found
"""
async with _jobs_lock:
job = _jobs.get(job_id)
if job is None:
return JSONResponse(status_code=404, content={"error": "unknown job_id"})

if job["status"] == "pending":
return {"status": "pending"}

return EvaluateResponse(
task_id=result.get("task_id", request.task_id),
result = job["result"]
response = EvaluateResponse(
task_id=result.get("task_id", 0),
mfu=result.get("mfu", 0.0),
tps=result.get("tps", 0.0),
total_tokens=result.get("total_tokens", 0),
wall_time_seconds=result.get("wall_time_seconds", 0.0),
success=result.get("success", False),
error=result.get("error"),
error_code=result.get("error_code"),
seed=result.get("seed", request.seed),
seed=result.get("seed", ""),
diagnostics=result.get("diagnostics", {}),
)
# Clean up to avoid unbounded memory growth
async with _jobs_lock:
_jobs.pop(job_id, None)
Comment on lines +3304 to +3306
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't delete a finished job on the first status read.

If the 200 carrying the result is lost after this pop(), the next poll gets 404 and a completed evaluation becomes unrecoverable. Keep finished jobs for a short TTL, or delete them only after an explicit ack, instead of removing them on the first successful GET /eval-status/{job_id}.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@environments/templar/env.py` around lines 3304 - 3306, The current cleanup
removes finished jobs immediately via _jobs.pop(job_id, None) inside async with
_jobs_lock when handling GET /eval-status/{job_id}, which can lose results if
the 200 response is dropped; instead, do not delete on first successful
read—either mark the job as finished and store a completed_at timestamp in the
job record and let a background task remove entries after a short TTL, or only
pop the entry after an explicit client ack endpoint (e.g., POST
/eval-ack/{job_id}); update code around _jobs_lock, _jobs.pop and the GET
/eval-status/{job_id} handling to implement one of these strategies so completed
jobs remain available for retries for a short period.

return {"status": job["status"], "result": response.model_dump()}


# Entry point when running directly (for local testing)
Expand Down
Loading