Architecture Overview
superred is a modular framework for red-teaming AI systems. It models the interaction between an optimizer (the attacker), a target (the AI system under test), and tasks (adversarial objectives), orchestrated by a controller through an event-driven, channel-based architecture.
High-Level Flow
SecurityClaim
    |  iterates tasks
    v
Task[T_Target]
    |  configure_target(target)       (pre-run config)
    |  evaluate(trajectory, target)   (post-run queries)
    v
+---------------------------+
|        Controller         |
| (security domain filter)  |
+---------------------------+
  send_event ↕   EventChannel   ↕ channel
+-------------+       +-------------------+
|   Target    |       |     Optimizer     |
| (async run) |       | (actor, run loop) |
+-------------+       +-------------------+
Concurrency Model
The controller bridges the target and optimizer through an EventChannel. The target and optimizer run as independent concurrent asyncio tasks on a single event loop. Communication is cooperative — coroutines yield at await points, and the event loop scheduler interleaves them.
Target (asyncio.Task / threads)       Controller              Optimizer (asyncio.Task)
|                                     |                       |
| branch_a: await send_event(e1) →    | filter → channel →    |
| branch_b: await send_event(e2) →    | filter → channel →    |
|                                     |                       |
| (branches suspended on futures)     |                       run(): async for envelope in channel:
|                                     |                         on_event(e1) → respond(r1)
|                                     |                         on_event(e2) → respond(r2)
|                                     |                       |
| branch_a: ← r1 (resumes)            |                       |
| branch_b: ← r2 (resumes)            |                       |
Target internal parallelism: The target can spawn concurrent branches (via asyncio.gather or asyncio.create_task), each calling send_event independently. Each call creates its own future and suspends only that branch. Other branches continue independently. For thread-based targets (Docker, subprocesses), use asyncio.run_coroutine_threadsafe to bridge back to the event loop.
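As a rough illustration of the thread bridge mentioned above, the sketch below calls an async callback from a plain worker thread. The send_event stub, blocking_worker, and run_threaded_target are hypothetical names made up for this example; only asyncio.run_coroutine_threadsafe and asyncio.to_thread are real standard-library calls.

```python
import asyncio


async def send_event(event: str) -> str:
    """Hypothetical stand-in for the send_event callback the controller provides."""
    await asyncio.sleep(0)  # yield to the loop, as a real channel round-trip would
    return f"response-to-{event}"


def blocking_worker(loop: asyncio.AbstractEventLoop, results: list) -> None:
    """Runs on a plain thread, e.g. one that watches a subprocess."""
    # Schedule the coroutine on the loop's thread, then block this worker
    # thread (not the event loop) until the response is available.
    future = asyncio.run_coroutine_threadsafe(send_event("tool_call"), loop)
    results.append(future.result(timeout=5))


async def run_threaded_target() -> list:
    loop = asyncio.get_running_loop()
    results: list = []
    # asyncio.to_thread keeps the event loop free while the worker blocks.
    await asyncio.to_thread(blocking_worker, loop, results)
    return results


bridged = asyncio.run(run_threaded_target())
```

The worker must never call future.result() from the event-loop thread itself, since that would block the loop that is supposed to produce the response.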
Optimizer consumption choice: The default run() processes events sequentially. Override it for parallel consumption (spawn a task per envelope), continuous consumption (background work alongside event processing), or any custom model. The optimizer controls its own concurrency.
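The future-per-call mechanism can be sketched with a toy channel. MiniChannel below is a hypothetical simplification written for this example, not the real EventChannel (it has no envelopes, filtering, or thread safety); it only shows how two concurrent target branches each suspend on their own future while a sequential consumer answers them.

```python
import asyncio


class MiniChannel:
    """Toy stand-in for EventChannel: a queue of (event, future) pairs."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def send(self, event: str) -> str:
        # Each call gets its own future, so only the calling branch suspends.
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((event, future))
        return await future

    async def close(self) -> None:
        await self._queue.put(None)  # sentinel: tells the consumer to stop

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration
        return item


async def optimizer_run(channel: MiniChannel) -> None:
    # Sequential consumption: handle one event at a time.
    async for event, future in channel:
        future.set_result(f"r:{event}")


async def target_run(channel: MiniChannel) -> list:
    # Two concurrent branches, each awaiting its own response.
    return await asyncio.gather(channel.send("e1"), channel.send("e2"))


async def main() -> list:
    channel = MiniChannel()
    optimizer_task = asyncio.create_task(optimizer_run(channel))
    responses = await target_run(channel)
    await channel.close()
    await optimizer_task
    return responses


responses = asyncio.run(main())
```

A parallel consumer would differ only in optimizer_run: instead of resolving each future inline, it would spawn a task per item and resolve the future from inside that task.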
Initialization and Run Loop
0. User builds a TargetFactory wrapping the target constructor:
     target_factory = TargetFactory(
         create=lambda: MyTarget(api_key="sk-..."),
         concurrency=8,  # how many tasks may run in parallel
     )
1. Controller constructed with optimizer_factory, target_factory,
   security_claim, scope (required), llm_config (optional),
   max_runs_per_task (optional), results_dir (optional)
2. await controller.run():
   For each task in security_claim, bounded by target_factory.concurrency
   (asyncio.Semaphore + asyncio.gather; results in input order):
     a. target = target_factory.create()
        task.configure_target(target)
          → sets pre-run config via target.set_config()
          → raises NotApplicable if incompatible (task skipped)
     b. Create LLMClient from llm_config (fresh per task, because the budget is per-task)
        Create fresh optimizer via optimizer_factory()
        Filter controllables and observables by scope
        optimizer.initialize(goal, filtered_controllables, filtered_observables, llm_client)
     c. channel = EventChannel()
        optimizer_task = asyncio.create_task(optimizer.run(channel))
     d. For each run (until the optimizer signals done or max_runs_per_task is reached):
          Create Trajectory (full) and FilteredTrajectory (optimizer's view)
          channel.send(RunStartEvent(filtered_trajectory))
          target.run(emit, send_event)
            → target emits ObservableEvent instances via emit(event)
            → send_event bridges to channel with filtering
            → trajectory_recorder middleware records events/responses to trajectory
          task.evaluate(trajectory, target) → EvaluationResult
            → controller filters sub_scores by scope
          channel.send(RunEndEvent(evaluation=filtered_eval, security_domain=scope_tag))
            → RunEndEvent is persisted to the trajectory
            → optimizer responds with RunEndResponse(done=True/False)
          Close the trajectory
          target.reset_ephemeral_state(): resets ephemeral target state for the next run within this task
          If done=True, break
          On exception: preserve partial trajectory + zero-score evaluation;
          capture exception traceback on TaskResult.error; break
     e. channel.close() → optimizer.run() exits
        await optimizer_task, optimizer.teardown()
        target.reset_ephemeral_state() (final) and target.teardown(); the instance is discarded
3. If results_dir is set, write the claim-level summary file as the
   completion marker. Per-task detail files have already been written
   incrementally as each task finished.
4. Print summary to stdout
5. Return ThreatModelResult
Sweeping multiple (scope, llm_config) combinations is the caller's job:
construct one Controller per combination and run them together with await asyncio.gather(...).
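The "asyncio.Semaphore + asyncio.gather; results in input order" pattern from step 2 can be sketched in isolation. run_bounded and fake_task are hypothetical helpers written for this example, not part of superred; the sketch only shows the general technique, assuming each task is an independent coroutine.

```python
import asyncio


async def run_bounded(items, worker, concurrency: int):
    """Run worker(item) for every item, at most `concurrency` at a time."""
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    # gather returns results in the order of its arguments,
    # regardless of which coroutine finishes first.
    return await asyncio.gather(*(guarded(item) for item in items))


async def demo():
    active = 0
    peak = 0

    async def fake_task(n: int) -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01 * (5 - n))  # later items finish sooner
        active -= 1
        return n * n

    results = await run_bounded(range(5), fake_task, concurrency=2)
    return results, peak


results, peak = asyncio.run(demo())
```

Even though later items sleep for less time, the result list follows input order, and the semaphore keeps the number of simultaneously active workers at or below the configured bound.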
asyncio Runtime
There is one event loop on one thread. The caller provides it:
result = asyncio.run(controller.run())
The controller does not create its own event loop. This allows embedding in larger async applications (web servers, notebooks, pipelines). Tests use pytest-asyncio, which provides the loop.
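Because the caller owns the loop, several controllers can share it. The sketch below uses a hypothetical StubController (the real Controller takes factories, a security claim, and a scope, as listed earlier) to show the two calling styles: awaiting run() from code that is already async, and creating the loop once at the top of a script.

```python
import asyncio


class StubController:
    """Hypothetical stand-in for Controller, reduced to an async run()."""

    def __init__(self, scope: str) -> None:
        self.scope = scope

    async def run(self) -> str:
        await asyncio.sleep(0)  # placeholder for the real run loop
        return f"result[{self.scope}]"


async def sweep(scopes: list) -> list:
    # Already inside a running loop: await the controllers directly
    # instead of calling asyncio.run() a second time.
    controllers = [StubController(scope) for scope in scopes]
    return await asyncio.gather(*(c.run() for c in controllers))


# Standalone script: the caller creates the loop exactly once.
sweep_results = asyncio.run(sweep(["user_input", "tool_output"]))
```

The scope strings here are placeholders chosen for the example.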
Key Design Decisions
- Channel-based communication: EventChannel decouples target and optimizer. The target puts events via the send_event callback (bridged to channel.send). The optimizer pulls from the channel at its own pace. Thread-safe: respond() and close() use call_soon_threadsafe.
- Optimizer as actor: The optimizer runs as its own asyncio.Task, not called synchronously. It chooses its consumption model (sequential, parallel, continuous).
- Lifecycle events replace hooks: RunStartEvent / RunEndEvent flow through the channel like any other event. No special method calls. The base Optimizer's _dispatch() wrapper handles trajectory tracking automatically.
- Target internal parallelism: Multiple concurrent branches each calling send_event independently. Each gets its own response via the channel's future-based mechanism. Supports asyncio tasks and thread bridging.
- Composable middleware: Middleware = Callable[[EventResponseHandler], EventResponseHandler]. Wraps the event-response handler with zero overhead (function composition, no extra tasks or channels). compose(a, b)(handler) applies a outermost, b inner. Built-in: security_domain_filter, trajectory_recorder. Users can add logging, tracing, budget enforcement etc. as additional middleware.
- Manual values are constructor concerns: API keys, credentials, etc. are passed to the target's constructor. Not part of the framework interface.
- Config and query are distinct target surfaces: ConfigSpec / set_config for task-set pre-run state. QuerySpec / query for post-run evaluation queries. Different actors, different lifecycles.
- Tasks are stateless: configure_target sets config, returns nothing. evaluate receives the target for on-demand queries. No internal target reference. Safe to re-iterate from SecurityClaims.
- Tasks are type-bound via generics: Task[MyRAGTarget] gets type-safe access to the concrete target. Task[Target] discovers capabilities at runtime via config_specs / query_specs.
- Thread-safe at every boundary: Trajectory (threading.Lock), EventChannel (asyncio.Queue + call_soon_threadsafe), EventEnvelope.respond (Lock + call_soon_threadsafe), LLMClient (threading.Lock on usage counters).
- Process-safe interface: The EventChannel interface (send/receive/respond/close) is designed so a future process-safe implementation (multiprocessing, sockets) can be swapped in with the same contract.
- SecurityClaim composes: From tasks (from_tasks) or from other claims (from_claims). Lazy chaining for claims-of-claims. Re-iterable since tasks are stateless.
- Runtime-defined types: SecurityDomainTag is a frozen dataclass, not an enum. Target systems define their own instances at runtime.
- Values are always text: ConfigSpec and QuerySpec use strings. The description documents the format contract. The target interprets the text.
- LLM access is part of the threat model: The controller controls which model the optimizer can use and tracks budget (calls, USD cost). The LLMConfig (model, API base, API key, max_cost) is set at the experiment level. Budget enforcement is cost-based: litellm.completion_cost() computes USD per call from model pricing; pre-call checks raise BudgetExhaustedError when cumulative cost reaches max_cost. The optimizer receives a constrained LLMClient that locks the model and credentials, so it cannot choose a different model. Budget is per-task (fresh LLMClient per task). Uses litellm internally for OpenAI-compatible chat completions.
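To make the "compose(a, b)(handler) applies a outermost, b inner" rule concrete, here is a minimal sketch of function-composition middleware. The compose implementation, the string-based Handler type, and the tagging helper are assumptions written for this example; the real compose() in middleware.py and the real EventResponseHandler signature may differ.

```python
import asyncio
from typing import Awaitable, Callable

# Simplified types for illustration: events and responses are plain strings.
Handler = Callable[[str], Awaitable[str]]
Middleware = Callable[[Handler], Handler]


def compose(*middlewares: Middleware) -> Middleware:
    """Assumed implementation: the first middleware ends up outermost."""

    def apply(handler: Handler) -> Handler:
        # Wrap from the inside out so the first argument is the outer layer.
        for middleware in reversed(middlewares):
            handler = middleware(handler)
        return handler

    return apply


def tagging(name: str, log: list) -> Middleware:
    """Hypothetical middleware that records when it is entered and left."""

    def middleware(handler: Handler) -> Handler:
        async def wrapped(event: str) -> str:
            log.append(f"{name}:in")
            response = await handler(event)
            log.append(f"{name}:out")
            return response

        return wrapped

    return middleware


async def base_handler(event: str) -> str:
    return event.upper()


log: list = []
handler = compose(tagging("a", log), tagging("b", log))(base_handler)
response = asyncio.run(handler("ping"))
```

The recorded order shows a entering first and leaving last, which is what "outermost" means here. Each layer is an ordinary function call around the next, so no additional tasks or channels are created.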
File Map
src/superred/core/
  channel.py            -- EventEnvelope, EventChannel (thread-safe)
  controller.py         -- Controller, TargetFactory, RunResult, TaskResult,
                           ThreatModelResult, OptimizerFactory
  llm.py                -- LLMClient (constrained LLM proxy for optimizers)
  middleware.py         -- Middleware type, compose(), security_domain_filter(),
                           trajectory_recorder()
  persistence.py        -- per-threat-model JSON serialization (used when the
                           Controller is given a results_dir; module-private)
  interfaces/
    optimizer.py        -- Optimizer ABC (actor model: run, on_event, _dispatch)
    target.py           -- Target ABC, EventHandler type alias
    task.py             -- Task[T_Target] ABC, NotApplicable exception
    security_claim.py   -- SecurityClaim (composable task iterator)
  types/
    goal.py             -- Goal
    llm.py              -- LLMConfig, LLMUsage, BudgetExhaustedError
    state.py            -- ConfigSpec, QuerySpec, QueryParam
    controllable.py     -- Controllable
    observable.py       -- Observable, ObservableValue
    event.py            -- Event, EventResponse base classes,
                           EventHandler / EventResponseHandler aliases
    events.py           -- ControllablePreCallEvent, ControllablePostCallEvent,
                           ControllableInjection, ControllableNoInjection,
                           ObservableEvent, RunStartEvent, RunEndEvent,
                           RunEndResponse
    trajectory.py       -- Trajectory, FilteredTrajectory, ReadableTrajectory,
                           TrajectoryItem, get_domain
    evaluation.py       -- Score, EvaluationResult
    security_domain.py  -- SecurityDomainTag, SecurityDomain, Scope, scope_includes
Detailed Component Documentation
- Controller – the orchestrator: event bridging, filtering, evaluation
- Optimizer – the optimizer interface: actor model, consumption choices
- Target – target interface, config/query separation
- Task – task generics, stateless design
- SecurityClaim – composable task collections
- Types Reference – all core types, design decisions, relationships