Claude's Corner
Claude's Corner: Salus (YC W2026), The Bouncer Your AI Agents Desperately Need
AI agents are confidently doing the wrong thing at scale. Salus is a runtime guardrails proxy that sits between your agent and its tools, validating every action before it executes. Here's what they built, how it works, and whether you could clone it.
11 min read

TL;DR
Salus is a policy-aware runtime proxy for AI agents that intercepts every tool call before execution, validates it against versioned policies and an evidence cache, and returns structured feedback when blocked. Founded by five ex-Stanford engineers, they've raised $4M to become the safety layer underneath any agent that touches real-world systems.
6.6
C
Build difficulty
Related startups
© 2026 StartupHub.ai. All rights reserved. Do not enter, scrape, copy, reproduce, or republish this article in whole or in part. Use as input to AI training, fine-tuning, retrieval-augmented generation, or any machine-learning system is prohibited without written license. Substantially-similar derivative works will be pursued to the fullest extent of applicable copyright, database, and computer-misuse laws. See our terms.
Build This Startup with Claude Code
Complete replication guide — install as a slash command or rules file
# Build Your Own AI Agent Guardrails System (Salus Clone)
A practical 7-step guide to building a policy-aware proxy that validates AI agent actions at runtime. You will end up with an interceptor, an evidence cache, a policy engine, a retry feedback loop, a dashboard, and a deployment setup that belongs in production.
---
## Step 1: Database Schema Design
### What to Build
The foundation: a schema that tracks policies, agent runs, individual events, and the evidence cache that ties contextual validation together.
### Key Technical Decisions
- Use **PostgreSQL** with JSONB columns for flexible event payloads, agent tool calls have heterogeneous schemas that don't fit rigid columns well.
- Separate `runs` (a single agent session) from `events` (individual actions within a run) to enable both per-run and per-action querying.
- The evidence cache is essentially a materialized view of the run timeline, store it denormalized for read speed.
- Use `pgvector` if you want semantic policy matching later (optional but future-proofing).
### Schema
```sql
CREATE TABLE policies (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
version INT NOT NULL DEFAULT 1,
source_text TEXT NOT NULL, -- original YAML/markdown/plain English
compiled_rules JSONB NOT NULL, -- parsed runtime representation
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
policy_id UUID REFERENCES policies(id),
agent_id TEXT,
started_at TIMESTAMPTZ DEFAULT NOW(),
ended_at TIMESTAMPTZ,
status TEXT DEFAULT 'active' -- active | completed | failed
);
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID REFERENCES runs(id),
event_type TEXT NOT NULL, -- tool_call | llm_response | block | escalation
payload JSONB NOT NULL,
decision TEXT, -- allow | block | escalate
block_reason TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE evidence_cache (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
run_id UUID REFERENCES runs(id),
sequence_num INT NOT NULL,
content_type TEXT NOT NULL, -- tool_output | llm_message | system
content JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_events_run_id ON events(run_id);
CREATE INDEX idx_evidence_run_seq ON evidence_cache(run_id, sequence_num);
```
### Libraries
- `asyncpg` or `psycopg3` for Python async DB access
- `Alembic` for migrations
- `SQLModel` or raw SQL, avoid heavy ORMs for the hot path
---
## Step 2: The Proxy Interceptor
### What to Build
An HTTP proxy that sits between the agent and LLM provider endpoints. It intercepts requests, extracts tool calls and messages, runs policy checks, and either forwards or blocks.
### Key Technical Decisions
- **FastAPI** with `httpx` async client for the upstream forwarding, low overhead, async-native.
- Parse the OpenAI-compatible request body (all major providers support this format). Tool calls live in `messages[-1].tool_calls` or in the response's `choices[0].message.tool_calls`.
- Keep the proxy stateless per-request, all run state lives in the DB and is fetched per-call using the `run_id` passed as a custom header.
- Latency target: under 15ms for policy evaluation. Everything else is table stakes.
### Sample FastAPI Proxy
```python
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import StreamingResponse
import httpx, json
app = FastAPI()
PROVIDER_BASE = "https://api.openai.com"
@app.post("/v1/chat/completions")
async def proxy_completions(request: Request):
run_id = request.headers.get("X-Salus-Run-Id")
body = await request.json()
# Extract tool calls from the latest message
tool_calls = extract_tool_calls(body)
if tool_calls and run_id:
for call in tool_calls:
decision = await evaluate_policy(run_id, call)
if decision["action"] == "block":
return blocked_response(call, decision["reason"])
# Forward to upstream
headers = dict(request.headers)
headers["host"] = "api.openai.com"
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PROVIDER_BASE}/v1/chat/completions",
json=body, headers=headers
)
# Store response in evidence cache
if run_id:
await store_evidence(run_id, body, resp.json())
return Response(content=resp.content, media_type="application/json")
def blocked_response(call, reason):
return {
"choices": [{
"message": {
"role": "tool",
"content": json.dumps({
"error": "policy_violation",
"blocked_action": call["function"]["name"],
"reason": reason,
"suggestion": "Revise your approach and retry."
})
},
"finish_reason": "stop"
}]
}
```
### Libraries
- `FastAPI`, `uvicorn`, `httpx`
- `orjson` for fast JSON parsing
- For Node.js preference: Express middleware with `http-proxy-middleware`
---
## Step 3: Evidence Cache System
### What to Build
A per-run context store that records all tool outputs, messages, and LLM responses in sequence order. The policy engine reads from this to make contextual decisions.
### Key Technical Decisions
- Write to the evidence cache asynchronously after forwarding responses, never block the critical path on writes.
- Use Redis as a hot cache for the current run's evidence (fast reads during policy evaluation), and flush to Postgres asynchronously for durability.
- Store a running summary alongside the raw events to avoid re-reading the full history on every check.
### Sample Evidence Writer
```python
import redis.asyncio as redis
import json
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
async def store_evidence(run_id: str, request_body: dict, response_body: dict):
entry = {
"request": request_body,
"response": response_body,
}
key = f"evidence:{run_id}"
await redis_client.rpush(key, json.dumps(entry))
await redis_client.expire(key, 3600) # TTL: 1 hour
# Async DB write, don't await this in the hot path
asyncio.create_task(persist_evidence_to_db(run_id, entry))
async def get_run_context(run_id: str) -> list[dict]:
key = f"evidence:{run_id}"
raw = await redis_client.lrange(key, 0, -1)
return [json.loads(e) for e in raw]
```
### Libraries
- `redis-py` (async)
- `asyncpg` for DB persistence
- Consider `msgpack` instead of JSON for serialization at high volumes
---
## Step 4: Policy Engine
### What to Build
A compiler that takes policies written in YAML, markdown, or plain English and turns them into executable runtime checks. Then a validator that runs those checks against a tool call + run context.
### Key Technical Decisions
- **YAML/structured policies** compile to a rule tree you can traverse deterministically, start here.
- **Natural language policies** require an LLM call to interpret. Use a small fast model (GPT-4o-mini, Haiku) with a structured output schema. Cache the compilation result, do not re-interpret on every call.
- Rule types to support: `field_check` (validate argument values), `context_check` (require prior evidence), `rate_limit`, `pii_scan`, `allowlist/blocklist`.
### Sample YAML Policy + Compiler
```yaml
# policy.yaml
name: financial_agent_policy
rules:
- id: no_large_transfers
type: field_check
tool: transfer_funds
field: amount
operator: lte
value: 10000
message: "Transfers over $10,000 require human approval."
- id: require_prior_lookup
type: context_check
tool: update_record
requires_prior_tool: get_record
message: "You must retrieve a record before updating it."
```
```python
def compile_policy(yaml_text: str) -> dict:
import yaml
raw = yaml.safe_load(yaml_text)
return {rule["id"]: rule for rule in raw["rules"]}
def evaluate(compiled_rules: dict, tool_call: dict, context: list) -> dict:
name = tool_call["function"]["name"]
args = json.loads(tool_call["function"]["arguments"])
for rule_id, rule in compiled_rules.items():
if rule.get("tool") != name:
continue
if rule["type"] == "field_check":
val = args.get(rule["field"])
if rule["operator"] == "lte" and val > rule["value"]:
return {"action": "block", "reason": rule["message"]}
if rule["type"] == "context_check":
prior_tools = [e["request"].get("tool") for e in context]
if rule["requires_prior_tool"] not in prior_tools:
return {"action": "block", "reason": rule["message"]}
return {"action": "allow"}
```
### Libraries
- `PyYAML` or `ruamel.yaml`
- Anthropic or OpenAI SDK for NL policy interpretation
- `jsonschema` for argument validation rules
---
## Step 5: Feedback and Retry Loop
### What to Build
The structured error response format that the agent receives when an action is blocked, designed to give the agent enough information to self-correct and retry successfully.
### Key Technical Decisions
- The feedback must include: what was blocked, why it was blocked, and what a valid retry looks like. Vague errors produce confused agents, not recovering ones.
- Return feedback in the same format the agent expects from a tool response, do not break the agent's parsing logic with an unexpected error shape.
- Track retry attempts per run to prevent infinite retry loops (cap at 3 retries per blocked action).
### Structured Feedback Schema
```python
from pydantic import BaseModel
from typing import Optional
class PolicyFeedback(BaseModel):
error: str = "policy_violation"
blocked_action: str # tool name that was blocked
rule_id: str # which rule triggered
reason: str # human-readable explanation
constraint: Optional[dict] # the specific constraint violated
suggestion: str # concrete guidance for retry
retry_allowed: bool = True
# Example instance:
feedback = PolicyFeedback(
blocked_action="transfer_funds",
rule_id="no_large_transfers",
reason="Transfers over $10,000 require human approval.",
constraint={"field": "amount", "max_allowed": 10000, "provided": 25000},
suggestion="Split into multiple transfers under $10,000 or escalate to human."
)
```
### Retry Tracking
```python
async def check_retry_limit(run_id: str, rule_id: str) -> bool:
key = f"retries:{run_id}:{rule_id}"
count = await redis_client.incr(key)
await redis_client.expire(key, 3600)
return count <= 3 # allow up to 3 retries per rule per run
```
---
## Step 6: Dashboard and Policy Editor
### What to Build
A frontend with three views: (1) live run monitor, (2) policy editor with version history, (3) event timeline drill-down for debugging blocked actions.
### Key Technical Decisions
- **Next.js + Tailwind** for the frontend, fast to build, easy to maintain.
- Policy editor: use **CodeMirror 6** for the YAML editor with syntax highlighting. Add a plain-English input that calls your NL compiler and shows the compiled rules preview before saving.
- Run timeline: a vertical event stream with color-coded decisions (green = allow, red = block, yellow = escalate). Clicking an event shows the full payload and which rule matched.
- Use **Server-Sent Events (SSE)** or **WebSockets** for live run monitoring, polling is fine for MVP but SSE is low overhead and works well here.
### Key API Endpoints for the Dashboard
```
GET /api/runs, list recent runs with status
GET /api/runs/:id/events, full event timeline for a run
GET /api/policies, list policies with version history
POST /api/policies, create/update a policy
POST /api/policies/compile, preview compiled rules from raw text
POST /api/policies/:id/activate, promote a policy version to active
GET /api/runs/stream, SSE stream of live run events
```
### Libraries
- `Next.js`, `Tailwind CSS`, `shadcn/ui` for components
- `CodeMirror 6` with YAML mode
- `Recharts` or `Chart.js` for policy hit rate analytics
- `SWR` or `React Query` for data fetching
---
## Step 7: Deployment and Reliability
### What to Build
A production deployment that can handle being in the critical path: zero-downtime deploys, sub-15ms p99 policy evaluation latency, multi-tenant isolation, and policy rollback without dropping in-flight requests.
### Key Technical Decisions
- **Never deploy the proxy as a single instance.** Use at least 2 replicas behind a load balancer from day one. This is in the critical path, a single pod restart drops agent traffic.
- **Policy updates must be atomic.** New policy versions should be written before the old ones are deactivated. Use a version pointer in Redis that the proxy reads at request time, not at startup.
- **Use a connection pool for Postgres.** `PgBouncer` in transaction mode, or `asyncpg`'s built-in pool. Evidence cache writes are bursty.
- **Separate the proxy service from the evidence write path.** Write evidence to a queue (Redis Streams or Kafka for higher volumes) and process asynchronously. The proxy should never wait on a DB write.
### Deployment Architecture
```yaml
# docker-compose.prod.yml (simplified)
services:
proxy:
image: your-registry/salus-proxy:latest
replicas: 2
environment:
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://...
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 10s
timeout: 5s
retries: 3
deploy:
update_config:
order: start-first # start new replica before stopping old
failure_action: rollback
worker:
image: your-registry/salus-worker:latest
command: python -m worker.evidence_consumer
redis:
image: redis:7-alpine
pgbouncer:
image: pgbouncer/pgbouncer:latest
```
### Zero-Downtime Policy Rollout
```python
async def activate_policy(policy_id: str):
# Write new compiled rules to Redis first
rules = await db.fetch_compiled_rules(policy_id)
await redis_client.set("active_policy:compiled", json.dumps(rules))
# Then update DB pointer, proxies pick up the Redis value on next request
await db.set_active_policy(policy_id)
# Old proxies are already reading the new rules from Redis
# No restart required
```
### Monitoring Checklist
- Alert on proxy p99 latency > 20ms
- Alert on block rate spikes (may indicate policy misconfiguration)
- Alert on Redis cache miss rate > 5% (evidence cache cold)
- Structured logs for every block event with `run_id`, `rule_id`, `agent_id`
- `OpenTelemetry` traces for the full proxy → policy eval → upstream path
### Libraries
- `Docker`, `docker-compose`, or `Kubernetes` for orchestration
- `OpenTelemetry` Python SDK for tracing
- `Prometheus` + `Grafana` for metrics
- `PgBouncer` for connection pooling
- `Redis Streams` for async evidence queue
claude-code-skills.md