June 16, 2026
Building an Agentic Security Pipeline That Finds, Proves, and Patches Vulnerabilities
A six-step defender’s loop on one GPU, where a crash, not the model, decides what counts
Fareed Khan
Modern codebases run to millions of lines, change every day, and ship faster than any team can audit by hand. In a memory-unsafe language like C, a single overlooked defect such as a buffer overflow or a use-after-free can become a remotely exploitable vulnerability, and manual review cannot cover that surface. Anthropic recently showed that LLMs are now capable enough to find and fix vulnerabilities at scale, and that discovery is now straightforward to parallelize, so the effort has shifted to proving a finding is exploitable, triaging it, and patching it.
Anthropic calls the whole process a six-step find-and-fix loop, and we are going to build every step of it.
- Threat model: decide what counts as a vulnerability before we scan anything.
- Sandbox: a locked-down place to run untrusted inputs and prove an exploit safely.
- Discovery: agents hunt for bugs in parallel, tuned for recall.
- Verification: independently confirm each finding is reproducible, tuned for precision.
- Triage: deduplicate by root cause, assign a severity, and rank.
- Patching: fix the root cause, confirm the crash is gone, and look for variants.
Keeping discovery and verification apart is the key move, because a single step asked to do both gets cautious and starts filtering out true positives.
The twist in this build is the model. Instead of a frontier model, we run the whole loop on a small open 7B model on one GPU, with no API calls at all. We make it trustworthy with one rule. A program that crashes under a sanitizer is proof, and the model's opinion is not.
We target C memory-safety bugs because for those a sanitizer gives an objective yes or no. On a canary with three planted C bugs, the pipeline finds, proves, and patches all three, and then we point the same loop at an open source C library. Here is the whole system, a small model in the middle and the sanitizer as the judge.
All of the code, with every cell and the actual run outputs, is on GitHub:
GitHub - FareedKhan-dev/agentic-security-pipeline: An autonomous agentic pipeline that finds, proves, and patches real C memory-safety vulnerabilities end-to-end using a…
Let's get started!
Table of Contents
- Setting up the machine and the model server
- A small reliability layer
- The sandbox and the agent's tools
- The tool-use loop
- The records that flow between stages
- The canary: three planted bugs
- Stage 1: the threat model
- Stage 2: the sandbox boundary and reading
- Stage 3: discovery
- Stage 4: verification
- Stage 5: triage
- Stage 6: patching
- The whole loop on the canary
- Pointing it at production code
- What made a 7B model usable, and where this
Setting up the machine and the model server
Let us start the way any project starts, with the imports. There are not many of them, because almost all of the work is plain Python talking to a model server and to Docker, so the standard library carries most of the load.
import os
import re
import json
import time
import base64
import asyncio
import subprocess
import urllib.request
from dataclasses import dataclass, field, asdict
from typing import Optional
from openai import OpenAI, AsyncOpenAI  # the one third-party client, pointed at local vLLM
A quick tour of why each one is here. We lean on json and re constantly, because the pipeline passes JSON between stages and parses crash traces with regular expressions. base64 and asyncio come up later, for encoding proof-of-concept bytes and running the swarm in parallel.
subprocess and urllib.request drive Docker and poll the model server. And dataclasses plus typing are for the typed records. The one third-party import is the OpenAI client, pointed at our own vLLM server rather than a hosted API.
Next we put every knob in one place, a small Settings object, so the rest of the notebook never hard-codes a port or a model name in two places.
@dataclass
class Settings:
    model_id: str = "Qwen/Qwen2.5-Coder-7B-Instruct"  # the open weights model we run
    served_name: str = "qwen2.5-coder-7b"  # the short name our client calls
    vllm_host: str = "127.0.0.1"
    vllm_port: int = 8000
    context_len: int = 32768  # Qwen2.5-Coder's native window
    gpu_memory_utilization: float = 0.90
    max_num_seqs: int = 32  # how many sequences vLLM batches
    max_parallel_agents: int = 8  # the swarm width cap

    @property
    def base_url(self) -> str:
        return f"http://{self.vllm_host}:{self.vllm_port}/v1"

settings = Settings()
CLIP = 5000  # max characters of tool output we feed back per turn
MAX_PARALLEL_AGENTS = settings.max_parallel_agents
We tune these defaults for the one machine we run on, so later code can ask for settings.served_name or settings.context_len instead of repeating literals. We also define two constants for later: CLIP, which bounds how much tool output we hand back per turn, and MAX_PARALLEL_AGENTS, the swarm width.
With the settings in place, we establish the ground truth about the machine itself, because the resources we have decide how wide that swarm can actually be. We read the GPU, the memory, and the context budget, and we print them so there are no surprises later.
#### OUTPUT ####
GPU 0: NVIDIA H100 PCIe | 80 GB | driver 570.195.03
OK: 80 GB is comfortable for a 7B model plus a wide KV cache.
context_len : 32768
max_num_seqs (vLLM) : 32
max_parallel_agents (cap) : 8
=> swarm width we will use : 8
(canary fan-out is 3; the larger library run can use up to 8)
So we have one H100 with 80 GB, a 32768 token context window, and a swarm capped at eight parallel workers. The canary needs only three agents for its three parsers, but the cap of eight is what we use on a larger codebase later. The GPU figures are not guesses, we read them from the card.
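The cell that produced that report is not shown, so here is a minimal sketch of how such a reading could be taken, assuming nvidia-smi is on the PATH. The helper names parse_gpu_csv, query_gpus, and swarm_width are hypothetical, and taking the smaller of the two limits as the swarm width is an assumption about how the two numbers combine, not something the notebook confirms.

```python
import subprocess

def parse_gpu_csv(csv_text: str) -> list[dict]:
    """Parse rows of `name, memory.total (MiB), driver_version` into dicts."""
    gpus = []
    for row in csv_text.strip().splitlines():
        parts = [p.strip() for p in row.split(",")]
        if len(parts) != 3:
            continue  # skip anything that is not exactly the three fields we asked for
        name, mem_mib, driver = parts
        try:
            memory_gb = round(int(mem_mib) / 1024)
        except ValueError:
            continue  # memory was reported as something non-numeric
        gpus.append({"name": name, "memory_gb": memory_gb, "driver": driver})
    return gpus

def query_gpus() -> list[dict]:
    """Ask nvidia-smi for each GPU; return [] when the tool is missing or fails."""
    cmd = ["nvidia-smi", "--query-gpu=name,memory.total,driver_version",
           "--format=csv,noheader,nounits"]
    try:
        out = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except (OSError, subprocess.TimeoutExpired):
        return []
    return parse_gpu_csv(out.stdout) if out.returncode == 0 else []

def swarm_width(max_num_seqs: int, max_parallel_agents: int) -> int:
    # Assumption: the swarm can be no wider than either limit.
    return min(max_num_seqs, max_parallel_agents)

if __name__ == "__main__":
    for i, gpu in enumerate(query_gpus()):
        print(f"GPU {i}: {gpu['name']} | {gpu['memory_gb']} GB | driver {gpu['driver']}")
    print("swarm width:", swarm_width(32, 8))
```

Keeping the parser separate from the subprocess call means the parsing can be checked on a machine with no GPU at all.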
The other thing we check at the start is Docker, because every piece of execution happens inside a container, never on the host.
The model runs a deliberately crashing binary, and we want all of that walled off, so we would rather find a missing Docker now than halfway through a discovery swarm. With the GPU confirmed and Docker present, we can bring up the model.
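A Docker preflight along those lines could look like the sketch below. The function name docker_available is hypothetical; it relies only on the docker CLI being callable and on docker version returning a non-zero exit code when the daemon is not reachable.

```python
import shutil
import subprocess

def docker_available(timeout: int = 15) -> tuple[bool, str]:
    """Return (ok, detail): is the docker CLI present and is the daemon answering?"""
    if shutil.which("docker") is None:
        return False, "docker CLI not found on PATH"
    try:
        out = subprocess.run(
            ["docker", "version", "--format", "{{.Server.Version}}"],
            capture_output=True, text=True, timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired) as e:
        return False, f"docker did not respond: {e}"
    if out.returncode != 0:
        return False, out.stderr.strip() or "docker daemon not reachable"
    return True, f"docker server {out.stdout.strip()}"

if __name__ == "__main__":
    ok, detail = docker_available()
    print("OK:" if ok else "MISSING:", detail)
```

Returning a detail string alongside the boolean lets the notebook print why the check failed, not just that it did.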
The model is Qwen2.5-Coder-7B-Instruct, an open weights coding model, and we serve it with vLLM. vLLM gives us an OpenAI compatible API on a local port, so our notebook talks to it exactly like it would talk to a hosted API, except the weights live on our own GPU and nothing leaves the machine.
Here is the command that brings the server up. Read the flags, because each one is doing a specific job.
def vllm_launch_command(s: Settings) -> list[str]:
    return [
        "vllm", "serve", s.model_id,
        "--host", s.vllm_host,
        "--port", str(s.vllm_port),
        "--dtype", "bfloat16",  # H100 native; avoids fp16 overflow
        "--max-model-len", str(s.context_len),  # 32768 is Qwen2.5-Coder's native window
        "--gpu-memory-utilization", str(s.gpu_memory_utilization),
        "--max-num-seqs", str(s.max_num_seqs),  # how many sequences vLLM batches
        "--served-model-name", s.served_name,  # the short name our client will use
    ]
We use bfloat16 because it is native on the H100 and avoids the overflow problems of fp16. We set the context length to the model's native window, and we let vLLM batch up to 32 sequences at once, which is what makes a swarm of parallel agents cheap to run.
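The function above only builds the argument list. One way to actually start it from a notebook without blocking the cell is sketched below; start_background and the log path are hypothetical names chosen for illustration, and the real notebook may launch the server differently.

```python
import subprocess

def start_background(cmd: list[str], log_path: str) -> subprocess.Popen:
    """Start a long-running command in its own session, logging to a file."""
    log = open(log_path, "ab")
    try:
        # The child inherits its own copy of the file descriptor, so we can
        # close ours as soon as Popen returns.
        return subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT,
                                start_new_session=True)
    finally:
        log.close()

# Usage, assuming vllm_launch_command and settings from above:
#   server = start_background(vllm_launch_command(settings), "/tmp/vllm.log")
#   ... later: server.terminate()
```

Sending the server's output to a file keeps the notebook cell quiet while still leaving a log to read if the model fails to load.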
The server takes a little while to load the weights, so we poll its /v1/models endpoint until it answers, rather than guessing at a fixed sleep.
def wait_for_vllm_health(s: Settings, timeout: int = 600, interval: int = 5) -> bool:
    # _http_get is a small GET helper (not shown here) that returns (status, body)
    deadline = time.time() + timeout
    while time.time() < deadline:
        status, body = _http_get(s.base_url + "/models", timeout=4.0)
        if status == 200:  # the API is answering
            models = [m["id"] for m in json.loads(body).get("data", [])]
            print(f"vLLM is READY. Served model(s): {models}")
            return True
        time.sleep(interval)  # not up yet, wait and retry
    print("Timed out waiting for vLLM.")
    return False
This is a small thing, but it matters for a headless run, because the notebook should wait for the model to be genuinely ready and not just assume it. Once the server reports ready, we send it one tiny smoke test, just to confirm it is awake and answering.
#### OUTPUT ####
vLLM is READY. Served model(s): ['qwen2.5-coder-7b']
Prompt : In one sentence, what is a heap buffer overflow?
Answer : A heap buffer overflow occurs when more data is written to a memory location in
the heap than it can hold, potentially overwriting adjacent memory and causing
unpredictable behavior or security vulnerabilities.
Tokens : prompt=40 completion=36
The model is up and it answers correctly, in 36 tokens.
There is one more thing worth saying about this server. Qwen2.5-Coder does not reliably emit the special tool-call format that some models use, so we deliberately do not turn on native tool calling in vLLM. It fails silently on this model, which is the worst kind of failure.
Instead we are going to build our own tool protocol on top of plain JSON, and that is the next piece.
A small reliability layer
A 7B model is perfectly capable of doing useful work, but it is not as obedient as a frontier model. Ask it for JSON and it will sometimes wrap the JSON in a friendly sentence, or add a code fence, or ramble.
If our pipeline is going to make decisions based on the model's output, we need that output to be machine readable every single time. So before anything else, we build a thin reliability layer.
The base call is just a wrapper around the OpenAI client, with the temperature pinned to zero by default so the runs are repeatable.
def chat(messages: list[dict], temperature: float = 0.0, max_tokens: int = 1024) -> str:
    resp = client.chat.completions.create(
        model=settings.served_name,
        messages=messages,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return resp.choices[0].message.content or ""
That gives us free text. The problem is free text. Let me show you why with a concrete example.
I asked the model a simple question, what the typical bug classes in C are, and asked it to return structured fields. Watch what the free text part does.
#### OUTPUT ####
Raw model text:
Sure! Here is the result:
{ "finding": "use-after-free", "line": 58 }
Hope that helps.
Parsed dict : {
"language": "C",
"memory_safe": false,
"typical_bugs": [
"buffer_overflows", "use_after_free", "integer_overflow",
"null_pointer_dereference", "format_string_vulnerabilities",
... "sql_injection", "command_injection", "path_traversal",
"phishing_attacks", "social_engineering", "zero_day_exploits" ]
}
memory_safe : False (C is not, so this should be false)
Two things are happening here. The model wrapped its JSON in "Sure! Here is the result" and "Hope that helps", which would break a naive json.loads. And when asked for typical bugs, it wandered into a long duplicated list that drifts into phishing and social engineering, which is not what we asked.
This is a 7B model being a 7B model. We cannot trust its prose, but the one field we care about, memory_safe: false, is correct, so our job is to ignore the rambling and extract just the structured field we need.
The first defense is a small extractor that pulls the first valid JSON object out of any text, whether it is fenced, wrapped in chatter, or bare.
def extract_first_json(text: str) -> Optional[dict]:
    # Scan for the first brace-balanced object, ignoring any prose wrapped around it.
    # Caveat: braces inside string values are counted as well, so a value that
    # contains an unbalanced brace would defeat the scan.
    start = text.find("{")
    if start == -1:
        return None
    depth = 0
    for i in range(start, len(text)):
        if text[i] == "{":
            depth += 1
        elif text[i] == "}":
            depth -= 1
            if depth == 0:  # found the matching close brace
                try:
                    return json.loads(text[start:i + 1])
                except json.JSONDecodeError:
                    return None
    return None
It walks the text, finds the first opening brace, and tracks nesting depth until the matching close brace, then parses just that slice. The chatter before and after the object is simply ignored. There is a sibling helper, parse_xml_tag, for the cases where we ask the model to wrap a single answer in a named tag, which a small model often handles more reliably than raw JSON when we only need one free-text field back.
def parse_xml_tag(text: str, tag: str) -> Optional[str]:
    m = re.search(rf"<{re.escape(tag)}>(.*?)</{re.escape(tag)}>", text, re.DOTALL)
    return m.group(1).strip() if m else None
That handles the chatter. But there is a stronger tool available. vLLM supports guided decoding, where you hand it a JSON Schema and the server constrains the model so it can only produce tokens that fit that schema. The output is valid JSON by construction, not by luck. We wrap that in a single function, with a fallback for the rare case where the guided backend refuses a schema.
def chat_json(messages: list[dict], schema: dict, temperature: float = 0.0,
              max_tokens: int = 1024) -> dict:
    try:
        resp = client.chat.completions.create(
            model=settings.served_name, messages=messages,
            temperature=temperature, max_tokens=max_tokens,
            extra_body={"guided_json": schema},  # vLLM constrains output to the schema
        )
        return json.loads(resp.choices[0].message.content or "{}")
    except Exception as primary_err:
        # Fallback: ask plainly for schema-only JSON, then defensively extract it.
        guide = ("Respond with ONLY a single JSON object that conforms to this JSON Schema. "
                 "Output no prose and no code fences.\nSCHEMA:\n" + json.dumps(schema))
        resp = client.chat.completions.create(
            model=settings.served_name,
            messages=messages + [{"role": "user", "content": guide}],
            temperature=temperature, max_tokens=max_tokens,
        )
        parsed = extract_first_json(resp.choices[0].message.content or "")
        if parsed is None:
            raise RuntimeError(f"chat_json could not obtain JSON (guided error {primary_err!r})")
        return parsed
This one function is what makes a small model usable for the rest of the pipeline. Every stage that needs a decision, a threat table, a severity, a verdict, calls chat_json with a schema, and gets back a clean dictionary.
We stop fighting the model's prose and start working with structured data. With reliable output in hand, we can give the model something to act on.
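To make the calling pattern concrete, here is a sketch of the kind of schema a stage might pass to chat_json, together with a small post-check. Both VERDICT_SCHEMA and conforms are hypothetical and exist only for illustration. The check is worth having because the fallback branch of chat_json only asks the model to follow the schema and does not enforce it.

```python
# Hypothetical schema for a verification verdict; the field names are illustrative.
VERDICT_SCHEMA = {
    "type": "object",
    "properties": {
        "reproducible": {"type": "boolean"},
        "severity": {"type": "string", "enum": ["low", "medium", "high", "critical"]},
        "reason": {"type": "string"},
    },
    "required": ["reproducible", "severity", "reason"],
}

# Only the JSON Schema types this flat example needs.
_PY_TYPES = {"boolean": bool, "string": str, "object": dict}

def conforms(obj: dict, schema: dict) -> list[str]:
    """Return a list of problems; an empty list means obj fits this flat schema."""
    problems = []
    for key in schema.get("required", []):
        if key not in obj:
            problems.append(f"missing required field '{key}'")
    for key, rule in schema.get("properties", {}).items():
        if key not in obj:
            continue
        expected = _PY_TYPES.get(rule.get("type"))
        if expected and not isinstance(obj[key], expected):
            problems.append(f"'{key}' should be {rule['type']}")
        elif "enum" in rule and obj[key] not in rule["enum"]:
            problems.append(f"'{key}' must be one of {rule['enum']}")
    return problems

good = {"reproducible": True, "severity": "high", "reason": "ASAN trace on replay"}
print(conforms(good, VERDICT_SCHEMA))  # prints: []
```

A stage would call something like chat_json(messages, VERDICT_SCHEMA) and then treat a non-empty problem list the same way it treats an unparseable reply. This checker handles only flat schemas; a full JSON Schema validator would be the sturdier choice for nested ones.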
The sandbox and the agent's tools
The model is going to write proof-of-concept inputs and run a target binary on them, and that binary is designed to crash. We absolutely do not want any of that touching the host.
This is the second step of the loop, the sandbox, where agents run safely and exploitability gets verified, so every bit of execution happens inside a Docker container that is locked down hard, and we throw the container away after each use.
ISOLATION_FLAGS = [
    "--network", "none",  # no egress at all
    "--memory", "2g", "--memory-swap", "2g",
    "--pids-limit", "256",
    "--cpus", "2",
    "--cap-drop", "ALL",
    "--security-opt", "no-new-privileges",
]

def docker_run_isolated(image: str, name: str, writable: bool = False) -> str:
    flags = list(ISOLATION_FLAGS)
    if not writable:
        # read-only root filesystem, with a small writable scratch space at /tmp
        flags += ["--read-only", "--tmpfs", "/tmp:rw,size=128m,exec"]
    cmd = (["docker", "run", "-d", "--name", name] + flags +
           [image, "tail", "-f", "/dev/null"])  # keepalive so we can docker exec into it
    out = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
    if out.returncode != 0:
        raise RuntimeError(f"docker run failed: {out.stderr.strip()}")
    return name
Read those flags slowly, because they are the safety boundary. There is no network at all, so even a tricked model has nowhere to send anything.
The root filesystem is read only, with only a small writable /tmp for the proof-of-concept. We drop all Linux capabilities and set no-new-privileges so the process cannot escalate, and we cap memory, CPU, and process counts so a runaway input cannot eat the box.
A discovery agent gets the read only version, and we allow writes only when we genuinely need them.
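The "throw the container away after each use" half of that promise is not shown. A minimal sketch of one way to guarantee it is a context manager that always removes the container, even when the code inside raises. The name sandbox and the injectable run parameter are hypothetical; run exists only so the control flow can be exercised without Docker.

```python
import subprocess
from contextlib import contextmanager

@contextmanager
def sandbox(image: str, name: str, flags: list[str], run=subprocess.run):
    """Start a keepalive container, yield its name, and always remove it afterwards."""
    start = ["docker", "run", "-d", "--name", name, *flags,
             image, "tail", "-f", "/dev/null"]
    out = run(start, capture_output=True, text=True, timeout=120)
    if out.returncode != 0:
        raise RuntimeError(f"docker run failed: {out.stderr.strip()}")
    try:
        yield name
    finally:
        # -f force-removes the container even though it is still running
        run(["docker", "rm", "-f", name], capture_output=True, text=True, timeout=60)

# Usage, assuming ISOLATION_FLAGS from above and a hypothetical image name:
#   with sandbox("canary-image", "disc-1", ISOLATION_FLAGS + ["--read-only"]) as box:
#       ...  # every tool call targets `box`
```

Putting the removal in a finally block means a failed or interrupted agent cannot leave a half-used container behind for the next one.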
Inside that box, the agent gets exactly six tools, and no more. The model never gets to touch the host directly, it can only ask for one of these six things.
TOOLS = [
    {"name": "list_dir", "description": "List files in a directory inside the target container."},
    {"name": "read_file", "description": "Read a UTF-8 text file (such as C source) from the container."},
    {"name": "grep", "description": "Recursively search for a regex pattern across files."},
    {"name": "write_file", "description": "Write base64-encoded bytes to a path (a PoC input file)."},
    {"name": "run_in_sandbox", "description": "Run a shell command and get exit_code, stdout, stderr."},
    {"name": "write_poc", "description": "Write a PoC input from compact byte segments. Prefer this for binary."},
]
The first five are obvious. The sixth one, write_poc, is the interesting one, and it exists because of a genuine limitation of small models.
A 7B model cannot reliably hand-encode base64 of arbitrary binary bytes, and it cannot reliably type a literal string of two hundred bytes without making a mistake. So instead of asking it to, we give it a compact way to describe a binary input as a list of segments.
def tool_write_poc(container: str, segments: list, path: str = "/tmp/poc.bin") -> str:
    # Each segment is either {"text": "ABC"} for ASCII, or {"value": 0-255, "count": N}
    # for N repeats of one byte. This is something a 7B model CAN produce reliably.
    data = bytearray()
    try:
        for seg in segments:
            if "text" in seg:
                data += str(seg["text"]).encode("latin-1", "replace")
            else:
                data += bytes([int(seg["value"]) & 0xFF]) * int(seg.get("count", 1))
    except (TypeError, ValueError, KeyError) as e:
        return f"ERROR: bad segments ({e})."
    write_file_in(container, path, bytes(data))
    return f"Wrote {len(data)} bytes to {path}. First 16 bytes (hex): {bytes(data[:16]).hex()}"
So the model can say "write the byte for the letter A, then two hundred bytes of value 65" and we turn that into raw bytes for it. This small accommodation is a recurring theme: we meet the small model where it is strong, structured choices, and we take away the things it is weak at, like typing raw binary.
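To see what the segment format buys, here is the expansion step on its own, with no container involved. segments_to_bytes is a hypothetical helper that applies the same two rules as tool_write_poc, and the "CNRY" header is an invented example, not the canary's real input format.

```python
def segments_to_bytes(segments: list[dict]) -> bytes:
    """Expand write_poc-style segments into raw bytes (same rules as tool_write_poc)."""
    data = bytearray()
    for seg in segments:
        if "text" in seg:
            data += str(seg["text"]).encode("latin-1", "replace")
        else:
            # value is masked to one byte, then repeated count times
            data += bytes([int(seg["value"]) & 0xFF]) * int(seg.get("count", 1))
    return bytes(data)

# An invented layout: 4-byte header, one 0xFF byte, then 200 filler bytes.
poc = segments_to_bytes([
    {"text": "CNRY"},
    {"value": 255},
    {"value": 65, "count": 200},
])
print(len(poc), poc[:8].hex())  # prints: 205 434e5259ff414141
```

Three short JSON objects describe 205 bytes exactly, which is a far easier thing for a small model to emit than the equivalent base64 string.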
The most important helper in this whole part is detonate, because it is the function that turns a guess into a fact. It drops a proof-of-concept into the box, runs the target on it, and decides whether the program crashed.
def detonate(container: str, poc_bytes: bytes, repro_cmd: str,
             poc_path: str = "/tmp/poc.bin", timeout: int = 30) -> dict:
    write_file_in(container, poc_path, poc_bytes)  # drop the PoC into the box
    rc, so, se = exec_sh(container, repro_cmd, timeout=timeout)
    combined = (so + "\n" + se).strip()
    # "crashed" means ASAN fired, or the process died from a signal (exit not 0 or 1)
    return {"exit_code": rc, "stdout": so, "stderr": se, "output": combined,
            "crashed": ("AddressSanitizer" in combined) or (rc not in (0, 1))}
Read the definition of crashed, because it is the closest thing this system has to a single source of truth. A finding is confirmed precisely when detonate reports crashed=True with an AddressSanitizer trace.
Every later stage, discovery, verification, and patching, calls this same function and trusts its answer over anything the model says. When we run it on a normal, non-crashing command, it correctly reports no crash.
#### OUTPUT ####
detonate() on a benign command: {'exit_code': 0, 'crashed': False}
Tool names exposed to the model: ['list_dir', 'read_file', 'grep', 'write_file', 'run_in_sandbox', 'write_poc']
Good call : total 419264 drwxr-xr-x 2 root root 12288 May ...
Bad args : ERROR: tool 'read_file' is missing required argument(s): ['path']
Unknown : ERROR: unknown tool 'nope'. Valid tools: [...]
Notice the last two lines. When the model calls a tool with missing arguments, or invents a tool that does not exist, the dispatcher does not throw an exception and crash the run.
It returns a clear ERROR string, which becomes the observation the model sees next, so the model can read its own mistake and correct it. The tools are defined, so now we need the engine that lets the model use them.
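The dispatcher itself is not shown, so the following is a minimal sketch of the behavior just described, under stated assumptions. TOOL_IMPLS is the name the notebook uses for its tool table, but its shape here, the REQUIRED_ARGS table, and the two stand-in tools are all hypothetical.

```python
from typing import Callable

# Hypothetical stand-ins for the real sandbox tools; each takes (container, **args).
def _list_dir(container: str, path: str) -> str:
    return f"(listing of {path} in {container})"

def _read_file(container: str, path: str) -> str:
    return f"(contents of {path} in {container})"

TOOL_IMPLS: dict[str, Callable[..., str]] = {"list_dir": _list_dir, "read_file": _read_file}
REQUIRED_ARGS: dict[str, list[str]] = {"list_dir": ["path"], "read_file": ["path"]}

def dispatch_tool(name: str, container: str, args: dict) -> str:
    """Run an allow-listed tool; every failure comes back as an ERROR string."""
    if name not in TOOL_IMPLS:
        return f"ERROR: unknown tool '{name}'. Valid tools: {sorted(TOOL_IMPLS)}"
    missing = [a for a in REQUIRED_ARGS.get(name, []) if a not in args]
    if missing:
        return f"ERROR: tool '{name}' is missing required argument(s): {missing}"
    try:
        return str(TOOL_IMPLS[name](container, **args))
    except Exception as e:  # unexpected keyword arguments, tool failure, and so on
        return f"ERROR: tool '{name}' failed: {e}"

print(dispatch_tool("read_file", "box", {}))
print(dispatch_tool("nope", "box", {}))
```

The point of the design is that the function has one return type, a string, so the loop that calls it never needs its own error handling for tool calls.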
The tool-use loop
Every agent in this notebook (the threat modeler, the discovery agents, the patcher) shares one small engine. The model is asked for a single action as JSON, we run that action, we feed the result back, and we repeat. We constrain the action with a schema so it is always parseable.
ACTION_SCHEMA = {
    "type": "object",
    "properties": {
        "thought": {"type": "string", "description": "brief reasoning for this step"},
        "action": {"type": "string", "enum": list(TOOL_IMPLS.keys()) + ["final"]},
        "args": {"type": "object"},
    },
    "required": ["thought", "action", "args"],
}
The model emits a thought, an action name from the fixed list, and the arguments. The action is either one of our six tools or the special word final, which means the agent is done. Here is the loop itself, trimmed to the core.
def run_tool_loop(system: str, user: str, container: str, *,
                  max_turns: int = 20, temperature: float = 0.0) -> tuple[dict, list[dict]]:
    messages = [{"role": "system", "content": system}, {"role": "user", "content": user}]
    transcript: list[dict] = []  # one entry per executed action (entry shape is illustrative)
    consecutive_bad = 0
    for turn in range(1, max_turns + 1):
        try:
            action = chat_json(messages, ACTION_SCHEMA, temperature=temperature)
        except Exception:
            # A small model can emit unparseable junk; nudge it instead of crashing.
            consecutive_bad += 1
            if consecutive_bad >= 4:
                return {"error": "too_many_bad_calls"}, transcript
            messages.append({"role": "user", "content": "ERROR: reply with ONE small JSON action."})
            continue
        consecutive_bad = 0  # a parseable reply ends the streak of bad calls
        name, args = action.get("action", ""), action.get("args", {}) or {}
        # Keep the model's own step in the conversation so it can see what it already tried.
        messages.append({"role": "assistant", "content": json.dumps(action)})
        if name == "final":
            return args, transcript  # the agent says it is done
        result = dispatch_tool(name, container, args)  # run the tool in the sandbox
        clipped = result[:CLIP] + ("\n...[truncated]" if len(result) > CLIP else "")
        transcript.append({"turn": turn, "action": name, "args": args, "result": clipped})
        messages.append({"role": "user", "content": f"TOOL_RESULT {name}:\n{clipped}"})
    return {"error": "max_turns"}, transcript  # ran out of turns without finishing
There are three safety valves here. We clip every tool result to CLIP characters so a giant output cannot blow up the context window. We cap the number of turns so a confused agent cannot loop forever.
And we count consecutive bad calls, because a small model sometimes emits a reply that is not valid JSON, so instead of crashing we nudge it and retry, giving up only after four bad calls in a row.
The dispatch_tool call is the same allow-list idea from the sandbox section, returning a plain ERROR string for anything it does not recognize, so even a malformed call becomes a normal observation rather than an exception.
When the loop hits the turn cap it returns a clear sentinel instead of hanging. Here is that sentinel firing on a deliberately impossible task, finding a file that does not exist.
#### OUTPUT ####
[turn 1] grep(pattern='secret.txt', path='/tmp') // locate the file secret.txt
[turn 2] grep(pattern='secret.txt', path='/') // not under /tmp, check everywhere
[turn 3] list_dir(path='/etc') // searching took too long
[turn 4] list_dir(path='/home') // not in /etc either
[turn 5] list_dir(path='/root') // not in /home either
[turn 6] list_dir(path='/var/log') // not in /root either
reached max_turns (6) without a final answer
Final answer from the agent: {'error': 'max_turns'}
The agent tried six different things, never found the file, and came back with {'error': 'max_turns'}. This is exactly what we want.
The agent did not crash, it did not loop forever, and it told the caller plainly that it did not converge.
Every stage in the pipeline checks for this sentinel and treats it as "no result", which keeps a single stuck agent from taking down the whole run. The engine is ready, so now we define the data it passes around.
The records that flow between stages
Each stage produces a typed record and hands it to the next stage. Writing these down as small dataclasses keeps the pipeline disciplined, because every stage knows exactly what shape of data it receives and returns.
The first record is the crash itself, the thing discovery produces when it proves a bug.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class CrashArtifact:
    poc_path: str               # where the PoC was written, e.g. /tmp/poc.bin
    poc_bytes: bytes            # the crashing input itself
    reproduction_command: str   # e.g. /work/entry /tmp/poc.bin
    crash_type: str             # e.g. heap-buffer-overflow
    crash_output: str           # the ASAN trace (clipped)
    exit_code: int              # 134 = SIGABRT from ASAN
    focus_area: Optional[str] = None  # which partition found it
The key field is poc_bytes, the actual input that triggers the crash. That is the thing we can re-run later to prove the bug all over again, independent of anything the model said.
Verification then produces a grade, and triage produces a judgment about whether two findings are the same bug.
@dataclass
class GraderVerdict:
    passed: bool
    score: float                # fraction of detonations that crashed, 0.0 .. 1.0
    evidence: str = ""
    reproductions: int = 0      # how many of N detonations crashed

@dataclass
class JudgeVerdict:
    judgment: str               # NEW | DUP_BETTER | DUP_SKIP
    bug_id: Optional[int] = None
    reasoning: str = ""

@dataclass
class Finding:
    id: str                     # F-001, F-002, ...
    file: str
    line: int
    category: str               # the crash type / vulnerability class
    severity: str = "UNKNOWN"
    crash: Optional[CrashArtifact] = None
The GraderVerdict is what the re-detonation oracle returns, and its reproductions field counts how many runs crashed, which is the number we will actually trust.
The JudgeVerdict is a small three-way verdict for deduplication, where a finding is either new, a better version of one we already have, or a duplicate to skip.
The Finding is the clean object that carries a confirmed crash through triage. The remaining records carry the result of triage and patching, a triaged finding and a patch verdict.
@dataclass
class TriageRecord:
    id: str
    file: str
    line: int
    category: str
    verdict: str                     # true_positive | false_positive | duplicate
    severity: Optional[str] = None   # CRITICAL | HIGH | MEDIUM | LOW
    confidence: float = 0.0
    preconditions: list = field(default_factory=list)
    access_level: str = ""
    rationale: str = ""
    owner_hint: str = ""

@dataclass
class PatchVerdict:
    t0_builds: bool = False                  # patch applies and the target rebuilds
    t1_poc_stops: bool = False               # the original PoC no longer crashes
    t2_tests_pass: Optional[bool] = None     # regression suite (None if absent)
    re_attack_clean: Optional[bool] = None   # fresh discovery finds no variant
    t3_style_score: Optional[float] = None   # advisory, does not gate

    @property
    def passed(self) -> bool:
        return (self.t0_builds and self.t1_poc_stops and
                self.t2_tests_pass is not False and
                self.re_attack_clean is not False)
Look at the passed property on the patch verdict, because the logic is deliberate.
A patch passes only if it builds and the original proof-of-concept no longer crashes. The test and re-attack checks are allowed to be missing (None), but they are never allowed to be a hard False. That is the spine of the patch ladder we build later.
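The difference between a missing check and a failed check is easier to see with a few concrete verdicts. This sketch re-declares a trimmed copy of the record under the hypothetical name `PatchVerdictSketch` (the advisory style score is left out) and evaluates `passed` for four cases.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class PatchVerdictSketch:  # trimmed, hypothetical copy of PatchVerdict
    t0_builds: bool = False
    t1_poc_stops: bool = False
    t2_tests_pass: Optional[bool] = None    # None means "no test suite available"
    re_attack_clean: Optional[bool] = None  # None means "re-attack not run"

    @property
    def passed(self) -> bool:
        return (self.t0_builds and self.t1_poc_stops and
                self.t2_tests_pass is not False and
                self.re_attack_clean is not False)

# Builds, PoC stops, optional checks missing: passes.
no_tests = PatchVerdictSketch(t0_builds=True, t1_poc_stops=True)
# Same, but the regression suite ran and failed: a hard False blocks the patch.
failed_tests = PatchVerdictSketch(t0_builds=True, t1_poc_stops=True, t2_tests_pass=False)
# Re-attack found a variant: also blocked.
variant_found = PatchVerdictSketch(t0_builds=True, t1_poc_stops=True,
                                   t2_tests_pass=True, re_attack_clean=False)
# The original PoC still crashes: blocked regardless of the tests.
still_crashes = PatchVerdictSketch(t0_builds=True, t1_poc_stops=False, t2_tests_pass=True)

for label, v in [("no_tests", no_tests), ("failed_tests", failed_tests),
                 ("variant_found", variant_found), ("still_crashes", still_crashes)]:
    print(f"{label:14s} passed={v.passed}")
```

Only the first case passes. A `None` never blocks a patch, while any explicit `False` does.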
One more reason to write these as plain dataclasses is that they serialize cleanly to JSON. Every stage writes its result to disk, the threat model as Markdown, the findings as a TRIAGE.json, and each patch as a diff plus a result file.
So at the end of a run we have a folder of structured artifacts a human can open and check, not a chat log to read. That is what a security team wants from a scan.
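One wrinkle is that `poc_bytes` is a `bytes` field, which the standard `json` module does not encode on its own. Here is a minimal sketch of one way to handle that, assuming base64 is an acceptable on-disk encoding. `CrashArtifactSketch`, `to_json`, `poc_from_json`, and the `__bytes_b64__` key are hypothetical names, not the pipeline's own.

```python
import base64
import json
from dataclasses import asdict, dataclass
from typing import Optional

@dataclass
class CrashArtifactSketch:  # trimmed, hypothetical copy of CrashArtifact
    poc_path: str
    poc_bytes: bytes
    crash_type: str
    exit_code: int
    focus_area: Optional[str] = None

def to_json(record) -> str:
    """Dump a dataclass to JSON, wrapping any bytes value as base64 text."""
    def encode(value):
        if isinstance(value, bytes):
            return {"__bytes_b64__": base64.b64encode(value).decode("ascii")}
        raise TypeError(f"not JSON serializable: {type(value).__name__}")
    return json.dumps(asdict(record), default=encode, indent=2)

def poc_from_json(text: str) -> bytes:
    """Recover the exact PoC bytes from a record written by to_json."""
    data = json.loads(text)
    return base64.b64decode(data["poc_bytes"]["__bytes_b64__"])

artifact = CrashArtifactSketch("/tmp/poc.bin", b"A\xff" + b"\x00" * 40,
                               "heap-buffer-overflow", 134, "parse_alpha")
saved = to_json(artifact)
```

The round trip matters because the PoC bytes are the evidence. A file on disk that cannot be turned back into the exact crashing input is no longer proof.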
With the records defined, we need something to attack, so let me introduce the canary.
The canary: three planted bugs
Before we point this at production code, we need a target where we already know the answer, so we can tell whether the pipeline is working or fooling itself.
This is the canary, a tiny C program with three deliberately planted memory bugs. It works like a test fixture: we know exactly which bugs exist and which inputs trigger them.
If the pipeline misses a planted bug, that is a recall problem, and if it reports a bug that is not there, that is a precision problem. A known-answer target is the only way to measure either one before we move to code where nobody knows the answer in advance. The first byte of the input picks which parser runs.
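With a known-answer target, both numbers reduce to set arithmetic. This is a minimal sketch that assumes bugs are keyed by the function they live in. The `score_against_canary` helper and the `parse_delta` false report are made up for illustration.

```python
def score_against_canary(planted: set[str], reported: set[str]) -> dict:
    """Compare reported bug locations with the planted ones."""
    true_pos = planted & reported
    # By convention here, an empty denominator scores 1.0 (nothing to miss or to get wrong).
    recall = len(true_pos) / len(planted) if planted else 1.0
    precision = len(true_pos) / len(reported) if reported else 1.0
    return {
        "recall": recall,                        # share of planted bugs that were found
        "precision": precision,                  # share of reports that were real
        "missed": sorted(planted - reported),    # recall failures
        "spurious": sorted(reported - planted),  # precision failures
    }

planted = {"parse_alpha", "parse_bravo", "parse_charlie"}
reported = {"parse_alpha", "parse_charlie", "parse_delta"}  # hypothetical run
scores = score_against_canary(planted, reported)
```

In this made-up run the pipeline missed `parse_bravo` and invented `parse_delta`, so recall and precision both come out at two thirds.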
Here are its three parsers. Read each one, because these three bugs are the three crash types we are going to hunt.
/* parse_alpha: heap-buffer-overflow.
   Allocates 8 bytes, then copies `claimed` bytes (the first payload byte, fully
   attacker controlled, up to 255) into it. No bounds check. */
static void parse_alpha(const unsigned char *data, size_t n) {
    if (n < 1) return;
    size_t claimed = data[0];
    char *buf = (char *)malloc(8);
    memcpy(buf, data, claimed);            /* OOB write whenever claimed > 8 */
    __asm__ volatile("" : : "r"(buf) : "memory");
    printf("alpha copied %zu bytes\n", claimed);
    free(buf);
}

/* parse_bravo: stack-buffer-overflow.
   Copies the entire payload into a fixed 16-byte stack array, no size check. */
static void parse_bravo(const unsigned char *data, size_t n) {
    char name[16];
    memcpy(name, data, n);                 /* OOB write whenever n > 16 */
    name[15] = 0;
    printf("bravo got a name of %zu bytes\n", n);
}

/* parse_charlie: heap-use-after-free.
   Frees the record on a sentinel id byte, then writes through the freed pointer. */
struct rec { unsigned char id; unsigned char value; };
static void parse_charlie(const unsigned char *data, size_t n) {
    if (n < 1) return;
    struct rec *r = (struct rec *)malloc(sizeof(struct rec));
    r->id = data[0];
    if (r->id == 0xff) {
        free(r);                           /* freed here ... */
    }
    r->value = (n > 1) ? data[1] : 0;      /* ... and used here: use-after-free */
    printf("charlie id=%u value=%u\n", r->id, r->value);
}
There is one line in parse_alpha that deserves a word, the __asm__ volatile line. Without it, the compiler at optimization level one treats the overflowing copy as a dead store and quietly deletes it, so the bug vanishes at compile time and never fires.
That assembly line is a barrier that forces the optimizer to treat the buffer as genuinely used, so the overflow actually happens. It is a common pitfall when you write canaries.
We compile this with AddressSanitizer turned on, and we tell the sanitizer to abort the moment it sees anything wrong.
FROM gcc:14
WORKDIR /work
COPY canary.c /work/canary.c
RUN gcc -O1 -g -fsanitize=address -fno-omit-frame-pointer -o /work/entry /work/canary.c
ENV ASAN_OPTIONS=abort_on_error=1:detect_leaks=0:halt_on_error=1
To make sure our target and our sanitizer actually work, we feed each parser a known crashing input by hand, before any model is involved. We write these proof-of-concept bytes ourselves, so if any of them fails to crash, we know the problem is in our build, not in the model.
KNOWN_POCS = {
    "parse_alpha (heap-buffer-overflow)": b"A\xff" + b"\x00" * 40,   # 'A', then claim 255 bytes
    "parse_bravo (stack-buffer-overflow)": b"B" + b"\x41" * 64,      # 'B', then 64 bytes into name[16]
    "parse_charlie (use-after-free)": b"C\xff\x41",                  # 'C', id 0xff frees, then writes
}
Each input starts with the routing byte, then carries just enough payload to trip the bug.
The alpha input claims 255 bytes into an 8 byte buffer, the bravo input sends 64 bytes into a 16 byte stack array, and the charlie input sets the id to the sentinel 0xff so the record is freed and then written. This is the sanity check.
#### OUTPUT ####
parse_alpha (heap-buffer-overflow)
exit_code=134 crashed=True
SUMMARY: AddressSanitizer: heap-buffer-overflow (.../libasan.so.8+0xf27ee) in memcpy
parse_bravo (stack-buffer-overflow)
exit_code=134 crashed=True
SUMMARY: AddressSanitizer: memcpy-param-overlap (.../libasan.so.8+0xf264d) in memcpy
parse_charlie (use-after-free)
exit_code=134 crashed=True
SUMMARY: AddressSanitizer: heap-use-after-free /work/canary.c:46 in parse_charlie
All three crash with exit code 134, which is 128 plus 6, the conventional exit status for a process killed by SIGABRT. The sanitizer aborts because we set abort_on_error=1.
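The number 134 follows the common shell convention of 128 plus the signal number. Python's `subprocess` reports the same event differently, as a negative return code, so a host-side check may need to normalize it. The sketch below assumes that situation. `shell_exit_code` and `looks_like_asan_abort` are hypothetical helpers, and the pipeline's own detonate function may decide a crash differently.

```python
import signal

def shell_exit_code(returncode: int) -> int:
    """Map subprocess's negative 'killed by signal N' value to the shell's 128 + N."""
    return 128 - returncode if returncode < 0 else returncode

def looks_like_asan_abort(exit_code: int, output: str) -> bool:
    """True when the process aborted AND the sanitizer left its banner in the output."""
    aborted = exit_code == 128 + int(signal.SIGABRT)
    return aborted and "AddressSanitizer" in output

# A direct child killed by signal 6 shows up as -6 in subprocess, and as 134 in a shell.
print(shell_exit_code(-6))
```

Requiring both the abort status and the sanitizer banner guards against counting an unrelated abort, or a clean exit that merely prints the word, as a proven crash.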
There is one nice detail here. We planted parse_bravo as a stack overflow, but AddressSanitizer reports it as memcpy-param-overlap, because the source and destination overlap on the stack.
That is fine, and it is correct: the pipeline carries through whatever the sanitizer actually says, not what we expected it to say. The sanitizer is the authority on what kind of bug this is. The target works, so now we run the full loop on it, starting with the threat model.
Stage 1: the threat model
The first stage decides what we are even looking for, which is the article's first step: define what counts as a vulnerability.
A good threat model raises the hit rate sharply, because it tells the discovery agents which trust boundaries matter, and it gives later stages context for judging severity. The article is clear on this, with one team finding their results were exploitable around ninety percent of the time once the threat model was well documented.
TM_EXPLORE_SYS = (
    "You are a senior security engineer building a threat model for a C program. This is "
    "authorized defensive security work on a self-contained target. Use the tools to read "
    "the source under /work, then finalize with action 'final' and a summary describing "
    "what the program does, its entry point, and the parsers an attacker can reach."
)
We do this in two phases. First the model explores the source with the read-only tools, then it emits a structured threat model that we force through a schema so it always comes back as a clean table.
The explore phase uses the tool-use loop we just built, with a system prompt that tells the model what kind of work this is and how to finish.
Here is the model exploring on its own. It lists the directory, reads the source, greps for the parser functions, and then finalizes with a summary of the attack surface.
#### OUTPUT ####
Phase 1: agent explores the source ...
[turn 1] list_dir(path='/work') // see what is here
[turn 2] read_file(path='/work/canary.c') // read the C source
[turn 3] grep(pattern='parse_(alpha|bravo|charlie)', path='/work')
[turn 6] FINAL: canary.c defines parse_alpha, parse_bravo, parse_charlie ...
Agent summary of the attack surface:
The program canary.c contains three functions with known vulnerabilities: parse_alpha
suffers from a heap-buffer-overflow, parse_bravo has a stack-buffer-overflow, and
parse_charlie experiences a heap-use-after-free. The main function acts as the entry
point, and an attacker can exploit these by providing input starting with 'A', 'B', or 'C'.
The model read the code and understood it. It correctly named all three parsers, the bug class in each, and the fact that the first byte of the file selects which one runs.
That summary becomes the input to the second phase, where we ask for the structured table.
TM_STRUCT_SYS = (
    "You are a senior security engineer. Given a program summary, its source, and its "
    "vulnerability history, produce a structured threat model. Name trust boundaries "
    "explicitly (untrusted file bytes cross into process memory). For each plausible "
    "memory-safety threat add a row with a stable id (T1, T2, ...). Ground the evidence "
"column in the history and the source."
)TM_STRUCT_SYS = (
"You are a senior security engineer. Given a program summary, its source, and its "
"vulnerability history, produce a structured threat model. Name trust boundaries "
"explicitly (untrusted file bytes cross into process memory). For each plausible "
"memory-safety threat add a row with a stable id (T1, T2, ...). Ground the evidence "
"column in the history and the source."
)The model reads the three parsers and produces this. I am showing it as text rather than a rendered table, but it is the actual output.
#### OUTPUT ####
## 3. Entry points and trust boundaries
| entry point | trust boundary |
| main | Untrusted File Bytes Cross Into Process Memory |
## 4. Threats
| id | threat | surface | impact | likelihood | evidence |
| T1 | Heap Buffer Overflow | parse_alpha | critical | likely | fixed-size copy routines exceed the dest buf |
| T2 | Stack Buffer Overflow| parse_bravo | critical | likely | similar issues reported in the parser family |
| T3 | Heap Use After Free | parse_charlie | critical | likely | a record freed on a sentinel, used afterward |
The model correctly named the trust boundary, untrusted file bytes crossing into process memory, and it found all three parsers as attack surface with the right bug classes.
Notice the evidence column is grounded in the bug history we gave it, which is the model doing genuine reasoning rather than guessing.
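The schema used for the structured phase is not shown, so the following is only a sketch of the general idea: once the rows arrive as structured data, checking them and rendering the table takes a few lines. The column list is read off the output above, and `render_threat_table` is a hypothetical helper.

```python
# Column names taken from the threat table shown above.
THREAT_COLUMNS = ["id", "threat", "surface", "impact", "likelihood", "evidence"]

def render_threat_table(rows: list[dict]) -> str:
    """Validate threat rows and render them as a Markdown table."""
    missing = [(i, col) for i, row in enumerate(rows)
               for col in THREAT_COLUMNS if col not in row]
    if missing:
        raise ValueError(f"threat rows missing columns: {missing}")
    header = "| " + " | ".join(THREAT_COLUMNS) + " |"
    rule = "| " + " | ".join("---" for _ in THREAT_COLUMNS) + " |"
    body = ["| " + " | ".join(str(row[col]) for col in THREAT_COLUMNS) + " |"
            for row in rows]
    return "\n".join([header, rule, *body])

rows = [{
    "id": "T1", "threat": "Heap Buffer Overflow", "surface": "parse_alpha",
    "impact": "critical", "likelihood": "likely",
    "evidence": "fixed-size copy routines exceed the dest buf",
}]
table = render_threat_table(rows)
```

Rejecting a row with a missing column at this point keeps a malformed threat model from silently reaching the severity stage.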
This threat table now travels with every later stage, so when we assign severity at triage time, the model has this context in front of it. We know what to look for, so now we make sure we can read a crash when we cause one.
Stage 2: the sandbox boundary and reading a crash
Discovery is going to produce raw AddressSanitizer output, which is verbose and noisy.
Before we run discovery, we build the small parser that turns a wall of sanitizer text into a clean, comparable signature. This is what lets us deduplicate findings later and confirm a fix actually changed the crash.
import re

_RE_SUMMARY = re.compile(r"SUMMARY:\s*AddressSanitizer:\s*([a-zA-Z0-9_\-]+)")
_RE_OP = re.compile(r"\b(READ|WRITE) of size (\d+)")
# Assumed frame pattern (its definition is not shown here): "#1 0x4014a6 in func /path/file.c:14"
_RE_FRAME = re.compile(r"#(\d+)\s+0x[0-9a-fA-F]+\s+in\s+(\S+)\s+(\S+?):(\d+)")
_ASAN_RUNTIME = ("__asan", "__interceptor", "__sanitizer", "memcpy", "memmove", "memset")

def crash_reason(output: str) -> dict:
    ct = _RE_SUMMARY.search(output)
    op = _RE_OP.search(output)
    return {
        "crash_type": ct.group(1) if ct else "unknown",
        "operation": op.group(1) if op else None,
        "size": int(op.group(2)) if op else None,
    }

def project_frames(output: str, limit: int = 10) -> list[str]:
    frames = []
    for m in _RE_FRAME.finditer(output):
        _, func, file_, line = m.groups()
        if any(p in func for p in _ASAN_RUNTIME):
            continue  # skip sanitizer runtime and libc frames, keep our code
        frames.append(f"{func} {file_}:{line}")
        if len(frames) >= limit:
            break
    return frames
The important move in project_frames is that we throw away the sanitizer's own internal frames and the libc interceptors, like the memcpy frame, and keep only the frames in the target's own code.
That gives us the exact location of the bug. Here is the parser running on a live parse_alpha crash.
#### OUTPUT ####
Raw trace (first lines):
==20==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x502000000018
WRITE of size 255 at 0x502000000018 thread T0
#0 ... in memcpy
#1 0x4014a6 in parse_alpha /work/canary.c:14
#2 0x4014a6 in main /work/canary.c:58
Parsed crash_reason : {'crash_type': 'heap-buffer-overflow', 'operation': 'WRITE', 'size': 255}
Top project frame : parse_alpha /work/canary.c:14
Signature (type,frame): ('heap-buffer-overflow', 'parse_alpha /work/canary.c:14')
We turned a noisy multi-line trace into one clean tuple, the crash type and the exact line in our code where it happens. That signature, heap-buffer-overflow at parse_alpha /work/canary.c:14, is what we compare against when we deduplicate findings and when we check whether a patch changed the outcome.
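Because the signature is a plain tuple, the simplest form of deduplication is a dictionary lookup. This sketch assumes each crash has already been reduced to a dict with `crash_type` and `top_frame` keys. `dedupe_by_signature` is a hypothetical helper and only an exact-match stand-in for the root-cause judgment that triage performs.

```python
def dedupe_by_signature(crashes: list[dict]) -> list[dict]:
    """Keep the first crash seen for each (crash_type, top_frame) signature."""
    seen: dict[tuple[str, str], dict] = {}
    for crash in crashes:
        signature = (crash["crash_type"], crash["top_frame"])
        seen.setdefault(signature, crash)  # later duplicates are ignored
    return list(seen.values())

crashes = [
    {"crash_type": "heap-buffer-overflow", "top_frame": "parse_alpha /work/canary.c:14", "agent": 0},
    {"crash_type": "heap-buffer-overflow", "top_frame": "parse_alpha /work/canary.c:14", "agent": 3},
    {"crash_type": "heap-use-after-free", "top_frame": "parse_charlie /work/canary.c:46", "agent": 2},
]
unique = dedupe_by_signature(crashes)
```

Two agents that hit the same line with different inputs collapse into one finding, while the use-after-free in a different function stays separate.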
There is one more helper, asan_excerpt, and it matters because of what the later model stages see. A full ASAN trace is thousands of characters, and if we pasted all of it into the severity prompt or the verifier prompt it would crowd out everything else.
So we trim the trace down to the one summary line plus the project frames.
def asan_excerpt(output: str, max_frames: int = 10) -> str:
    summary = next((ln for ln in output.splitlines() if "SUMMARY:" in ln), "")
    frames = project_frames(output, limit=max_frames)
    return (summary + "\n" + "\n".join(frames)).strip()
#### OUTPUT ####
asan_excerpt (what downstream stages actually see):
SUMMARY: AddressSanitizer: heap-buffer-overflow (.../libasan.so.8+0xf27ee) in memcpy
parse_alpha /work/canary.c:14
main /work/canary.c:58
That short excerpt is what we hand to the severity rubric and the adversarial verifier, never the raw wall of text.
It keeps the model focused on the crash class and the location, which is all it needs to reason about. Now we can read a crash, so let us go find some.
Stage 3: discovery
Discovery is where we optimize for recall. We want to find as many genuine bugs as we can, and we are happy to be noisy, because verification will clean up afterward.
The first step is recon, where one agent partitions the attack surface into independent focus areas, so that the agents we fan out next do not all pile onto the same spot. We constrain recon with its own small schema and prompt.
FOCUS_SCHEMA = {
    "type": "object",
    "properties": {
        "focus_areas": {"type": "array", "items": {"type": "object", "properties": {
            "name": {"type": "string"}, "hint": {"type": "string"}},
            "required": ["name", "hint"]}}},
    "required": ["focus_areas"],
}

RECON_SYS = (
    "You are partitioning a C program's attack surface for parallel security review. The "
    "program reads a file; byte 0 selects a parser ('A','B','C') and the rest is the "
    "payload. Return one focus area per independently reachable parser, each with a "
    "one-line hint about how an attacker reaches it."
)
For the canary that gives us one area per parser, which is exactly what we want, because the three bugs are independent and can be attacked in parallel.
#### OUTPUT ####
Recon produced 3 focus area(s):
[0] parse_alpha: By providing a file starting with 'A' followed by more than 8 bytes.
[1] parse_bravo: By providing a file starting with 'B' followed by more than 16 bytes.
[2] parse_charlie: By providing a file starting with 'C' followed by a byte equal to 0xff.
Each agent gets a system prompt that tells it exactly what to do, find one bug in its focus area and prove it with a crashing input.
def find_system_prompt(focus: str) -> str:
    return (
        "You are an offensive security agent. Your goal is to find ONE memory-safety bug "
        "in your assigned focus area and PROVE it by making the target binary crash under "
        "AddressSanitizer.\n"
        "How the target works: it reads a file given as argv[1]. Byte 0 selects the parser: "
        "'A' -> parse_alpha, 'B' -> parse_bravo, 'C' -> parse_charlie. The bytes after byte 0 "
        "are the payload. Write your PoC with write_poc to /tmp/poc.bin, run /work/entry "
        "/tmp/poc.bin, and confirm it crashes.\n"
        f"YOUR FOCUS AREA: {focus}"
    )
There is a tension worth naming here. The article found that long, prescriptive checklists actually make discovery worse, because they box in the model's creativity.
But a 7B model on a tiny target needs more hand-holding than a frontier model does, so for the canary we give it a fairly exact recipe.
On a larger codebase you would loosen this prompt and let the model roam. It is a dial, and where you set it depends on how strong your model is.
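One way to make that dial explicit is to treat the recipe as an optional argument. This is a sketch of the idea only. `find_prompt_sketch`, `BASE`, and the `recipe` parameter are hypothetical and are not how the pipeline's own prompt is built.

```python
from typing import Optional

BASE = ("You are an offensive security agent. Find ONE memory-safety bug in your focus "
        "area and PROVE it by making the target crash under AddressSanitizer.\n")

def find_prompt_sketch(focus: str, recipe: Optional[str] = None) -> str:
    """Build a discovery prompt; pass a recipe for weaker models, omit it to loosen."""
    parts = [BASE]
    if recipe:
        parts.append(f"How the target works: {recipe}\n")
    parts.append(f"YOUR FOCUS AREA: {focus}")
    return "".join(parts)

# Tight setting: spell out the input format for a small model on a small target.
strict = find_prompt_sketch("parse_alpha", recipe="Byte 0 selects the parser; the rest is payload.")
# Loose setting: goal and focus area only, leaving the approach to the model.
loose = find_prompt_sketch("parse_alpha")
```

Keeping the recipe separate from the base prompt means the same builder serves both a 7B model on the canary and a stronger model on a larger codebase.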
Then we fan out, one agent per focus area, running in parallel.
Each agent gets a read-only sandbox, explores the source, and writes a proof-of-concept input that it thinks will crash the target. Here is the agent function, and the most important part is the end of it.
def discovery_agent_sync(focus: str, image: str, idx: int, settings: Settings,
                         max_turns: int = 40) -> Optional[CrashArtifact]:
    name = f"find-{idx}"
    docker_run_isolated(image, name, writable=False)  # read-only root, /tmp tmpfs
    try:
        run_tool_loop(find_system_prompt(focus),
                      f"Find and prove a memory-safety bug in: {focus}",
                      name, max_turns=max_turns, temperature=0.1)
        # Host-side confirmation, independent of whatever the agent claimed.
        poc = read_file_in(name, FIXED_POC)
        if not poc:
            return None
        det = detonate(name, poc, FIXED_REPRO)
        if not det["crashed"]:
            return None  # agent's claim does not matter
        reason = crash_reason(det["output"])
        return CrashArtifact(poc_path=FIXED_POC, poc_bytes=poc,
                             reproduction_command=FIXED_REPRO,
                             crash_type=reason["crash_type"], crash_output=det["output"],
                             exit_code=det["exit_code"], focus_area=focus)
    finally:
        docker_rm(name)
Read the part after the tool loop returns. We do not trust what the agent says. The agent can claim it found a terrible bug, and we ignore that claim entirely.
Instead, the host reads back the proof-of-concept file the agent wrote, and the host re-runs it against the binary itself with detonate. A CrashArtifact is only created if the host reproduces the crash.
This is the article's biggest lever, running the proof-of-concept, written directly into the code. A finding exists only when the host reproduces the crash, full stop.
The swarm itself is small. It runs the agents concurrently, capped at our swarm width, and it is careful that one failing agent cannot bring down the others.
async def run_swarm(focus_areas, image, settings, max_turns=40) -> list[CrashArtifact]:
    sem = asyncio.Semaphore(MAX_PARALLEL_AGENTS)  # cap concurrency at the swarm width

    async def bounded(idx, focus):
        async with sem:
            print(f"  agent {idx} starting on: {focus[:60]}")
            # Pass max_turns through so the caller's limit actually reaches the agent.
            crash = await asyncio.to_thread(discovery_agent_sync, focus, image, idx,
                                            settings, max_turns)
            print(f"  agent {idx} done: {crash.crash_type if crash else 'no crash'}")
            return crash

    # return_exceptions=True so one dying agent never kills the whole swarm
    results = await asyncio.gather(*[bounded(i, f) for i, f in enumerate(focus_areas)],
                                   return_exceptions=True)
return [r for r in results if isinstance(r, CrashArtifact)]async def run_swarm(focus_areas, image, settings, max_turns=40) -> list[CrashArtifact]:
sem = asyncio.Semaphore(MAX_PARALLEL_AGENTS) # cap concurrency at the swarm width
async def bounded(idx, focus):
async with sem:
print(f" agent {idx} starting on: {focus[:60]}")
crash = await asyncio.to_thread(discovery_agent_sync, focus, image, idx, settings)
print(f" agent {idx} done: {crash.crash_type if crash else 'no crash'}")
return crash
# return_exceptions=True so one dying agent never kills the whole swarm
results = await asyncio.gather(*[bounded(i, f) for i, f in enumerate(focus_areas)],
return_exceptions=True)
return [r for r in results if isinstance(r, CrashArtifact)]The semaphore is what lets us point this at bigger code later without spawning a hundred containers at once, it just queues agents up to the width we set. And return_exceptions=True means a single crashed agent becomes a None we filter out, not an exception that kills the run. Here is the swarm running on the canary.
#### OUTPUT ####
Dispatching 3 discovery agents with swarm width 8
agent 0 starting on: parse_alpha (heap-buffer-overflow)
agent 1 starting on: parse_bravo (stack-buffer-overflow)
agent 2 starting on: parse_charlie (use-after-free)
agent 2 done: heap-use-after-free
agent 1 done: memcpy-param-overlap
agent 0 done: heap-buffer-overflow
Discovery returned 3 confirmed crash(es):
- heap-buffer-overflow top frame: parse_alpha /work/canary.c:14
- memcpy-param-overlap top frame: parse_bravo /work/canary.c:29
- heap-use-after-free top frame: parse_charlie /work/canary.c:46
All three agents found and proved their bug, and the host confirmed every crash independently.
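Notice the swarm is robust: the agents run concurrently, and even if one had died, the others would still report. Notice also that agent 1's crash is labeled memcpy-param-overlap, not the stack-buffer-overflow named in its focus area, because the crash type is read from the sanitizer output and not from the focus label.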
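The robustness claim is easy to check in isolation. The sketch below uses a stand-in agent (fake_agent, a hypothetical name) instead of a container, and an illustrative width of 2 that is not the article's setting. It shows that with return_exceptions=True a failing task comes back as an exception object in its slot while the other results survive.

```python
import asyncio

MAX_PARALLEL = 2  # illustrative width only


def fake_agent(idx: int) -> str:
    """Stand-in for a blocking discovery agent; agent 1 dies on purpose."""
    if idx == 1:
        raise RuntimeError("container vanished")
    return f"crash-{idx}"


async def run_bounded(n_agents: int) -> list:
    sem = asyncio.Semaphore(MAX_PARALLEL)

    async def bounded(idx: int):
        async with sem:  # at most MAX_PARALLEL agents run at the same time
            return await asyncio.to_thread(fake_agent, idx)

    # Exceptions are returned in place, in task order, instead of propagating.
    return await asyncio.gather(*(bounded(i) for i in range(n_agents)),
                                return_exceptions=True)


results = asyncio.run(run_bounded(3))
survivors = [r for r in results if isinstance(r, str)]
print(survivors)
```

The type filter at the end plays the same role as the isinstance check in run_swarm: anything that is not a real result is dropped.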
We have three confirmed crashes. Now we put them through a stricter gate.
Stage 4: verification
This is the most important stage in the blog, so I want to slow right down. Verification optimizes for precision.
Discovery was allowed to be noisy, and verification's job is to throw out anything that is not solidly proven. We do this two ways, and the contrast between them is the whole lesson.
The first way is the deterministic oracle. We take the proof-of-concept and we re-detonate it three times, each time in a brand new container, and we require a majority of those runs to crash.
def verify_crash(crash: CrashArtifact, image: str, settings: Settings,
                 runs: int = 3) -> GraderVerdict:
    crashed = 0
    for i in range(runs):
        # A brand new container per run, so no state leaks between detonations.
        with sandbox(image, f"verify-fresh-{i}") as c:
            det = detonate(c, crash.poc_bytes, crash.reproduction_command)
        if det["crashed"]:
            crashed += 1
    passed = crashed >= (runs // 2 + 1)  # majority of the runs must crash
    return GraderVerdict(passed=passed, score=crashed / runs,
                         evidence=f"{crashed}/{runs} detonations crashed with {crash.crash_type}",
                         reproductions=crashed)
The second way is the model-based check the article suggests. We run an adversarial verifier, an agent whose entire job is to be skeptical and try to refute the finding.
def adversarial_verify(crash: CrashArtifact, source: str, votes: int = 3) -> dict:
    tally = {"TRUE_POSITIVE": 0, "FALSE_POSITIVE": 0, "CANNOT_VERIFY": 0}
    # The prompts are the same for every vote, so build them once.
    system = ("You are an adversarial security verifier. Assume the reported finding is a "
              "FALSE POSITIVE and try to refute it. Only conclude TRUE_POSITIVE if the "
              "evidence is conclusive. Be skeptical and precise.")
    user = (f"Reported crash type: {crash.crash_type}\nASAN evidence:\n"
            f"{asan_excerpt(crash.crash_output)}\n\nSource:\n{source}")
    for _ in range(votes):
        v = chat_json([{"role": "system", "content": system},
                       {"role": "user", "content": user}],
                      VERIFIER_SCHEMA, temperature=0.4)
        tally[v["verdict"]] += 1
    # max() returns the first key with the top count, so a tie resolves in dict order.
    return {"tally": tally, "verdict": max(tally, key=tally.get)}
Now watch what happens when we run both on the same three crashes, every one of which is genuine.
#### OUTPUT ####
heap-buffer-overflow -> passed=True (3/3 detonations crashed)
memcpy-param-overlap -> passed=True (3/3 detonations crashed)
heap-use-after-free -> passed=True (3/3 detonations crashed)
3 of 3 crash(es) verified as truly reproducible.
Adversarial vote on the first verified crash:
tally : {'TRUE_POSITIVE': 0, 'FALSE_POSITIVE': 3, 'CANNOT_VERIFY': 0}
verdict : FALSE_POSITIVE
- FALSE_POSITIVE (conf 95): The reported heap-buffer-overflow in parse_alpha is likely a false ...
Stop and look at this, because it is the whole point. The deterministic oracle re-detonated each proof-of-concept three times and saw three crashes every time, so it correctly passed all three findings.
But the model based adversarial verifier voted FALSE_POSITIVE three times out of three, with ninety five percent confidence, on a bug that provably crashes the program on demand.
The model was confidently and completely wrong.
Why did it go wrong? Look back at its prompt. We told it to assume the finding is a false positive and try to refute it.
A frontier model can hold that skeptical stance and still concede when the evidence is overwhelming. A 7B model, told to be skeptical, just leans all the way into skepticism and refutes everything.
This is the single most important thing I learned building this. You cannot ask a small model to be the judge. So we do not.
The deterministic re-detonation is what decides whether a finding is reproducible, and the model's vote is advisory only. The oracle overrules the model, because a reproduced crash is ground truth and an opinion is not.
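The rule "the oracle decides, the model advises" can be written down in a few lines. This is a minimal sketch, not code from the pipeline: Decision, majority_crashed, and decide are hypothetical names, and the only behavior taken from the text is the strict-majority threshold and the fact that the model's vote never gates a finding.

```python
from dataclasses import dataclass


@dataclass
class Decision:
    reproducible: bool     # decided only by the re-detonation oracle
    model_disagrees: bool  # recorded for a human, never used as a gate


def majority_crashed(crashed: int, runs: int) -> bool:
    """Strict majority: 2 of 3, 3 of 4, 3 of 5, and so on."""
    return crashed >= runs // 2 + 1


def decide(crashed: int, runs: int, model_verdict: str) -> Decision:
    passed = majority_crashed(crashed, runs)
    model_says_real = model_verdict == "TRUE_POSITIVE"
    # The model's verdict only sets a flag; it cannot flip `passed`.
    return Decision(reproducible=passed, model_disagrees=(passed != model_says_real))


print(decide(crashed=3, runs=3, model_verdict="FALSE_POSITIVE"))
```

For the canary's first crash this yields a reproducible finding with the disagreement flagged, which is the situation the output above shows.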
This lines up with the article. Teams that added an independent verifier roughly halved their rate of non-exploitable findings, and teams that required a running proof-of-concept drove false positives close to zero.
Our re-detonation is the strongest version of that. The proof-of-concept does not just run: it runs three times in fresh containers and has to crash in a majority of them.
We are not asking whether the bug seems exploitable, we are demanding that it be exploited before we believe it. With three findings verified, we can rank them.
Stage 5: triage
Triage takes the verified findings and turns them into a short, ordered list a human would actually act on. First we deduplicate by root cause, because if two agents find the same bug from two angles, we do not want to report it twice.
The article is blunt about why this matters: if you send engineers a pile of findings where most are not exploitable, they will lose trust in the whole report.
def dedup_deterministic(findings: list[Finding]) -> tuple[list[Finding], dict]:
    canon: list[Finding] = []
    absorbed: dict[str, list[str]] = {}
    for f in findings:
        match = None
        for c in canon:
            # same file, same crash class, and within ten lines = same root cause
            if (c.file == f.file and c.category.lower() == f.category.lower()
                    and abs(c.line - f.line) <= 10):
                match = c
                break
        if match:
            absorbed.setdefault(match.id, []).append(f.id)
        else:
            canon.append(f)
    return canon, absorbed
Then we assign a severity. This is where the model does have a role, but we constrain it hard with a rubric, and we explicitly tell it not to inflate.
SEVERITY_SYS = (
    "You assign severity to a CONFIRMED memory-safety finding. Assume it is a true positive. Answer "
    "each rubric field from the evidence before naming a severity. Guidance: zero "
    "preconditions with unauthenticated remote access is HIGH or CRITICAL; one or two "
    "preconditions or an authenticated path is MEDIUM; three or more preconditions or "
    "local-only is LOW. Do not inflate."
)
The trick that keeps a small model from blurting out CRITICAL is the schema. We force it to fill in reachability, preconditions, and access level first, and only then to name a severity.
Answering the rubric fields before the verdict is what makes the verdict defensible.
SEVERITY_SCHEMA = {
    "type": "object",
    "properties": {
        "reachability": {"type": "string"},
        "preconditions": {"type": "array", "items": {"type": "string"}},
        "access_level": {"type": "string",
                         "enum": ["unauthenticated_remote", "authenticated", "local", "physical"]},
        "severity": {"type": "string", "enum": ["CRITICAL", "HIGH", "MEDIUM", "LOW"]},
        "rationale": {"type": "string"},
    },
    "required": ["reachability", "preconditions", "access_level", "severity", "rationale"],
}

def severity_rubric(finding: Finding, tm_context: str, settings: Settings) -> dict:
    user = (f"FINDING: {finding.category} at {finding.file}:{finding.line}\n"
            f"EVIDENCE:\n{asan_excerpt(finding.crash.crash_output)}\n\n"
            f"THREAT MODEL CONTEXT:\n{tm_context}\n\nDerive the severity.")
    return chat_json([{"role": "system", "content": SEVERITY_SYS},
                      {"role": "user", "content": user}], SEVERITY_SCHEMA, temperature=0.0)
Notice we pass tm_context, the threat model from Stage 1, straight into the severity call. That is the whole reason we built the threat model first.
The severity decision is the one place the model needs to know how the program is deployed, and the threat model is where that context lives. Here is the triage output for the canary.
#### OUTPUT ####
Deterministic dedup: 3 -> 3 canonical finding(s).
F-001 heap-buffer-overflow -> LOW access=unauthenticated_remote preconds=1
F-002 memcpy-param-overlap -> LOW access=unauthenticated_remote preconds=1
F-003 heap-use-after-free -> LOW access=unauthenticated_remote preconds=1
{
"input_count": 3,
"true_positives": 3,
"by_severity": {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 3}
}
Ranked findings:
LOW F-001 heap-buffer-overflow at /work/canary.c:14
LOW F-002 memcpy-param-overlap at /work/canary.c:29
LOW F-003 heap-use-after-free at /work/canary.c:46
Owner routing:
F-001: module canary.c (no CODEOWNERS or git history, assign to the file's maintainer)
Three confirmed findings, all rated LOW. That rating is correct for this target, because these bugs need local file access to trigger, and local-only is LOW by the rubric.
But notice the mistake: the model labeled the access as unauthenticated_remote with one precondition while still landing on LOW. By the rubric, that combination points to MEDIUM, so the LOW verdict is right for this target but does not follow from the fields the model itself filled in. This is the exact thing the article warns about. The model has good context of the code, but it does not have good context of us, of how this program is actually deployed and reached.
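An inconsistency like this one can be surfaced mechanically by recomputing the band the rubric implies from the model's own answers. The sketch below is one possible reading of the guidance in SEVERITY_SYS, not part of the pipeline: rubric_band and is_consistent are hypothetical names, the precedence between overlapping rules is an assumption, and so is treating "physical" access like local-only.

```python
def rubric_band(access_level: str, preconditions: list[str]) -> set[str]:
    """Severities the rubric allows for a given access level and precondition count."""
    n = len(preconditions)
    # Assumption: the LOW rule wins when several rules apply at once.
    if access_level in ("local", "physical") or n >= 3:
        return {"LOW"}
    if access_level == "authenticated" or n >= 1:
        return {"MEDIUM"}
    return {"HIGH", "CRITICAL"}  # unauthenticated remote with zero preconditions


def is_consistent(answer: dict) -> bool:
    """True when the named severity matches the band implied by the rubric fields."""
    band = rubric_band(answer["access_level"], answer["preconditions"])
    return answer["severity"] in band


# Shaped like the canary result: remote access, one precondition, rated LOW.
canary_like = {"access_level": "unauthenticated_remote",
               "preconditions": ["placeholder precondition"],
               "severity": "LOW"}
print(is_consistent(canary_like))
```

A failed check does not say which field is wrong, only that a person should look, which fits the role triage has here.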
Severity is the one judgment that needs human context, which is why triage produces a ranked list for a person to confirm, not a final verdict. The findings are ranked, so now we fix them.
Stage 6: patching
Patching is the last stage, and it is where the deterministic oracle pays off the most. The patch agent proposes a complete fixed version of the file, and then we put that fix through a ladder of executable checks.
We load the agent's prompt with the specific mistakes a small model tends to make.
PATCH_SYS = (
    "You are fixing ONE memory-safety vulnerability in a C program. Return the COMPLETE "
    "corrected source. Fix the ROOT CAUSE, change as little else as possible.\n"
    "- For a buffer overflow: clamp the copy length to the destination size BEFORE the copy.\n"
    "- For a use-after-free: do NOT free while later code still uses the object.\n"
    "- NEVER set a pointer to NULL and then dereference it later. That is not a fix.\n"
    "- After writing your fix, re-read it and confirm the crash input can no longer reach "
    "out-of-bounds or freed memory, and that no NEW crash was introduced."
)
Notice the instruction to change as little else as possible. The article makes the point that minimal patches are easier to review and less likely to introduce a new bug, and that matters even more with a small model, because a model that starts rewriting unrelated code is a model that is about to break something.
We want the smallest change that kills the root cause, nothing more. The independent reviewer at the end checks for exactly this, a patch that wanders out of scope gets marked down even if it stops the crash.
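A cheap way to notice a patch wandering out of scope, before any reviewer sees it, is to count how many lines it touches. This is a sketch of such a pre-check, not something the pipeline is shown to do: changed_line_count is a hypothetical helper built on the standard library's difflib, and any threshold on its result would be a judgment call.

```python
import difflib


def changed_line_count(original: str, patched: str) -> int:
    """Count added plus removed lines between two versions of a source file."""
    diff = list(difflib.unified_diff(original.splitlines(), patched.splitlines(),
                                     lineterm="", n=0))
    body = diff[2:]  # skip the '---' / '+++' file header pair
    # Hunk markers start with '@@', so only real additions and removals are counted.
    return sum(1 for line in body if line.startswith(("+", "-")))


before = ("size_t claimed = data[0];\n"
          "char *buf = (char *)malloc(8);\n"
          "memcpy(buf, data, claimed);")
after = ("size_t claimed = data[0];\n"
         "if (claimed > 8) claimed = 8;\n"
         "char *buf = (char *)malloc(8);\n"
         "memcpy(buf, data, claimed);")
print(changed_line_count(before, after))
```

A one-line clamp counts as a single changed line, while a rewrite of unrelated code would show up as a much larger number.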
The ladder is the gate. It is the same idea as discovery: we do not trust the patch, we test it. Here is the core of it.
def grade_patch(crash: CrashArtifact, fixed_source: str, settings: Settings) -> PatchVerdict:
    v = PatchVerdict()
    v.t0_builds = build_patched_image(fixed_source)  # T0: does it even compile?
    if not v.t0_builds:
        return v
    with sandbox(PATCHED_IMAGE, "patch-t1") as c:  # T1: the PoC must NOT crash now
        det = detonate(c, crash.poc_bytes, crash.reproduction_command)
        v.t1_poc_stops = not det["crashed"]
    if v.t1_poc_stops:  # T2: benign inputs must still work
        ok = True
        with sandbox(PATCHED_IMAGE, "patch-t2") as c:
            for label, inp in BENIGN_INPUTS.items():
                d = detonate(c, inp, crash.reproduction_command)
                if d["crashed"] or d["exit_code"] != 0:
                    ok = False
                    break
        v.t2_tests_pass = ok
    if v.t1_poc_stops and v.t2_tests_pass:  # Re-attack: hunt for a variant
        variant = discovery_agent_sync(crash.focus_area, PATCHED_IMAGE, idx=99, settings=settings)
        v.re_attack_clean = variant is None
    return v
Read the ladder from the top. T0 confirms the patch compiles. T1 re-detonates the original proof-of-concept and demands that it no longer crashes. T2 runs a set of benign inputs and demands they all still work, so the fix did not break normal behavior.
And the re-attack step sends a fresh discovery agent at the patched binary to hunt for a variant of the same bug.
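The verdict object itself is not shown in this section, so here is a minimal sketch of what such a record could look like. LadderVerdict is a hypothetical stand-in for the pipeline's PatchVerdict. The assumptions are that every rung defaults to False, so an early return leaves later rungs failed, and that the overall result is simply all four rungs together.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LadderVerdict:
    t0_builds: bool = False        # T0: the patched source compiles
    t1_poc_stops: bool = False     # T1: the original PoC no longer crashes
    t2_tests_pass: bool = False    # T2: benign inputs still work
    re_attack_clean: bool = False  # re-attack: no variant was found

    @property
    def passed(self) -> bool:
        return all((self.t0_builds, self.t1_poc_stops,
                    self.t2_tests_pass, self.re_attack_clean))

    def first_failure(self) -> Optional[str]:
        """Name of the lowest rung that failed, or None when the ladder passed."""
        for rung in ("t0_builds", "t1_poc_stops", "t2_tests_pass", "re_attack_clean"):
            if not getattr(self, rung):
                return rung
        return None


print(LadderVerdict(t0_builds=True).first_failure())
```

Defaulting to False is the conservative choice: a rung that never ran cannot count as a pass.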
If any gating step fails, we re-detonate, feed the new crash trace back to the patch agent, and let it try again, up to five times. After the ladder, an independent reviewer looks at the diff alone, with no idea what the scanner said.
def independent_reviewer(diff_text: str, category: str, settings: Settings) -> dict:
    # Named `system` rather than `sys` to avoid shadowing the standard module.
    system = ("You are a maintainer reviewing a security patch. You have NOT seen the scanner "
              "description. Judge only from the diff: is it in scope, does it fix the root cause "
              "(not just the symptom), does it add new attack surface, would you merge it? "
              "Score style 0-10. ACCEPT needs a root-cause fix, no new surface, and style >= 5.")
    return chat_json([{"role": "system", "content": system},
                      {"role": "user", "content": f"CATEGORY: {category}\nDIFF:\n{diff_text}"}],
                     REVIEW_SCHEMA, temperature=0.0)
Here is the patch for the first finding, the heap overflow in parse_alpha, and how it scored.
#### OUTPUT ####
Root cause : heap-buffer-overflow
Ladder : t0_builds=True t1_poc_stops=True t2_tests_pass=True re_attack_clean=True passed=True
Review : ACCEPT style_score=6
reason: "The patch clamps the 'claimed' value to 8, which prevents an out-of-bounds
write. It does not introduce new attack surface and addresses the root cause."
Unified diff:
static void parse_alpha(const unsigned char *data, size_t n) {
if (n < 1) return;
size_t claimed = data[0];
+ if (claimed > 8) claimed = 8; // Clamp claimed to the buffer size
char *buf = (char *)malloc(8);
memcpy(buf, data, claimed);
The model found the correct fix, clamp the copy length to the size of the buffer before the copy, and the reviewer accepted it for fixing the root cause without adding any new surface.
The ladder confirmed it builds, the proof-of-concept no longer crashes, benign inputs still work, and a fresh attack found nothing. Not every patch landed on the first try, though.
#### OUTPUT ####
Patching F-002 (memcpy-param-overlap) ...
iteration 1: T0=True T1=False -> passed=False
iteration 2: T0=True T1=True T2=True re-attack=True -> passed=True
Patching F-003 (heap-use-after-free) ...
iteration 1: T0=True T1=False -> passed=False
iteration 2: T0=True T1=False -> passed=False
iteration 3: T0=True T1=True T2=True re-attack=True -> passed=True
This is the feedback loop working. The second finding took two tries, and the third took three.
On each failed try, the original proof-of-concept still crashed the patched binary, so T1 failed, we fed that fresh crash trace back to the model, and it tried again.
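The retry behavior in that log can be sketched as a bounded loop with the model and the ladder passed in as plain functions. This is an illustration under assumptions, not the pipeline's code: patch_with_feedback, propose, and grade are hypothetical names, and grade is assumed to return a pass flag together with the fresh crash trace. Only the cap of five attempts comes from the text.

```python
from typing import Callable, Optional

MAX_ATTEMPTS = 5  # "up to five times"


def patch_with_feedback(propose: Callable[[str], str],
                        grade: Callable[[str], tuple[bool, str]],
                        first_trace: str) -> tuple[Optional[str], int]:
    """Return (accepted patch or None, number of attempts used)."""
    trace = first_trace
    for attempt in range(1, MAX_ATTEMPTS + 1):
        candidate = propose(trace)           # the model sees the latest crash trace
        passed, new_trace = grade(candidate)  # the ladder decides, not the model
        if passed:
            return candidate, attempt
        trace = new_trace                    # feed the fresh trace into the next try
    return None, MAX_ATTEMPTS
```

Because the loop only ends early on a passing grade, a patch the model merely believes in never gets through.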
The independent reviewer accepted all three patches, with style scores of six, eight, and seven. Here are the two harder fixes, so you can see what the model actually settled on.
#### OUTPUT ####
F-002 memcpy-param-overlap (accepted, style_score=8):
char name[16];
- memcpy(name, data, n); /* OOB write whenever n > 16 */
+ if (n <= sizeof(name)) {
+ memcpy(name, data, n);
+ } else {
+ memset(name, 0, sizeof(name));
+ }
name[15] = 0;
F-003 heap-use-after-free (accepted, style_score=7):
if (r->id == 0xff) {
free(r);
+ r = NULL; /* set the pointer to NULL after freeing */
}
- r->value = (n > 1) ? data[1] : 0;
- printf("charlie id=%u value=%u\n", r->id, r->value);
+ if (r != NULL) {
+ r->value = (n > 1) ? data[1] : 0;
+ printf("charlie id=%u value=%u\n", r->id, r->value);
+ }
Look at that use-after-free fix next to the patch prompt. We told the model never to set a pointer to NULL and then dereference it, and the fix it landed on does set the pointer to NULL, but then it guards every later use with a NULL check, so nothing is dereferenced after the free.
That is the correct shape of the fix, and the ladder confirmed it: the proof-of-concept stopped crashing and a fresh attack found no variant.
The second fix is just as clean. It checks the length against the buffer size before the copy and zeroes the buffer when the input is too long. Now we can run the whole thing as one piece.
The whole loop on the canary
We have built all six stages, so let us package the middle of the loop into one function. Discovery, verification, and triage always run together in the same order, so we wrap them up once and reuse them for any file-input target.
async def run_discovery_verify_triage(image, source_path, recon_sys, prompt_builder,
                                      repro_cmd, tm_context, settings, target_name="target") -> dict:
    print(f"[{target_name}] recon ...")
    with sandbox(image, f"{target_name}-recon") as c:
        focus = run_recon(c, settings, source_path=source_path, recon_sys=recon_sys)
    focus_strings = [f"{a['name']}: {a['hint']}" for a in focus]
    print(f"[{target_name}] {len(focus_strings)} focus area(s); dispatching swarm ...")
    # Note: the run_swarm shown earlier takes no prompt_builder, so this call
    # assumes a generalized version of it that accepts one.
    crashes = await run_swarm(focus_strings, image, settings, prompt_builder=prompt_builder)
    print(f"[{target_name}] verifying {len(crashes)} candidate crash(es) ...")
    verified = [crash for crash in crashes
                if verify_crash(crash, image, settings).passed]  # the oracle gate
    findings = findings_from_crashes(verified)
    deduped, _ = dedup_deterministic(findings)  # triage: dedup
    for f in deduped:
        f.severity = severity_rubric(f, tm_context, settings)["severity"]  # triage: severity
    deduped.sort(key=lambda f: SEV_RANK.get(f.severity, 4))  # triage: rank
    return {"focus": focus, "crashes": crashes, "verified": verified, "findings": deduped}
Read the order of operations, because it is the defender's loop in one function. We recon, we fan out the swarm for recall, we keep only the crashes the oracle re-detonates for precision, and then we dedup, score, and rank.
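The ranking step is small enough to try on its own. In this sketch the mapping SEV_RANK_SKETCH is an assumption, since the contents of the real SEV_RANK are not shown; the only detail taken from the code above is that an unknown severity falls back to rank 4, after everything else.

```python
# Assumed ordering: lower rank sorts first. Not the pipeline's actual table.
SEV_RANK_SKETCH = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}


def rank(findings: list[dict]) -> list[dict]:
    """Sort by severity; Python's sort is stable, so ties keep their input order."""
    return sorted(findings, key=lambda f: SEV_RANK_SKETCH.get(f["severity"], 4))


sample = [
    {"id": "F-001", "severity": "LOW"},
    {"id": "F-002", "severity": "HIGH"},
    {"id": "F-003", "severity": "LOW"},
    {"id": "F-004", "severity": "UNRATED"},  # unknown label sorts last
]
print([f["id"] for f in rank(sample)])
```

The stable sort is why three findings with the same severity stay in the order F-001, F-002, F-003 in the canary output.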
The exact same function runs on the canary and, later, on actual code, because nothing in it is specific to a particular target. Now let us run the whole thing on the canary and read the scoreboard.
#### OUTPUT ####
CANARY SCOREBOARD
============================================================
focus areas from recon : 3
crashes discovered (swarm) : 3
crashes verified : 3
distinct signatures : 3
triaged findings : 3
patches passing the ladder : 3 / 3
============================================================
Acceptance checks:
>=3 distinct crash types verified: True (['heap-buffer-overflow', 'heap-use-after-free', 'memcpy-param-overlap'])
every verified crash reproduces : True
at least one patch passed : True
This is the line we opened the blog with, and now every number in it should make sense. Three focus areas from recon, three crashes discovered by the swarm, three verified by re-detonation, three triaged, and three of three patches passing the full ladder.
Every acceptance check is True. A 7B model, running on one GPU, drove the entire defender's loop on this target with zero error cells.
The reason it worked is that the model never had to be the judge. It generated candidates and patches, and a deterministic sanitizer decided what held up.
That works on a toy. The harder question is what happens on actual code.
Pointing it at production code
The whole point of the design is that the pipeline does not care what the target is, as long as it reads input and we can build it with a sanitizer.
So we point the exact same loop at an open source C library, cJSON, pinned to an older commit. Recon finds four focus areas this time instead of three.
#### OUTPUT ####
[cjson] 4 focus area(s); dispatching swarm ...
agent 0 starting on: Deeply Nested Arrays/Objects: deep nesting to test for stack overflow
agent 1 starting on: Long Strings: a very long string to test length handling
agent 2 starting on: Number Parsing: a very large or small number
agent 3 starting on: Unicode Escape Handling: unicode escape sequences
agent 1 done: no crash
agent 2 done: no crash
agent 3 done: no crash
agent 0 done: no crash
[cjson] verifying 0 candidate crash(es) ...
Four agents, four sensible attack ideas, and zero crashes.
None of the proof-of-concept files the agents wrote actually made the library crash, so the host confirmed nothing, and there is nothing to verify. Here is how the pipeline reports that.
#### OUTPUT ####
cJSON RESULT
============================================================
focus areas : 4
candidate crashes: 0
verified crashes : 0
No crash was proven this run. That is an expected outcome for a 7B model on production code.
We keep recall high: the focus areas below are reported as UNPROVEN candidates for a
human or a stronger model to investigate, rather than dropped.
- UNPROVEN: Deeply Nested Arrays/Objects
- UNPROVEN: Long Strings
- UNPROVEN: Number Parsing
- UNPROVEN: Unicode Escape Handling
I want to be plain about this result. The 7B model did not prove a bug in production code this run. The pipeline does the right thing with that: it does not invent a finding it cannot back up, and it does not silently drop the leads either.
It reports the four focus areas as unproven candidates, so a human or a stronger model can pick them up. This is exactly the recall versus precision split the Anthropic article describes.
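The reporting rule can be sketched in a few lines. This is an illustration under assumptions, not the pipeline's real reporting code: the `summarize` function and the `VERIFIED` label are hypothetical, while the `UNPROVEN` label and the four focus areas come from the run above.

```python
def summarize(focus_areas: list[str], verified: set[str]) -> list[str]:
    """Label every focus area: nothing is dropped, nothing is promoted without proof."""
    lines = []
    for area in focus_areas:
        # Only areas with a reproduced crash earn the stronger label.
        status = "VERIFIED" if area in verified else "UNPROVEN"
        lines.append(f"- {status}: {area}")
    return lines


areas = [
    "Deeply Nested Arrays/Objects",
    "Long Strings",
    "Number Parsing",
    "Unicode Escape Handling",
]

# In the cJSON run no crash was reproduced, so the verified set is empty.
for line in summarize(areas, verified=set()):
    print(line)
```

The point of the design is that the output always has one line per focus area, so a lead can change status but cannot disappear.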
Generating attack ideas is cheap, and a small model can do it all day. Turning an idea into a proven, reproduced crash is the hard part, and that is where the effort and the bigger models earn their keep.
Think about what those four focus areas are: deeply nested JSON, very long strings, extreme numbers, and tricky unicode escapes. They are sound attack ideas, the same places a human auditor would look first.
The gap is not in knowing where to look. It is in the fiddly work of crafting an input that drives the parser into a genuine memory error and proving it under the sanitizer. That last mile is where a 7B model runs out of road, and it is exactly the part a frontier model is better at.
The good news is the pipeline does not change at all. You swap in a stronger discovery model, and everything downstream (the host re-detonation, the triage, the patch ladder) stays as it is.
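One way to picture that swap is a narrow interface between discovery and everything after it. The sketch below is hypothetical throughout: `DiscoveryModel`, `SmallModel`, `StrongerModel`, `run_loop`, and the toy oracle are illustrative names, and the byte strings are placeholders, not real cJSON inputs.

```python
from typing import Callable, Protocol


class DiscoveryModel(Protocol):
    """Hypothetical interface: turn a focus area into raw proof-of-concept bytes."""
    def propose(self, focus_area: str) -> bytes: ...


class SmallModel:
    def propose(self, focus_area: str) -> bytes:
        return b"benign"    # stand-in for an input that does not crash


class StrongerModel:
    def propose(self, focus_area: str) -> bytes:
        return b"trigger"   # stand-in for a better-crafted input


def run_loop(model: DiscoveryModel, focus_areas: list[str],
             crashes: Callable[[bytes], bool]) -> list[str]:
    """Downstream logic never inspects the model, only the oracle's verdict."""
    return [area for area in focus_areas if crashes(model.propose(area))]


def toy_oracle(poc: bytes) -> bool:
    """Placeholder for the sanitizer run; says nothing about any real library."""
    return poc == b"trigger"


areas = ["Long Strings"]
print(run_loop(SmallModel(), areas, toy_oracle))     # []
print(run_loop(StrongerModel(), areas, toy_oracle))  # ['Long Strings']
```

Only the first argument to `run_loop` changes between the two calls, which is the sense in which the rest of the pipeline stays put.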
What made a 7B model usable, and where this
We built the full six step loop and ran it with a small open model on a single GPU. So what made a 7B model usable?
Three things. We constrained it to a schema with structured output, we met it where it is strong with helpers like write_poc, and above all we never let it be the judge.
The model does the creative work (reading code, proposing an input, writing a fix), and a deterministic oracle decides every yes-or-no question of truth. That is why a model that voted three confirmed bugs as false positives could still drive a correct pipeline.
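A minimal sketch of that division of labor, assuming the oracle is an AddressSanitizer-instrumented binary that prints its usual `ERROR: AddressSanitizer` report to stderr and exits non-zero. The function names, the vote strings, and the 10 second timeout are assumptions for illustration, not the pipeline's actual code.

```python
import subprocess

ASAN_MARKER = "ERROR: AddressSanitizer"   # header line of an ASan report


def is_proven_crash(returncode: int, stderr: str) -> bool:
    """Deterministic oracle: non-zero exit plus an ASan report on stderr."""
    return returncode != 0 and ASAN_MARKER in stderr


def detonate(binary: str, poc_path: str, timeout_s: float = 10.0) -> bool:
    """Run the instrumented binary on one input file and ask the oracle."""
    try:
        proc = subprocess.run([binary, poc_path], capture_output=True,
                              text=True, errors="replace", timeout=timeout_s)
    except subprocess.TimeoutExpired:
        return False  # in this sketch a hang is not counted as proof
    return is_proven_crash(proc.returncode, proc.stderr)


def final_verdict(model_vote: str, returncode: int, stderr: str) -> str:
    """The model's vote can be logged by the caller but is never consulted here."""
    return "confirmed" if is_proven_crash(returncode, stderr) else "unproven"


sample = "==4242==ERROR: AddressSanitizer: heap-buffer-overflow on address 0x602000000014"
print(final_verdict("false_positive", 1, sample))  # confirmed
print(final_verdict("true_positive", 0, ""))       # unproven
```

Because `final_verdict` ignores `model_vote`, a wrong vote in either direction cannot change what gets reported as proven.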
Let me be clear about scope. On the canary the loop is fully proven: three bugs found, verified, and patched.
On production code the small model came back empty, which is the precision frontier. The cost side is simple too: the model is open weights on our own H100 with no per-token charges, and the expensive judging is done for free by AddressSanitizer.
The obvious next steps do not change the spine: swap in a stronger discovery model where the precision ceiling is, feed richer threat models, point it at more targets, and harden the sandbox with gVisor or microVMs.
The article that started this put it well. It is getting easier for models to find and exploit vulnerabilities, so our job as defenders is to find and fix the bugs in our own code first, before someone else does.
We are not making the model a security expert. We are building a system where a modest model plus a deterministic oracle can do serious defensive work, and every claim it makes can be reproduced.