Optimizer Interface

The optimizer is the attacker agent in superred. It runs as a concurrent actor, receiving events through an EventChannel and deciding what to inject at controllable points.

Lifecycle

1. Instantiate with configuration
2. initialize(goal, controllables, observables, llm_client) — base class stores the LLM client
3. run(channel) — launched as asyncio.Task by the controller
   For each run (until optimizer signals done):
     - Receives RunStartEvent(trajectory) → sets current_trajectory
     - Receives ControllablePreCallEvent / ControllablePostCallEvent [0..N]
     - (controller evaluates)
     - Receives RunEndEvent(evaluation) → archives trajectory
       The evaluation is carried on the event, and the RunEndEvent itself is persisted to the trajectory.
       Respond with RunEndResponse(done=True) to stop, or RunEndResponse(done=False) to continue.
   Channel closes → run() returns
4. teardown()

The optimizer stays alive across all runs for a task. One channel, one run() task. Multiple RunStart/RunEnd cycles flow through the same channel.
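
The cycle above can be sketched with toy stand-ins. Everything in this block (ToyChannel, Envelope, the two event dataclasses, the dict-shaped response, and the controller loop) is a hypothetical simplification written for illustration, not superred's actual classes. It only shows the shape: one long-lived run() task, one channel, several RunStart/RunEnd cycles, and a close that lets run() return.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class RunStartEvent:            # stand-in, not the real superred class
    trajectory: list


@dataclass
class RunEndEvent:              # stand-in, not the real superred class
    evaluation: float


@dataclass
class Envelope:                 # stand-in: pairs an event with a reply future
    event: object
    reply: asyncio.Future

    def respond(self, response):
        self.reply.set_result(response)


class ToyChannel:
    """Hypothetical channel: send() blocks until the optimizer responds."""

    _CLOSED = object()

    def __init__(self):
        self._queue = asyncio.Queue()

    async def send(self, event):
        reply = asyncio.get_running_loop().create_future()
        await self._queue.put(Envelope(event, reply))
        return await reply

    async def close(self):
        await self._queue.put(self._CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is self._CLOSED:
            raise StopAsyncIteration
        return item


async def optimizer_run(channel, max_runs, log):
    """One long-lived task that sees the events of every run."""
    runs_seen = 0
    async for envelope in channel:
        log.append(type(envelope.event).__name__)
        if isinstance(envelope.event, RunEndEvent):
            runs_seen += 1
            envelope.respond({"done": runs_seen >= max_runs})
        else:
            envelope.respond(None)


async def controller(max_runs=3):
    channel, log = ToyChannel(), []
    task = asyncio.create_task(optimizer_run(channel, max_runs, log))
    done = False
    while not done:
        await channel.send(RunStartEvent(trajectory=[]))
        done = (await channel.send(RunEndEvent(evaluation=0.0)))["done"]
    await channel.close()       # channel closes, so optimizer_run() returns
    await task
    return log


if __name__ == "__main__":
    print(asyncio.run(controller(max_runs=2)))
```

The same optimizer_run() task handles every cycle; the controller never restarts it between runs.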

What to implement

Subclass Optimizer and override:

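Required: on_event(event), which returns the response for each event.

Optional: run(channel), to change how events are consumed (see Consumption models), plus initialize(...) and teardown() for setup and cleanup.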

LLM access

The controller passes a constrained LLMClient to initialize(). The base class stores it; after calling super().initialize(...), the optimizer accesses it as self.llm (see the on_event example below).

The LLMClient locks the model, API base, and API key — the optimizer cannot change them. Cost budget (max_cost in USD) is enforced by the client via pre-call checks that raise BudgetExhaustedError. Cost is computed per call via litellm.completion_cost().

async def on_event(self, event):
    if isinstance(event, ControllablePreCallEvent):
        response = await self.llm.complete([
            {"role": "system", "content": "You are a red-teaming assistant."},
            {"role": "user", "content": f"Generate an attack for: {event.request}"},
        ])
        attack = response.choices[0].message.content
        return ControllableInjection(event=event, controllable=event.controllable, value=attack)
    ...
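
The pre-call budget check described above might look roughly like the following. This is a minimal sketch under stated assumptions, not the real LLMClient: BudgetedClient, its constructor arguments, and the injected cost_fn are hypothetical, and cost_fn stands in for the role the text assigns to litellm.completion_cost().

```python
import asyncio


class BudgetExhaustedError(RuntimeError):
    """Stand-in for the error named in the text."""


class BudgetedClient:
    """Hypothetical cost-capped client; the real LLMClient may differ."""

    def __init__(self, backend, cost_fn, max_cost):
        self._backend = backend      # async callable: messages -> response
        self._cost_fn = cost_fn      # callable: response -> cost in USD
        self._max_cost = max_cost
        self.spent = 0.0

    async def complete(self, messages):
        # Pre-call check: refuse before any further money is spent.
        if self.spent >= self._max_cost:
            raise BudgetExhaustedError(
                f"spent ${self.spent:.4f} of ${self._max_cost:.4f}"
            )
        response = await self._backend(messages)
        self.spent += self._cost_fn(response)    # cost is added per call
        return response


async def demo():
    async def fake_backend(messages):            # no network, fixed reply
        return {"text": "ok"}

    client = BudgetedClient(fake_backend, cost_fn=lambda r: 0.5, max_cost=1.0)
    calls = 0
    try:
        while True:
            await client.complete([{"role": "user", "content": "hi"}])
            calls += 1
    except BudgetExhaustedError:
        pass
    return calls, client.spent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the check in this sketch runs before the call and the cost is only known afterwards, the final permitted call can overshoot the cap; whether the real client behaves the same way is not stated here.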

Consumption models

The optimizer chooses how to process events by overriding run():

Sequential (default) — inherit run(), just override on_event:

async def on_event(self, event):
    if isinstance(event, ControllablePreCallEvent):
        return ControllableInjection(event=event, controllable=event.controllable, value="...")
    elif isinstance(event, RunStartEvent):
        return EventResponse(event=event)
    elif isinstance(event, RunEndEvent):
        done = self._budget_exhausted()
        return RunEndResponse(event=event, done=done)
    return EventResponse(event=event)

Parallel — override run(), spawn tasks per event:

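import asyncio

async def run(self, channel):
    tasks = set()
    async for envelope in channel:
        # One task per event, so a slow handler does not block later events.
        task = asyncio.create_task(self._dispatch(envelope))
        tasks.add(task)                        # keep a strong reference until done
        task.add_done_callback(tasks.discard)
    # Channel closed: wait for handlers that are still in flight.
    if tasks:
        await asyncio.gather(*tasks)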
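
To see what the task-per-event pattern buys, here is a self-contained toy version. The async generator toy_channel and the (name, delay) event shape are hypothetical stand-ins for the real channel and envelopes; the sleep stands in for a slow LLM call.

```python
import asyncio


async def toy_channel(events):
    """Stand-in for the channel: yields (name, delay) pairs."""
    for item in events:
        yield item


async def run_parallel(channel, finished):
    tasks = set()

    async def handle(name, delay):
        await asyncio.sleep(delay)       # pretend this is a slow handler
        finished.append(name)

    async for name, delay in channel:
        task = asyncio.create_task(handle(name, delay))
        tasks.add(task)                  # hold a reference so it is not collected
        task.add_done_callback(tasks.discard)
    if tasks:
        await asyncio.gather(*tasks)     # drain handlers still in flight


async def demo():
    finished = []
    events = [("slow", 0.05), ("fast", 0.0)]
    await run_parallel(toy_channel(events), finished)
    return finished


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The events arrive as slow then fast, but complete as fast then slow: handlers overlap instead of queuing behind one another, which also means responses are no longer produced in arrival order.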

Continuous with events — override run(), do background work + pull events:

import asyncio

async def run(self, channel):
    self._done = False
    async def event_loop():
        try:
            async for envelope in channel:
                await self._dispatch(envelope)
        finally:
            # Channel closed or dispatch failed: let background() exit.
            self._done = True
    async def background():
        while not self._done:
            await self._evolve_population()
    await asyncio.gather(event_loop(), background())

_dispatch(envelope) — trajectory tracking wrapper

The base class provides _dispatch() which:

  1. Sets _current_trajectory on RunStartEvent
  2. Calls on_event(event) to get the response
  3. Archives trajectory to _past_trajectories on RunEndEvent, clears _current_trajectory
  4. Calls envelope.respond(response) to deliver the response back through the channel

Exception safety: If on_event() raises, _dispatch archives the trajectory only when the failing event is a RunEndEvent, rejects the envelope (propagating the exception to the sender), and re-raises.

Use _dispatch from custom run() implementations to retain automatic trajectory tracking. Advanced optimizers can handle envelopes directly if they want full control.
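
The four steps and the exception-safety rule can be put together as below. This is an illustrative reconstruction from the description above, not the base class's source: SketchOptimizer, RecordingEnvelope, the event dataclasses, the _archive helper, and the reject() method name are all assumptions.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class RunStartEvent:                 # stand-in event type
    trajectory: list


@dataclass
class RunEndEvent:                   # stand-in event type
    evaluation: float


class RecordingEnvelope:
    """Stand-in envelope that records how it was answered."""

    def __init__(self, event):
        self.event = event
        self.response = None
        self.error = None

    def respond(self, response):
        self.response = response

    def reject(self, exc):           # hypothetical name for "rejects the envelope"
        self.error = exc


class SketchOptimizer:
    def __init__(self):
        self._current_trajectory = None
        self._past_trajectories = []

    async def on_event(self, event):
        return ("ack", type(event).__name__)

    def _archive(self):              # hypothetical helper
        if self._current_trajectory is not None:
            self._past_trajectories.append(self._current_trajectory)
        self._current_trajectory = None

    async def _dispatch(self, envelope):
        event = envelope.event
        if isinstance(event, RunStartEvent):
            self._current_trajectory = event.trajectory   # step 1
        try:
            response = await self.on_event(event)          # step 2
        except Exception as exc:
            if isinstance(event, RunEndEvent):
                self._archive()      # archive only when a RunEndEvent fails
            envelope.reject(exc)     # the sender sees the failure
            raise
        if isinstance(event, RunEndEvent):
            self._archive()                                # step 3
        envelope.respond(response)                         # step 4


async def demo():
    opt = SketchOptimizer()
    start = RecordingEnvelope(RunStartEvent(trajectory=["step-1"]))
    end = RecordingEnvelope(RunEndEvent(evaluation=1.0))
    await opt._dispatch(start)
    await opt._dispatch(end)
    return opt._past_trajectories, opt._current_trajectory, end.response


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A custom run() that calls a wrapper like this for every envelope gets trajectory bookkeeping without repeating it in each consumption model.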

Exception handling

The default run() consumes the channel one envelope at a time: for each envelope yielded by async for envelope in channel, it awaits self._dispatch(envelope) before reading the next.

If on_event() raises, _dispatch rejects the envelope (propagating the exception to the sender) and re-raises. The exception exits run() and the controller detects the failure. The controller then poisons the channel via channel.set_error(), ensuring no other channel.send() call deadlocks.
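
Why poisoning prevents deadlock is easiest to see in a toy channel. The sketch below is an assumption-laden illustration, not the real EventChannel: PoisonableChannel and its internals are hypothetical, only the set_error() name comes from the text, and delivery of events to a consumer is left out entirely.

```python
import asyncio


class PoisonableChannel:
    """Hypothetical sketch; the real EventChannel may differ."""

    def __init__(self):
        self._pending = set()
        self._error = None

    async def send(self, event):
        if self._error is not None:
            raise self._error                  # already poisoned: fail fast
        reply = asyncio.get_running_loop().create_future()
        self._pending.add(reply)
        try:
            return await reply                 # normally resolved by a response
        finally:
            self._pending.discard(reply)

    def set_error(self, exc):
        self._error = exc
        for reply in list(self._pending):
            if not reply.done():
                reply.set_exception(exc)       # wake every blocked sender


async def demo():
    channel = PoisonableChannel()
    blocked = asyncio.create_task(channel.send("event-1"))
    await asyncio.sleep(0)                     # let send() start waiting
    channel.set_error(RuntimeError("optimizer crashed"))
    outcomes = []
    for attempt in (blocked, channel.send("event-2")):
        try:
            await attempt
        except RuntimeError as exc:
            outcomes.append(str(exc))
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both the sender that was already waiting and the one that arrives after the failure get the error instead of waiting forever for a response that will never come.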

Lifecycle events

Instead of hook methods, the optimizer receives lifecycle events through the channel: RunStartEvent(trajectory) when a run begins and RunEndEvent(evaluation) when it ends.

These flow through the channel like any other event. No special methods to override.

History tracking

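The current trajectory (_current_trajectory) and the archive of finished ones (_past_trajectories) are both managed automatically by _dispatch(). If you override run() and don’t use _dispatch, you manage trajectory state yourself.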

Thread safety

Design decisions