Advanced Patterns

Patterns you reach for once the basics are in place. Each is independent; read the ones you need.

Multi-turn targets

A target can hold a multi-turn conversation. The clean convention (used by the chatbot target) is to let the optimizer control how many turns happen: each turn the target asks for the next user message; an injection continues the conversation, a ControllableNoInjection ends it.

async def run(self, emit, send_event):
    messages = [{"role": "system", "content": self._system_prompt}]

    for turn in range(self._max_turns):
        resp = await send_event(ControllablePreCallEvent(
            controllable=_USER_MESSAGE_CTRL, request=f"Turn {turn + 1} user message"))

        if not isinstance(resp, ControllableInjection):
            break                        # attacker declined: end the conversation
        user_msg = resp.value
        messages.append({"role": "user", "content": user_msg})
        emit(ObservableEvent(observable=_REQUEST_OBS, content=user_msg))

        completion = await acompletion(model=self._model, messages=messages,
                                       api_base=self._api_base, api_key=self._api_key)
        assert isinstance(completion, ModelResponse)
        assistant = completion.choices[0].message.content or ""
        messages.append({"role": "assistant", "content": assistant})
        self._last_response = assistant
        emit(ObservableEvent(observable=_RESPONSE_OBS, content=assistant))

        # Optional: let the attacker see this turn's reply before the next turn.
        await send_event(ControllablePostCallEvent(
            controllable=_USER_MESSAGE_CTRL, request=f"Turn {turn + 1}", answer=assistant))

The attacker sees one ControllablePreCallEvent per turn and can adapt each message from the conversation visible in its filtered trajectory. A max_turns ceiling in the target keeps a run finite even if the attacker keeps injecting.

Parallel branches inside one run

A target may fan out into concurrent branches, each calling send_event independently. Each call suspends only its own branch and resumes when the attacker responds.

import asyncio

async def run(self, emit, send_event):
    async def branch(ctrl, label):
        resp = await send_event(ControllablePreCallEvent(controllable=ctrl, request=label))
        value = resp.value if isinstance(resp, ControllableInjection) else "default"
        emit(ObservableEvent(observable=Observable(name=f"{label}_input",
             security_domain=ctrl.security_domain, description=label), content=value))
        return value

    a, b = await asyncio.gather(branch(_SEARCH_CTRL, "search"), branch(_GEN_CTRL, "generate"))
    self._last_response = await self._combine(a, b)

The default (sequential) optimizer handles the two events one after another; both branches resume once answered. The channel is built for exactly this.

Thread-based targets

If your target drives work on background threads (Docker containers, subprocesses), bridge back to the event loop with asyncio.run_coroutine_threadsafe. The channel and envelope are thread-safe (they use call_soon_threadsafe internally).

import asyncio

async def run(self, emit, send_event):
    loop = asyncio.get_running_loop()

    def in_thread():
        fut = asyncio.run_coroutine_threadsafe(
            send_event(ControllablePreCallEvent(controllable=_CTRL, request="input")), loop)
        return fut.result(timeout=30)        # blocks this thread, not the loop

    resp = await loop.run_in_executor(None, in_thread)

Composing security claims

Build large evaluation suites from small, independent claims. Composition is lazy and re-iterable:

prompt_injection = SecurityClaim.from_tasks([SystemPromptLeakTask(), InstructionIgnoreTask()])
data_exfil       = SecurityClaim.from_tasks([SecretExtractionTask(), PIIExfilTask()])

full = SecurityClaim.from_claims([prompt_injection, data_exfil])

# Analyse by task afterwards:
for tr in result.task_results:
    print(f"{tr.task.goal.description}: {'PASS' if tr.success else 'FAIL'}")

Testing several scopes

Run the same claim under different scopes to chart the attack surface. Build a fresh Controller per scope (see Running Evaluations):

scopes = {"user": frozenset({USER_TAG}),
          "user+system": frozenset({USER_TAG, SYSTEM_PROMPT_TAG})}

for name, scope in scopes.items():
    controller = Controller(optimizer_factory=lambda: MyOptimizer(),
                            target_factory=target_factory, security_claim=claim,
                            scope=scope, llm_config=attacker_cfg, results_dir=f"results/{name}")
    result = await controller.run()
    succ = sum(1 for tr in result.task_results if tr.success)
    print(f"{name}: {succ}/{len(result.task_results)}")

Packaging a module

Each target, optimizer, and claim is its own pip-installable package. The layout is uniform across the repo:

my_optimizer/
  pyproject.toml
  src/my_optimizer/
    __init__.py        # public exports
    optimizer.py       # implementation
  tests/
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "my-optimizer"          # pip name: dashes
version = "0.1.0"
dependencies = ["superred"]

[tool.hatch.build.targets.wheel]
packages = ["src/my_optimizer"]   # import name: underscores
pip install -e ./my_optimizer
from my_optimizer import MyOptimizer   # import by the underscore name

The pip name uses dashes (my-optimizer) and the import name uses underscores (my_optimizer); the folder under superred-modules/ may differ again (the test_* fixtures are a deliberate example). Export your public surface from __init__.py, including any security-domain tag constants callers need to build scopes (the targets export their *_TAG constants for exactly this).

Middleware (how filtering is implemented)

The security filtering and trajectory recording are implemented as middleware: small functions that wrap the event handler. The Controller builds the target’s send_event by composing them onto the channel:

send_event = compose(
    trajectory_recorder(trajectory),         # records every event and response
    security_domain_filter(scope),           # declines non-injectable controllables
)(channel.send)

The filter receives the read & write scope (not the wider visibility scope that also includes read_only tags), so it declines both out-of-scope controllable events and in-scope events under read_only tags (the latter stay recorded and visible; see Security Domains). When read_only is empty the read & write scope equals the full visibility scope.

compose(a, b)(handler) applies a outermost, then b, then the inner handler, with zero extra tasks or channels. The two built-ins (security_domain_filter, trajectory_recorder) live in superred.core.middleware.

This is the mechanism that enforces scope, and it is worth understanding when reading the Controller. Note, though, that wiring custom middleware into a run is not a public extension point today: the Controller composes a fixed stack internally. If you need extra behaviour (rate limiting, tracing), the supported places to put it are inside your target’s run() or your optimizer’s on_event(). For the design rationale, see the Architecture Overview and Controller reference.