Controller
The controller is the main orchestrator for red-teaming evaluations. One Controller instance evaluates one security claim against one threat model — a single (scope, llm_config) combination. Sweeping multiple threat models is the caller’s job: instantiate one Controller per combination and run them sequentially or via asyncio.gather.
Construction
from superred.core.controller import Controller, TargetFactory
from superred.core.types.llm import LLMConfig
from superred.core.types.security_domain import Scope
# Produces fresh target instances; declares how many tasks may run in
# parallel against independent instances.
target_factory = TargetFactory(
    create=lambda: MyTarget(api_key="sk-..."),  # manual values at construction
    concurrency=8,  # default is 1 (sequential)
)
claim = SecurityClaim.from_tasks([task_a, task_b])
llm_config = LLMConfig(
    model="gpt-4o-mini",
    api_base="https://api.openai.com",
    api_key="sk-...",
    max_cost=5.00,  # USD budget limit (optional, None = unlimited)
)
scope: Scope = frozenset({external_tag})
controller = Controller(
    optimizer_factory=lambda: MyOptimizer(),  # fresh optimizer per task
    target_factory=target_factory,  # fresh target per task; bounded concurrency
    security_claim=claim,
    scope=scope,  # read & write surface (visible + injectable)
    read_only=frozenset(),  # optional: visible-but-not-injectable tags
    llm_config=llm_config,  # optional; omit for non-LLM optimizers
    max_runs_per_task=100,  # safety limit, default 100
    include_feedback=True,  # populate RunEndEvent.evaluation (default True)
    results_dir="results/run-1",  # optional; persist threat-model JSON
)
# `scope` is what the attacker can read AND write; `read_only` adds tags it
# can only read. To see the whole system but inject only the prompt:
#     Controller(scope=frozenset({prompt_tag}),
#                read_only=frozenset({system_tag}), ...)
tmr = await controller.run()  # -> ThreatModelResult
scope may be a fixed Scope or a per-task ScopeResolver
scope accepts either a fixed Scope (the classic behavior above, one read & write surface applied to every task) or a ScopeResolver (a Callable[[Task], Scope], exported as superred.core.ScopeResolver) that computes the read & write scope once per task. callable(scope) is the discriminator. read_only independently accepts the same two forms (a fixed Scope or a ScopeResolver), resolved separately per task; the two resolvers are unrelated.
from superred.core import ScopeResolver
from my_target import DB_ORDERS_TAG, DB_CUSTOMERS_TAG
def resolve(task: Task) -> Scope:
    # grant each task exactly the surface its goal needs
    if task.goal.description.startswith("orders:"):
        return frozenset({DB_ORDERS_TAG})
    return frozenset({DB_CUSTOMERS_TAG})

controller = Controller(
    optimizer_factory=lambda: MyOptimizer(),
    target_factory=target_factory,
    security_claim=claim,
    scope=resolve,  # a ScopeResolver, not a frozenset
    scope_label="per-goal",  # REQUIRED in dynamic mode (see below)
)
- scope_label is required in dynamic mode. When either scope or read_only is callable, scope_label must be a non-empty str (else ValueError at construction). It names the run, since there is no single concrete scope to name it by. When both are fixed frozensets, scope_label must be None (else ValueError), and the existing non-empty (scope | read_only) check still applies.
- A resolver may skip a task. Either resolver raising NotApplicable contributes an empty set for its own dimension, exactly like returning frozenset(). The task is skipped (lands in ThreatModelResult.skipped_tasks, the same channel as task.configure_target skips) when the resolved visibility (scope | read_only) is empty: no tag is granted in either dimension. Any tag (read or write, from either resolver) means the task runs (e.g. scope raising NotApplicable while read_only is non-empty yields a read-only-only run with no injectable controllables). Use this when a task has no meaningful surface.
- A resolver failure is contained per task. If a resolver raises any exception other than NotApplicable, that one task becomes a contained error: TaskResult.stop_reason == "error", TaskResult.error set, and sibling tasks are unaffected, so the threat model is not aborted.
- Return the target’s exported tag singletons. Scope matching is by identity, so the resolver MUST return the same SecurityDomainTag instances the target exposes (import them from the target module). A freshly constructed tag with the same name/parent will not match and will gate everything out.
The resolved scope gates all optimizer-facing surfaces for that task: the injectable controllables list, the observables list, read-only controllables re-presented as observables, the FilteredTrajectory view, the security_domain_filter (inject vs ControllableNoInjection), the feedback sub_scores filter, and the RunEndEvent.security_domain.
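The skip rule above can be illustrated with a minimal sketch. It is not the controller's code: NotApplicable is re-declared locally, scopes are modeled as frozensets of strings rather than SecurityDomainTag instances, and resolve_dimension, plan_task, and orders_only are hypothetical names introduced for illustration.

```python
from typing import Callable, FrozenSet, Optional, Tuple, Union


class NotApplicable(Exception):
    """Local stand-in for the library's NotApplicable signal."""


# Stand-in: real scopes hold SecurityDomainTag instances, not strings.
Scope = FrozenSet[str]
Resolver = Callable[[str], Scope]


def resolve_dimension(value: Union[Scope, Resolver], task: str) -> Scope:
    """Resolve one dimension (scope or read_only) for a single task."""
    if not callable(value):
        return value  # fixed frozenset: same surface for every task
    try:
        return value(task)
    except NotApplicable:
        return frozenset()  # treated exactly like returning an empty set


def plan_task(scope, read_only, task: str) -> Optional[Tuple[Scope, Scope]]:
    """Return (write, read_only) for the task, or None when it is skipped."""
    write = resolve_dimension(scope, task)
    visible_only = resolve_dimension(read_only, task)
    if not (write | visible_only):
        return None  # empty visibility: nothing granted in either dimension
    return write, visible_only


def orders_only(task: str) -> Scope:
    """Hypothetical resolver: only tasks about orders get a surface."""
    if task.startswith("orders:"):
        return frozenset({"db.orders"})
    raise NotApplicable()
```

In this sketch, plan_task(orders_only, frozenset({"system"}), "customers: dump") still runs the task, with an empty write scope and a read-only view of "system", which mirrors the read-only-only case described above.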
For single-instance targets (tests, expensive-to-construct resources), use
the TargetFactory.singleton(target) classmethod — concurrency is locked
to 1 since a shared instance can’t safely serve parallel tasks. The
controller still calls target.teardown() once per task, so a multi-task
singleton needs an idempotent teardown.
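One way to make teardown safe to call once per task on a shared instance is sketched below. SharedTarget, open_session, and the releases counter are hypothetical; the sketch also assumes a synchronous teardown(), which the real Target interface may not use.

```python
class SharedTarget:
    """Hypothetical single-instance target; not part of the library."""

    def __init__(self) -> None:
        self.session = None
        self.releases = 0

    def open_session(self) -> None:
        # Acquire per-task state lazily.
        self.session = object()

    def teardown(self) -> None:
        # Idempotent: only release when something is actually held.
        if self.session is None:
            return
        self.session = None
        self.releases += 1


target = SharedTarget()
target.open_session()
target.teardown()
target.teardown()  # second call is a no-op
```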
The controller does not create an asyncio event loop — the caller provides it via asyncio.run() or an existing loop.
Sweeping multiple threat models:
import asyncio, itertools
results = await asyncio.gather(*(
    Controller(
        scope=s, llm_config=c,
        optimizer_factory=..., target_factory=target_factory,
        security_claim=claim,
    ).run()
    for s, c in itertools.product(scopes, configs)
))
Run lifecycle
await controller.run() -> ThreatModelResult runs every task in the security claim against the configured (scope, llm_config). Tasks run concurrently bounded by target_factory.concurrency (asyncio.Semaphore + asyncio.gather); results are collected in input order.
For each task:
1. target = target_factory.create(): a fresh Target instance owned by this task.
2. task.configure_target(target): if it raises NotApplicable, the task is collected into skipped_tasks and not retried.
3. Create LLMClient from llm_config (fresh per task; budget is per-task). If no llm_config, use a noop client.
4. Create a fresh optimizer via optimizer_factory().
5. optimizer.initialize(goal, filtered_controllables, filtered_observables, llm_client): only controllables and observables within the scope are passed.
6. Create EventChannel, launch optimizer.run(channel) as a concurrent asyncio.Task.
7. Run loop (until the optimizer signals done or max_runs_per_task is reached):
   - Create Trajectory(filtered_scope=scope). Access trajectory.filtered for the optimizer’s view.
   - Send RunStartEvent(filtered_trajectory) through the channel.
   - target.run(emit, send_event): the target emits ObservableEvent instances; send_event bridges to the channel with security domain filtering. The trajectory_recorder middleware records events and responses directly to the trajectory.
   - task.evaluate(trajectory, target): returns EvaluationResult; the controller filters sub_scores by scope.
   - Send RunEndEvent(evaluation=filtered_eval, security_domain=<scope_tag>) through the channel; it is persisted to the trajectory. When include_feedback=True (default) the evaluation is attached.
   - Close the trajectory; call target.reset_ephemeral_state() to reset ephemeral state for the next run within this task.
   - Track best score / success across runs.
   - On exception inside the run: the partial trajectory is preserved as a final RunResult with a zero-score evaluation; the formatted exception lands on TaskResult.error; stop_reason = "error"; loop ends.
8. Close channel, await optimizer task, optimizer.teardown(). Final target.reset_ephemeral_state() (in finally) followed by target.teardown(); the per-task target instance is then discarded.
As each task finishes, its per-task detail JSON is written immediately (when results_dir is set), so an interrupted run still leaves every completed task on disk. After all tasks finish, the claim-level summary file is written as a completion marker, the controller prints a summary to stdout, and returns the ThreatModelResult.
Internal structure
- _iterate_tasks(scope, llm_config): runs the security claim with asyncio.Semaphore(target_factory.concurrency) + asyncio.gather. Each in-flight task acquires the semaphore, calls target_factory.create(), runs the task, then target.teardown() in finally before releasing the slot. Results are reassembled in input order.
- _run_task(task, scope, llm_config, target): manages the per-task lifecycle: configure, create fresh optimizer, initialize, build middleware stack, run loop, collect results.
- _run_single(task, target, channel, scope, run_number, trajectory): executes one iteration. The trajectory is owned by _run_task so a partial trajectory survives an exception. Returns (evaluation, done).
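The bounded-concurrency pattern described for _iterate_tasks can be sketched with plain asyncio. This is an illustration under stated assumptions, not the controller's implementation: run_bounded, FakeTarget, and demo are hypothetical names, and the target's teardown() is assumed to be synchronous.

```python
import asyncio


class FakeTarget:
    """Hypothetical target that only counts teardown calls."""

    torn_down = 0

    def teardown(self) -> None:
        FakeTarget.torn_down += 1


async def run_bounded(tasks, create_target, run_task, concurrency):
    """Run every task with at most `concurrency` in flight, keeping input order."""
    semaphore = asyncio.Semaphore(concurrency)

    async def one(task):
        async with semaphore:  # slot is held for the task's whole lifetime
            target = create_target()  # fresh instance per task
            try:
                return await run_task(task, target)
            finally:
                target.teardown()  # released before the slot reopens

    # gather returns results in the order the awaitables were passed in,
    # regardless of which task finishes first.
    return await asyncio.gather(*(one(t) for t in tasks))


async def demo():
    in_flight = 0
    peak = 0

    async def run_task(task, target):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01 * (3 - task))  # earlier tasks finish later
        in_flight -= 1
        return task * 10

    results = await run_bounded([0, 1, 2], FakeTarget, run_task, concurrency=2)
    return results, peak


results, peak = asyncio.run(demo())
```

Even though the first task sleeps longest, results come back in input order, and peak never exceeds the configured concurrency.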
The send_event callback passed to target.run is built by composing middleware onto channel.send:
send_event = compose(
    trajectory_recorder(trajectory),
    security_domain_filter(write_scope),
)(channel.send)
The filter receives the read & write scope (the controller’s scope); all other filtering uses the full visibility scope (scope | read_only). When read_only is empty the two are identical.
Users can add custom middleware (logging, tracing, budget enforcement) by extending the composition.
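A custom middleware could look roughly like the sketch below. The real compose helper and the channel.send signature are not shown in this document, so the sketch re-declares a local compose and assumes that send is an async callable taking one event and returning a response, and that the first middleware listed is the outermost wrapper. logging_middleware and fake_channel_send are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Send = Callable[[object], Awaitable[object]]
Middleware = Callable[[Send], Send]


def compose(*middlewares: Middleware) -> Middleware:
    """Assumed semantics: the first middleware listed wraps all the others."""

    def wrap(send: Send) -> Send:
        for middleware in reversed(middlewares):
            send = middleware(send)
        return send

    return wrap


def logging_middleware(name: str, log: List[Tuple[str, str]]) -> Middleware:
    """Hypothetical middleware that notes each event and its response."""

    def middleware(next_send: Send) -> Send:
        async def send(event):
            log.append((name, "in"))
            response = await next_send(event)
            log.append((name, "out"))
            return response

        return send

    return middleware


async def fake_channel_send(event):
    # Stand-in for channel.send: echoes an acknowledgement.
    return f"ack:{event}"


log: List[Tuple[str, str]] = []
send_event = compose(
    logging_middleware("outer", log),
    logging_middleware("inner", log),
)(fake_channel_send)
reply = asyncio.run(send_event("pre_call"))
```

Under these assumptions the outer middleware sees the event first and the response last, which is why a recorder placed first would also capture responses produced by a filter placed after it.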
Security domain filtering
The controller enforces the security domain scope across all optimizer inputs:
1. Controllables: Only the injectable ones are passed to optimizer.initialize(), filtered with scope_includes(write_scope, c.security_domain) (the read & write scope). Out-of-scope and read-only controllables are not in this list, so it means exactly “what the optimizer can inject into.”
2. Observables: Filtered with scope_includes(visibility, o.observable.security_domain) before optimizer.initialize(), plus each read-only controllable (visible but not injectable) re-presented as an ObservableValue (with content=None, since its value is revealed at runtime on the trajectory). So observables means “what the optimizer can read,” including read-only controllables. Out-of-scope observables are never exposed.
3. Events: ControllablePreCallEvent and ControllablePostCallEvent for out-of-scope controllables are answered with ControllableNoInjection without reaching the optimizer. Implemented as the security_domain_filter middleware composed onto channel.send. The filter is given the read & write scope, so controllable events under read_only tags (visible but not injectable) are declined the same way. The difference from out-of-scope events is visibility: a read-only event is inside the full visibility scope, so it (and its ControllableNoInjection) remains visible through the filtered trajectory, observables, and feedback.
4. Trajectory: The optimizer receives a FilteredTrajectory (via RunStartEvent) that only exposes items within the security domain scope.
5. Feedback: Each sub_score in the EvaluationResult carries a security_domain. The controller filters sub_scores, dropping only those whose security_domain is out of scope (an untagged sub-score, security_domain=None, is always visible). The RunEndEvent carries the filtered evaluation directly (when include_feedback=True, the default) and is persisted to the trajectory with security_domain set to a tag from the scope. The primary_score carries no security_domain and is never filtered; it, success, and rationale are always included (the optimizer needs the main optimization signal). The optimizer reads feedback from event.evaluation on RunEndEvent, or from past trajectories.
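The feedback rule (drop only tagged sub-scores that fall outside the visibility scope, always keep untagged ones) can be sketched as follows. SubScore and filter_sub_scores are hypothetical stand-ins, and plain set membership on strings replaces the hierarchical scope_includes check to keep the example short.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional


@dataclass(frozen=True)
class SubScore:
    """Hypothetical stand-in for one sub-score of an evaluation."""

    name: str
    value: float
    security_domain: Optional[str] = None  # real code uses SecurityDomainTag


def filter_sub_scores(
    sub_scores: List[SubScore], visibility: FrozenSet[str]
) -> List[SubScore]:
    """Keep untagged sub-scores and those whose domain is visible."""
    return [
        score
        for score in sub_scores
        if score.security_domain is None or score.security_domain in visibility
    ]


scores = [
    SubScore("leaked_rows", 0.8, "db.orders"),
    SubScore("admin_panel", 0.1, "internal"),
    SubScore("format_ok", 1.0),  # untagged: always visible
]
visible = filter_sub_scores(scores, frozenset({"db.orders"}))
```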
A Scope is a frozenset[SecurityDomainTag]. scope_includes(scope, tag) returns True if ANY tag in the scope includes the target tag. This allows testing specific security boundaries — scoping to {external_tag} tests only external-facing surfaces, while scoping to {root_tag} tests everything.
Access level is a property of the scope, not of each tag. The controller takes two sets: scope (read & write — visible and injectable) and an optional read_only set (visible only). read_only defaults to empty, so the whole scope is read & write — the classic behavior. To make part of the surface read-only, list it under read_only instead: Controller(scope={prompt}, read_only={system}) lets the attacker see the whole system subtree but inject only into prompt. A read_only tag already covered by scope has no effect (read & write overrules — only scope drives the injection check, so it stays injectable); scope and read_only cannot both be empty. Internally only the injection check (item 3) uses scope; items 1, 2, 4, 5 and the FilteredTrajectory use the full visibility scope scope | read_only, so read-only information flows through the exact same recording mechanism as read & write surfaces.
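The interaction between scope, read_only, and hierarchical tags can be sketched as below. The real SecurityDomainTag and scope_includes are not reproduced in this document, so the sketch assumes that a tag "includes" itself and all of its descendants via a parent chain, and that tags compare by identity. Tag, tag_includes, and access_level are hypothetical names.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True, eq=False)  # eq=False keeps identity comparison and hashing
class Tag:
    """Hypothetical stand-in for SecurityDomainTag (name + parent)."""

    name: str
    parent: Optional["Tag"] = None


def tag_includes(ancestor: Tag, tag: Tag) -> bool:
    """Assumed rule: a tag includes itself and every descendant."""
    node: Optional[Tag] = tag
    while node is not None:
        if node is ancestor:  # identity, not name equality
            return True
        node = node.parent
    return False


def scope_includes(scope: FrozenSet[Tag], tag: Tag) -> bool:
    return any(tag_includes(member, tag) for member in scope)


def access_level(scope: FrozenSet[Tag], read_only: FrozenSet[Tag], tag: Tag) -> str:
    """Read & write wins over read-only; anything else is hidden."""
    if scope_includes(scope, tag):
        return "read_write"
    if scope_includes(read_only, tag):
        return "read_only"
    return "hidden"


root = Tag("root")
system = Tag("system", root)
prompt = Tag("prompt", system)
external = Tag("external", root)
lookalike = Tag("prompt", system)  # same name/parent, different instance

scope = frozenset({prompt})
read_only = frozenset({system})
```

With these assumptions, prompt is injectable, the rest of the system subtree is visible only, external is hidden, and a freshly constructed lookalike tag does not match the write scope.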
Unified trajectory as event log
There is no separate event log. The trajectory_recorder middleware records all events and responses directly into the trajectory as Event | EventResponse objects:
- Controllable events: ControllablePreCallEvent, ControllablePostCallEvent.
- Controllable responses: ControllableInjection, ControllableNoInjection.
- Observable events: ObservableEvent emitted by the target (model requests, model responses, etc.).
- RunEndEvent: persisted to the trajectory by the controller after evaluation. Carries evaluation: EvaluationResult | None and has security_domain set from the scope.
The trajectory IS the event log. RunStartEvent is NOT persisted to the trajectory — it carries no additional information and always appears at a fixed position. RunEndEvent IS persisted because it carries the evaluation result. To inspect events and responses for a run, query the trajectory items by type.
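Querying by type can be as simple as an isinstance filter. The sketch below uses local stand-in classes that only share names with the real event types, and models the trajectory as a plain list; how the real Trajectory exposes its items is not specified here, so treat the list as an assumption.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


# Local stand-ins that only mirror the names of the real event types.
@dataclass
class ControllablePreCallEvent:
    controllable: str


@dataclass
class ControllableNoInjection:
    controllable: str


@dataclass
class ObservableEvent:
    content: str


@dataclass
class RunEndEvent:
    evaluation: Optional[Any] = None


def items_of_type(items: List[Any], *types: type) -> List[Any]:
    """Return trajectory items that are instances of any given type, in order."""
    return [item for item in items if isinstance(item, types)]


trajectory_items = [
    ControllablePreCallEvent("system_prompt"),
    ControllableNoInjection("system_prompt"),
    ObservableEvent("model request"),
    ObservableEvent("model response"),
    RunEndEvent(evaluation={"primary_score": 0.4}),
]

observables = items_of_type(trajectory_items, ObservableEvent)
controllable_traffic = items_of_type(
    trajectory_items, ControllablePreCallEvent, ControllableNoInjection
)
run_end = items_of_type(trajectory_items, RunEndEvent)[-1]
```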
Result types
RunResult (frozen)
One target execution + evaluation:
- trajectory: Trajectory: the run trajectory.
- evaluation: EvaluationResult: the evaluation result for this run.
- llm_usage: LLMUsage: cumulative optimizer LLM usage after this run. This is a cumulative snapshot: each successive run includes all prior usage, enabling budget-vs-performance tracking.
TaskResult (frozen)
All runs for one task:
- task: Task[Target]: the task that was evaluated.
- runs: list[RunResult]: all run results, in order.
- best_score: Score: highest primary score across all runs.
- best_evaluation: EvaluationResult: the evaluation that produced the best score.
- success: bool: whether any run achieved the adversarial goal.
- llm_usage: LLMUsage: total optimizer LLM usage across all runs.
- stop_reason: Literal["done", "max_runs", "budget_exhausted", "error"]: why the run loop ended: the optimizer signaled RunEndResponse(done=True), the loop hit max_runs_per_task, BudgetExhaustedError was raised, or an unexpected exception escaped the optimizer/target/evaluator and the task was abandoned.
- scope: Scope (default frozenset()): the read & write scope enforced for this task. In static mode it equals the controller’s scope for every task; with a ScopeResolver it is the per-task resolved scope.
- read_only: Scope (default frozenset()): the read-only scope enforced for this task. In static mode it equals the controller’s read_only for every task; with a ScopeResolver it is the per-task resolved read-only scope.
- error: str | None: formatted exception (type + message + traceback) when something went wrong, None otherwise. Set whenever the controller observes an exception associated with the task. Most commonly populated together with stop_reason="error", but also populated as an extra diagnostic when the run loop classified the task cleanly ("done" / "max_runs" / "budget_exhausted") yet the optimizer task subsequently raised during teardown. Consumers should treat error and stop_reason as independent fields: error is not None does not imply stop_reason == "error", and although stop_reason == "error" usually comes with error set, that is not guaranteed either.
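Because error and stop_reason are independent, a consumer may want to check them separately. The sketch below is one possible triage, using a hypothetical TaskResultView that carries only the two fields of interest; the category names are illustrative, not part of the library.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskResultView:
    """Hypothetical view holding just the two fields being triaged."""

    stop_reason: str
    error: Optional[str] = None


def triage(result: TaskResultView) -> str:
    """Classify a task without assuming error and stop_reason move together."""
    if result.stop_reason == "error":
        return "failed"
    if result.error is not None:
        # Clean stop, but something raised afterwards (e.g. during teardown).
        return "completed_with_diagnostic"
    return "completed"


outcomes = [
    triage(TaskResultView("done")),
    triage(TaskResultView("max_runs", error="RuntimeError: teardown failed")),
    triage(TaskResultView("error", error="ValueError: bad response")),
    triage(TaskResultView("error")),
]
```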
ThreatModelResult (frozen)
Results for one (scope, llm_config) combination:
- scope: Scope: the read & write scope tested. In dynamic mode (a ScopeResolver) this is an empty frozenset; the per-task truth lives on each TaskResult.scope.
- read_only: Scope: extra visible-but-not-injectable tags (empty for an all-read & write run; also empty in dynamic mode).
- scope_label: str | None (default None): None in static mode (unchanged); in dynamic mode it is the label passed to the controller and names the run (since scope/read_only are empty here).
- llm_config: LLMConfig | None: the LLM configuration used, or None when no LLM config was provided.
- task_results: list[TaskResult]: results for each evaluated task.
- skipped_tasks: list[Task[Target]]: tasks that raised NotApplicable (during configure_target or, in dynamic mode, from the resolver).
LLM access and budget tracking
The controller mediates LLM access for the optimizer. This is part of the threat model — it defines what computational resources the attacker has.
- Configuration: Pass llm_config=LLMConfig(...) to the controller constructor (optional; omit for non-LLM optimizers). The config specifies the model, API credentials, and an optional cost budget (max_cost in USD).
- Per-task budget: A fresh LLMClient is created for each task. Budget resets per task.
- Non-LLM optimizers: When llm_config is None/omitted, the optimizer receives a noop LLMClient that raises BudgetExhaustedError on any call.
- Constrained client: The LLMClient locks the model, API base, and API key. The optimizer cannot override them.
- Cost-based budget enforcement: Pre-call checks raise BudgetExhaustedError when cumulative cost reaches max_cost. Cost is computed per call via litellm.completion_cost(), which uses the model’s pricing to convert token usage to USD.
- Usage tracking: Each RunResult includes a cumulative llm_usage snapshot (calls, cost). Each TaskResult includes the total llm_usage. This enables budget-vs-performance analysis across runs.
- Summary output: The evaluation summary includes call counts and cost.
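The pre-call budget check can be sketched without any LLM dependency. BudgetedClient is a hypothetical stand-in for the real LLMClient, BudgetExhaustedError is re-declared locally, and the call_cost argument stands in for the per-call cost that the real client derives from token usage.

```python
from typing import Optional


class BudgetExhaustedError(Exception):
    """Local stand-in for the library's BudgetExhaustedError."""


class BudgetedClient:
    """Hypothetical client illustrating a pre-call cost check."""

    def __init__(self, max_cost: Optional[float]) -> None:
        self.max_cost = max_cost  # None means unlimited
        self.calls = 0
        self.cost = 0.0

    def complete(self, prompt: str, call_cost: float) -> str:
        # Pre-call check: refuse once cumulative cost has reached the budget.
        if self.max_cost is not None and self.cost >= self.max_cost:
            raise BudgetExhaustedError(
                f"spent {self.cost:.2f} of {self.max_cost:.2f} USD"
            )
        self.calls += 1
        self.cost += call_cost  # the real client computes this from usage
        return f"response to {prompt!r}"


client = BudgetedClient(max_cost=0.05)
exhausted = False
try:
    for _ in range(5):
        client.complete("probe", call_cost=0.03)
except BudgetExhaustedError:
    exhausted = True
```

Because the check happens before the call, the last permitted call can push the total slightly past max_cost; in this sketch the second call is allowed at 0.03 USD spent and the third is refused.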
Design decisions
- Concrete class, not ABC: There is one orchestration logic.
- Optimizer factory: A fresh optimizer is created for each (task, scope, llm_config) combination via optimizer_factory(). This ensures clean state and allows threat model-specific initialization.
- Target factory: A fresh target is created for each task via target_factory.create(). Concurrent tasks never share mutable target state. The factory carries the per-target concurrency limit, which the controller enforces via asyncio.Semaphore. Cheap targets (a chatbot wrapping an API) should bump this; heavy targets that hold expensive resources can either stay at the default of 1 or pool internally inside their factory.
- Channel-based: Controller creates an EventChannel per task. Target’s send_event callback bridges to channel.send() with filtering. Optimizer pulls from channel in run().
- Multi-run loop: Runs until optimizer signals RunEndResponse(done=True) or the max_runs_per_task safety limit. max_runs_per_task is validated >= 1 at construction.
- Concurrent optimizer: optimizer.run(channel) is launched as an asyncio.Task. The optimizer stays alive across all runs for a task: one channel, one optimizer task per task.
- Ephemeral reset after each run: target.reset_ephemeral_state() is called after each evaluation to reset ephemeral state.
- Exception-safe teardown: optimizer.teardown() is called per task in a finally block. Each task’s target.teardown() is called in _iterate_tasks’s per-task finally so target resources are released before the next task’s semaphore slot opens, regardless of how the task ended.
- Exception-safe channel shutdown: If target.run() or task.evaluate() raises, the finally block in _run_task closes the channel and awaits the optimizer task, preventing deadlock.
- Per-task error containment: An unexpected exception escaping optimizer.on_event, target.run, task.evaluate, or target.reset_ephemeral_state is caught inside the run loop. The task ends with stop_reason="error" and any runs already completed before the failure are preserved in TaskResult.runs. Errors raised outside the run loop (e.g. task.configure_target non-NotApplicable, optimizer.initialize) are caught at the _iterate_tasks level as a backstop and recorded as a synthetic error TaskResult with runs=[]. BudgetExhaustedError is preserved as stop_reason="budget_exhausted" wherever it originates inside the optimizer’s run loop or optimizer.initialize (so an optimizer that exhausts its budget during a warmup call is not misclassified). NotApplicable continues to be handled distinctly (skipped_tasks). The rest of the threat model, and every later threat model, still runs and is persisted.
- Post-task target reset: target.reset_ephemeral_state() is called at the end of every task’s run loop in the finally block, even when the loop ended via an error and the inner-loop reset-after-success was skipped. This means the next task in the threat model always starts against a target that has been told to reset its ephemeral state at least once after the previous task’s last run. The call is wrapped so a failing target.reset_ephemeral_state() is logged but does not propagate or block the next task. The optimizer.initialize early-return paths (budget-exhausted and generic error) do not invoke this post-task reset because the run loop never started; targets whose configure_target mutates more than config slots should not rely on reset_ephemeral_state running in that case.
- Unified trajectory: Events and responses are recorded directly to the trajectory via the trajectory_recorder middleware. No separate event log: the trajectory is the single source of truth.
- CLI-ready: Constructor takes plain parameters. A future CLI module can parse config, instantiate components, call asyncio.run(controller.run()). ThreatModelResult provides structured output for programmatic use.
- LLM access as threat model parameter: The model and budget are experiment-level settings, not optimizer choices. The controller creates a constrained LLMClient per task and the optimizer cannot escape the configured model/credentials. Budget limits are a fairness measure for comparing optimizer strategies.
- Per-task LLM budget: Each task gets a fresh LLMClient with reset counters. This ensures budget fairness when evaluating across multiple tasks and enables per-task budget analysis.
- Cumulative usage snapshots: RunResult.llm_usage is cumulative (includes all prior runs) rather than per-run delta. This is more useful for budget-vs-performance curves: each point shows (total_budget_spent, score_at_that_point).
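Given cumulative snapshots, per-run deltas and a budget-vs-performance curve are both one short pass. The sketch below works on plain lists of floats; how cost is read off LLMUsage is not shown in this document, so extracting those numbers is left to the caller. per_run_costs and budget_curve are hypothetical helper names.

```python
from typing import List, Tuple


def per_run_costs(cumulative_costs: List[float]) -> List[float]:
    """Convert cumulative cost snapshots into per-run deltas."""
    deltas = []
    previous = 0.0
    for cost in cumulative_costs:
        deltas.append(cost - previous)
        previous = cost
    return deltas


def budget_curve(
    cumulative_costs: List[float], scores: List[float]
) -> List[Tuple[float, float]]:
    """Pair total spend after each run with the best score seen so far."""
    points = []
    best = float("-inf")
    for cost, score in zip(cumulative_costs, scores):
        best = max(best, score)
        points.append((cost, best))
    return points


# Illustrative numbers only, not measured results.
cumulative = [0.25, 0.5, 1.0]
run_scores = [0.2, 0.7, 0.5]
deltas = per_run_costs(cumulative)
curve = budget_curve(cumulative, run_scores)
```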
Persistence (results_dir)
When results_dir is provided, the controller writes a two-level layout for the threat model: per-task detail files as each task finishes, and a claim-level summary when run() completes:
results_dir/
├── {scope}__{model}.json          ← claim-level summary
└── {scope}__{model}/
    ├── 00001__{goal}.json         ← per-task detail (one per task)
    └── ...
Multiple controllers pointed at the same results_dir (the multi-threat-model sweep pattern) each write their own pair of files, named by their scope and model.
- Naming: {sorted_tag1.sorted_tag2...}__{sanitized_model}.json. Tag and model strings are sanitized (any character outside [A-Za-z0-9_-] becomes _). When the threat model has read-only tags, their sorted names are appended as a __ro_{read_only} component (e.g. prompt__ro_system__gpt-4o.json) so threat models differing only in access mode don’t collide; all-read & write runs keep the plain {scope}__{model}.json name. In dynamic mode (a ScopeResolver) the stem is the sanitized scope_label instead of the tag names (claim file {label}__{model}.json and subfolder {label}__{model}/), since there is no single run-level scope. When llm_config is None, the model segment is no-llm. Per-task files are named {NNNNN}__{sanitized_truncated_goal}.json where the index is 1-based and zero-padded to 5 digits.
- When: per-task detail files are written incrementally: each one lands on disk as soon as its task finishes (success, error, or budget-exhausted). The claim-level summary file is written at the end of run() and acts as a completion marker; if a post-mortem sees the subfolder without the matching summary file, the run was interrupted and the detail files are the authoritative record of what completed.
- Failed tasks are still persisted: per-task error containment (see Design decisions) means an unexpected exception inside one task does not skip the threat model. The failing task lands in the on-disk file with stop_reason="error", the partial trajectory accumulated before the crash, and the formatted exception under the error field (which lives outside the trajectory). Sibling tasks still finish and are persisted.
- Atomicity: each individual file is written via temp file + rename. A disk failure on one task’s write is logged and contained, and the controller continues running the remaining tasks. The summary file will still point at the would-be path (the missing file at that path is the signal).
- Claim-level file: version (SCHEMA_VERSION, now 2), completed_at, scope (read & write tag names) plus read_only (visible-but-not-injectable tags; empty for an all-read & write run), a scope_label field (null in static mode; the run label in dynamic mode, where scope/read_only arrays are empty), llm_config (model + max_cost only), a summary block (n_tasks, n_success, n_skipped, max_primary_score, mean_primary_score, total_llm_usage), per-task summary entries each with a relative file path pointing at its detail file, and skipped_tasks. No trajectories at this level.
- Per-task detail file: self-contained, and repeats version, scope, read_only, llm_config plus the task’s goal, success, best_score, best_evaluation, llm_usage, stop_reason, and the full runs list (each with its trajectory, evaluation, and cumulative llm_usage). In dynamic mode each detail file records that task’s own resolved scope/read_only, so different files in the same run carry different scopes.
- Aggregates: mean_primary_score excludes NotApplicable tasks (they are reported separately as n_skipped). When the claim has no evaluable tasks, mean_primary_score and max_primary_score are null.
- stop_reason per task: one of "done" (optimizer signaled RunEndResponse(done=True)), "max_runs" (hit the safety cap), "budget_exhausted" (BudgetExhaustedError was raised), or "error" (unexpected exception in optimizer/target/evaluator; the task was abandoned).
- Secrets: LLMConfig.api_key and api_base are explicitly excluded from both claim and detail files. Trajectory contents (e.g. ObservableEvent.content) are not scrubbed, so keep credentials out of log/observable payloads.
- Collisions: if either the claim-level file or the task subfolder already exists, the writer raises FileExistsError rather than overwriting. Pass a per-run subdirectory if you re-run into the same parent.
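The naming rules above can be approximated with a few lines of Python. This is a sketch, not the writer's code: sanitize, claim_stem, and task_filename are hypothetical helpers, joining multiple tag names with "." is an assumption read off the naming pattern, and max_goal_len=40 is an invented truncation length since the real one is not stated.

```python
import re
from typing import Iterable, Optional


def sanitize(text: str) -> str:
    """Replace any character outside [A-Za-z0-9_-] with an underscore."""
    return re.sub(r"[^A-Za-z0-9_-]", "_", text)


def claim_stem(
    scope_names: Iterable[str],
    read_only_names: Iterable[str] = (),
    model: Optional[str] = None,
    scope_label: Optional[str] = None,
) -> str:
    """Build the shared stem of the claim file and the task subfolder."""
    if scope_label is not None:
        head = sanitize(scope_label)  # dynamic mode: label replaces tag names
    else:
        # Assumption: sorted, sanitized tag names are joined with ".".
        head = ".".join(sorted(sanitize(name) for name in scope_names))
        read_only = sorted(sanitize(name) for name in read_only_names)
        if read_only:
            head += "__ro_" + ".".join(read_only)
    model_part = sanitize(model) if model is not None else "no-llm"
    return f"{head}__{model_part}"


def task_filename(index: int, goal: str, max_goal_len: int = 40) -> str:
    """1-based index zero-padded to 5 digits, then the sanitized, truncated goal."""
    return f"{index:05d}__{sanitize(goal)[:max_goal_len]}.json"
```

For example, claim_stem(["prompt"], ["system"], "gpt-4o") gives the stem of the prompt__ro_system__gpt-4o.json example above.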
When results_dir is None (the default), nothing is written and behavior is unchanged.
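For post-mortem tooling, the temp-file-plus-rename write and the "subfolder without summary means interrupted" rule can be sketched with the standard library. write_json_atomic and is_interrupted are hypothetical helpers, os.replace is used here as the rename step, and the file names in the demo are made up.

```python
import json
import os
import tempfile
from pathlib import Path


def write_json_atomic(path: Path, payload: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)
        os.replace(tmp_name, path)  # readers never see a half-written file
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # do not leave temp debris behind
        raise


def is_interrupted(results_dir: Path, stem: str) -> bool:
    """True when the task subfolder exists but the summary file does not."""
    subfolder = results_dir / stem
    summary = results_dir / f"{stem}.json"
    return subfolder.is_dir() and not summary.exists()


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    stem = "external__no-llm"
    detail_path = root / stem / "00001__goal.json"
    write_json_atomic(detail_path, {"stop_reason": "done"})
    before_summary = is_interrupted(root, stem)  # detail exists, no summary yet
    write_json_atomic(root / f"{stem}.json", {"summary": {"n_tasks": 1}})
    after_summary = is_interrupted(root, stem)
    detail = json.loads(detail_path.read_text(encoding="utf-8"))
```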