Upvote this issue if you believe it would help you.

This proposal introduces a general Env into agent-lightning. With this Env, you will no longer need to write a new @agl.rollout function yourself: you can simply make a rollout from an Env, an LLM, and a looper.

This proposal is not complete. Discussion and contributions are welcome.

I see two options for the interface:
### Option 1: Token-in-out interface
This option looks very similar to Tinker's Env.
Both the env's input and output (of step and reset) are token sequences, with some special handling for images.
To put it simply, the env looks like:
```python
class Env:
    async def reset(self) -> list[int]:
        ...

    async def step(self, action: list[int]) -> list[int]:
        ...
```

Many modern envs use text as their underlying computing data. Therefore, to make them compatible with the interface above, we need a Tokenizer to convert text to tokens and vice versa. This can be non-trivial, especially when the token list involves advanced LLM capabilities, such as tool calls, guided decoding, etc.
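To make the conversion concrete, below is a minimal sketch of such an adapter, assuming a Hugging Face-style tokenizer. `TokenEnvAdapter`, the wrapped `text_env`, and the default model name are all hypothetical; a real adapter would additionally have to deal with chat templates, tool-call parsing, and image handling:

```python
from transformers import AutoTokenizer

class TokenEnvAdapter:
    """Wraps a hypothetical text-based env so that it speaks token IDs."""

    def __init__(self, text_env, tokenizer_name: str = "Qwen/Qwen2.5-7B-Instruct"):
        # `text_env` is assumed to expose async reset() -> str and step(str) -> str;
        # the model name is only a placeholder for whichever tokenizer you train with.
        self.text_env = text_env
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_name)

    async def reset(self) -> list[int]:
        text = await self.text_env.reset()
        return self.tokenizer.encode(text)

    async def step(self, action: list[int]) -> list[int]:
        # Decode the policy's token action back to text for the underlying env.
        action_text = self.tokenizer.decode(action, skip_special_tokens=True)
        observation = await self.text_env.step(action_text)
        return self.tokenizer.encode(observation)
```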
To make a rollout, we can do:

```python
def make_token_completer(llm: LLM, looper: Looper):
    # Returns an async completer `complete(tokens, stop) -> TokensWithLogprobs`
    # that samples from the LLM until `stop` and yields the new tokens + logprobs.
    ...
```
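As one possible concretization (every name here is hypothetical, not agent-lightning or any backend's actual API): assuming the LLM handle exposes an async `generate(...)` call that returns sampled token IDs and their logprobs, the completer could look like:

```python
from dataclasses import dataclass

@dataclass
class TokensWithLogprobs:
    tokens: list[int]      # newly sampled token IDs (the action)
    logprobs: list[float]  # per-token log-probabilities

def make_token_completer(llm, looper):
    # `looper` could schedule/batch the sampling requests; omitted here for brevity.
    async def complete(tokens: list[int], stop) -> TokensWithLogprobs:
        # `llm.generate` is a hypothetical async sampling call; substitute your
        # inference backend (vLLM, SGLang, an OpenAI-compatible server, ...).
        result = await llm.generate(prompt_token_ids=tokens, stop=stop, logprobs=True)
        return TokensWithLogprobs(tokens=result.token_ids, logprobs=result.logprobs)
    return complete
```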
The rollout factory then ties the env, the LLM, and the looper together:

```python
def make_rollout(llm: LLM, looper: Looper):
    """
    Returns an Agent-Lightning rollout function.
    - The Trainer will pass in a `task` (you decide its schema).
    - Here we assume `task` can produce an Env (e.g., task.make_env()).
    - You can add more resources (e.g., prompt templates) as extra args if needed.
    """
    complete = make_token_completer(llm, looper)
    @agl.rollout  # <- Agent-Lightning expects a function decorated like this that returns a float reward.
    async def rollout(task) -> float:
        """
        Minimal signature: you can accept additional resources if your Trainer provides them,
        e.g., (task, prompt_template) or (task, policy_state). Return a scalar reward.
        """
        # 0) Build a fresh single-episode environment from the task
        env: Env = await task.make_env()  # or task.env if already constructed
        # 1) Get initial observation + stop condition
        ob, stop = await env.initial_observation()
        # (Optional) open a top-level span for tracing this rollout
        # agl.emit_span(kind="rollout.start", task_id=task.id)
        total_reward = 0.0
        t = 0
        while True:
            t += 1
            # 2) Policy proposes *tokens* as the action
            ac_with_lp: TokensWithLogprobs = await complete(ob, stop)
            # (Optional) record LLM step: prefix length, sampled length, avg logprob, etc.
            # agl.emit_span(kind="policy.step", tokens=len(ac_with_lp.tokens),
            #               avg_lp=float(sum(ac_with_lp.logprobs)/max(1,len(ac_with_lp.logprobs))))
            # 3) Environment step consumes raw token IDs
            sr: StepResult = await env.step(action=ac_with_lp.tokens)
            # (Optional) emit per-step reward shaping for analysis
            # agl.emit_reward(value=sr.reward, step=t, extra=sr.metrics)
            total_reward += sr.reward
            if sr.episode_done:
                # (Optional) close span
                # agl.emit_span(kind="rollout.end", steps=t, return_=total_reward)
                return float(total_reward)
            # 4) Move to next state
            ob = sr.next_observation
            stop = sr.next_stop_condition
    return rollout
```

Traces can be collected naturally because the tokens are all on the surface.
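For reference, the `StepResult` consumed above could be shaped as follows; the field names come from the rollout sketch, while the types are my assumptions rather than settled API:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass
class StepResult:
    reward: float                # per-step reward from the env
    episode_done: bool           # True when the episode terminates
    next_observation: list[int]  # token IDs of the next observation
    next_stop_condition: Any     # e.g., stop tokens / max new tokens
    metrics: dict[str, Any] = field(default_factory=dict)  # optional diagnostics
```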
### Option 2: Text-in-out interface

In this case, the env consumes and returns text.
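As a starting point for discussion, the text-in-out counterpart might mirror Option 1's shape with strings in place of token lists (the method names here are assumptions, not settled API):

```python
class TextEnv:
    async def reset(self) -> str:
        # Return the initial observation as text (e.g., a rendered prompt).
        ...

    async def step(self, action: str) -> str:
        # Consume the LLM's text action and return the next observation.
        ...
```

A Tokenizer-based adapter like the one sketched under Option 1 could then bridge the two interfaces.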
To be completed...