This is the implementation guide for Technique 1 from Proxying Agent-to-LLM Traffic – the API URL redirect approach where external agents point their base URL at WorkingAgents instead of calling the LLM provider directly.
This is the most natural fit for WorkingAgents because it gives real-time visibility into everything an agent sends to and receives from its model – system prompts, conversation history, tool definitions, tool results, the full context window – without requiring TLS interception or agent source code modification.
What We’re Building
A Plug-based reverse proxy that:
- Speaks the Anthropic Messages API (`POST /v1/messages`)
- Speaks the OpenAI Chat Completions API (`POST /v1/chat/completions`)
- Speaks the Google Gemini API (`POST /v1beta/...`)
- Forwards requests to the real provider after inspection
- Streams responses back in real time (SSE pass-through)
- Logs full payloads (request and response) to the audit trail
- Scans for injection patterns, PII, and policy violations
- Enforces cost and rate limits per user
- Integrates into the existing WorkingAgents router at `/llm-gateway`
External agents configure one environment variable and all their LLM traffic flows through WorkingAgents:
# Claude Code
export ANTHROPIC_BASE_URL=https://your-workingagents.com/llm-gateway/anthropic
# OpenAI Codex
export OPENAI_BASE_URL=https://your-workingagents.com/llm-gateway/openai
# Any OpenAI-compatible agent
export OPENAI_API_BASE=https://your-workingagents.com/llm-gateway/openai
The Streaming Challenge
This is the hardest part and the reason a naive Plug implementation won’t work. LLM APIs stream responses as Server-Sent Events (SSE):
Anthropic streaming format:
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}
event: message_stop
data: {"type":"message_stop"}
OpenAI streaming format:
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"Hello"},"index":0}]}
data: {"id":"chatcmpl-...","choices":[{"delta":{"content":" world"},"index":0}]}
data: [DONE]
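Once the `data:` payloads are JSON-decoded, both formats reduce to one text delta per event. A minimal sketch (the module name is mine, not part of the gateway) that normalizes the two shapes shown above:

```elixir
defmodule SseDeltaSketch do
  # Normalizes one decoded SSE event to its text delta.
  # Handles the Anthropic content_block_delta shape and the
  # OpenAI choices/delta shape; anything else (message_stop,
  # pings, role-only deltas) yields an empty string.
  def text_delta(%{"delta" => %{"text" => t}}) when is_binary(t), do: t
  def text_delta(%{"choices" => [%{"delta" => %{"content" => t}} | _]}) when is_binary(t), do: t
  def text_delta(_other), do: ""
end
```

Joining the deltas of a whole stream reproduces the complete assistant message, which is exactly what the audit path needs.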
The proxy must:
- Forward each SSE chunk to the client as soon as it arrives (zero delay)
- Accumulate chunks internally to build the complete response for logging
- Handle the connection staying open for 30-120 seconds during generation
- Not buffer the entire response before forwarding (that would kill streaming UX)
Two Approaches
Approach A: reverse_proxy_plug (Fastest to Ship)
The reverse_proxy_plug library is purpose-built for this. It supports:
- Chunked transfer encoding with zero-delay pass-through (the default `:stream` response mode)
- HTTP/2 with Cowboy 2
- Multiple HTTP client backends (Req, Finch, HTTPoison, Tesla)
- Request and response modification callbacks
# mix.exs
defp deps do
[
{:reverse_proxy_plug, "~> 3.0"},
# ... existing deps
]
end
Integration into the router:
# In my_mcp_server_router.ex
# Anthropic API proxy
forward "/llm-gateway/anthropic",
to: ReverseProxyPlug,
upstream: "https://api.anthropic.com",
response_mode: :stream
# OpenAI API proxy
forward "/llm-gateway/openai",
to: ReverseProxyPlug,
upstream: "https://api.openai.com",
response_mode: :stream
# Google Gemini API proxy
forward "/llm-gateway/google",
to: ReverseProxyPlug,
upstream: "https://generativelanguage.googleapis.com",
response_mode: :stream
This gets a working proxy in 10 minutes. But it’s a dumb pipe – no inspection, no logging, no guardrails. The library supports callbacks for request/response modification, but building a full inspection pipeline on top of callbacks is awkward.
Verdict: Good for a proof-of-concept. Not sufficient for production governance.
Approach B: Custom Plug + Req Streaming (Full Control)
Build the proxy as a custom Plug module that handles request parsing, inspection, forwarding, streaming, and logging explicitly.
This is more code but gives complete control over every byte that passes through.
defmodule LLMGateway do
@moduledoc """
Reverse proxy for agent-to-LLM traffic.
Inspects requests, forwards to the real provider,
streams responses back while logging everything.
"""
use Plug.Router
require Logger
plug Plug.Parsers,
parsers: [:json],
json_decoder: Jason,
body_reader: {__MODULE__, :cache_body_reader, []}
plug :match
plug :dispatch
# ── Anthropic Messages API ──────────────────────────────────────────
match "/anthropic/*path" do
proxy_request(conn, :anthropic, path)
end
# ── OpenAI Chat Completions API ─────────────────────────────────────
match "/openai/*path" do
proxy_request(conn, :openai, path)
end
# ── Google Gemini API ───────────────────────────────────────────────
match "/google/*path" do
proxy_request(conn, :google, path)
end
match _ do
send_resp(conn, 404, "Unknown LLM provider")
end
# ── Core Proxy Logic ────────────────────────────────────────────────
defp proxy_request(conn, provider, path) do
user = resolve_user(conn)
body = conn.body_params
is_streaming = streaming_requested?(body, provider)
# ── PRE-FLIGHT ──
case LLMGateway.PreFlight.run(user, provider, body) do
{:ok, body} ->
if is_streaming do
stream_proxy(conn, provider, path, body, user)
else
buffer_proxy(conn, provider, path, body, user)
end
{:rejected, reason} ->
LLMGateway.Audit.log_rejection(user, provider, reason)
conn
|> put_resp_content_type("application/json")
|> send_resp(403, Jason.encode!(%{error: %{message: "Request rejected: #{inspect(reason)}"}}))
end
end
# ── Non-Streaming: Buffer Full Response ─────────────────────────────
defp buffer_proxy(conn, provider, path, body, user) do
url = provider_url(provider) <> "/" <> Enum.join(path, "/")
headers = forward_headers(conn, provider)
case Req.post(url, json: body, headers: headers, receive_timeout: 120_000) do
{:ok, %Req.Response{status: status, body: resp_body, headers: resp_headers}} ->
# ── POST-FLIGHT ──
LLMGateway.PostFlight.run(user, provider, resp_body)
LLMGateway.Audit.log_exchange(user, provider, body, resp_body, status)
# Copy only request-tracking headers back to the client;
# blindly forwarding everything (content-length, transfer-encoding)
# would corrupt the re-encoded response
conn =
Enum.reduce(resp_headers, conn, fn
{k, [v | _]}, c when k in ["x-request-id", "request-id"] -> put_resp_header(c, k, v)
_, c -> c
end)
conn
|> put_resp_content_type("application/json")
|> send_resp(status, Jason.encode!(resp_body))
{:error, reason} ->
Logger.error("LLMGateway: forward failed: #{inspect(reason)}")
send_resp(conn, 502, Jason.encode!(%{error: %{message: "Gateway error"}}))
end
end
# ── Streaming: SSE Pass-Through with Accumulation ───────────────────
defp stream_proxy(conn, provider, path, body, user) do
url = provider_url(provider) <> "/" <> Enum.join(path, "/")
headers = forward_headers(conn, provider)
# Start chunked response to the client
conn =
conn
|> put_resp_content_type("text/event-stream")
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> send_chunked(200)
# Accumulate chunks in a separate process so audit logging
# never blocks the forwarding path; spawn it before building
# req_opts so the streaming callback can send it copies
chunk_collector = spawn_link(fn -> collect_chunks(user, provider, body) end)
# Use Req with a streaming callback
req_opts = [
method: :post,
url: url,
json: body,
headers: headers,
receive_timeout: 120_000,
into: fn {:data, data}, {req, resp} ->
# Send a copy of the chunk to the collector for the audit trail
send(chunk_collector, {:chunk, data})
# Forward each SSE chunk to the client immediately
case Plug.Conn.chunk(conn, data) do
{:ok, _conn} -> {:cont, {req, resp}}
{:error, _} -> {:halt, {req, resp}}
end
end
]
case Req.request(req_opts) do
{:ok, _response} ->
send(chunk_collector, :done)
conn
{:error, reason} ->
send(chunk_collector, {:error, reason})
Logger.error("LLMGateway: streaming failed: #{inspect(reason)}")
conn
end
end
defp collect_chunks(user, provider, request_body) do
do_collect(user, provider, request_body, [])
end
defp do_collect(user, provider, request_body, chunks) do
receive do
{:chunk, data} ->
do_collect(user, provider, request_body, [data | chunks])
:done ->
full_response = chunks |> Enum.reverse() |> Enum.join()
LLMGateway.Audit.log_streamed_exchange(user, provider, request_body, full_response)
{:error, reason} ->
LLMGateway.Audit.log_error(user, provider, request_body, reason)
after
180_000 -> :timeout
end
end
# ── Helpers ─────────────────────────────────────────────────────────
defp streaming_requested?(body, :anthropic), do: Map.get(body, "stream", false) == true
defp streaming_requested?(body, :openai), do: Map.get(body, "stream", false) == true
# Gemini signals streaming via the URL path (":streamGenerateContent" vs
# ":generateContent"), not a body field -- a fuller implementation should
# inspect the request path as well
defp streaming_requested?(body, :google), do: Map.get(body, "stream") == true
defp provider_url(:anthropic), do: "https://api.anthropic.com"
defp provider_url(:openai), do: "https://api.openai.com"
defp provider_url(:google), do: "https://generativelanguage.googleapis.com"
defp forward_headers(conn, _provider) do
conn.req_headers
|> Enum.filter(fn {k, _} ->
k in [
"authorization", "x-api-key", "anthropic-version",
"content-type", "accept", "anthropic-beta",
"openai-organization", "x-goog-api-key"
]
end)
end
defp resolve_user(conn) do
# Reuse the existing get_user logic from the router
# The LLM gateway is authenticated -- agents must provide a bearer token
conn.assigns[:current_user] || %{id: 0, username: "unknown", permissions: %{}}
end
# Custom body reader that caches the raw request body in conn.private,
# in case a handler needs the exact bytes rather than re-encoded JSON
def cache_body_reader(conn, opts) do
{:ok, body, conn} = Plug.Conn.read_body(conn, opts)
conn = Plug.Conn.put_private(conn, :raw_body, body)
{:ok, body, conn}
end
end
The PreFlight Pipeline
The LLM Gateway PreFlight inspects the full LLM request – system prompt, messages, tool definitions, everything:
defmodule LLMGateway.PreFlight do
@checks [
LLMGateway.SystemPromptGuard,
LLMGateway.MessageGuard,
LLMGateway.ToolResultGuard,
LLMGateway.PiiScanner,
LLMGateway.CostEstimator
]
def run(user, provider, body) do
messages = extract_messages(body, provider)
Enum.reduce_while(@checks, {:ok, body}, fn check, {:ok, b} ->
case check.scan(user, provider, b, messages) do
{:ok, b} -> {:cont, {:ok, b}}
{:rejected, reason} -> {:halt, {:rejected, reason}}
end
end)
end
defp extract_messages(body, :anthropic) do
Map.get(body, "messages", [])
end
defp extract_messages(body, :openai) do
Map.get(body, "messages", [])
end
defp extract_messages(body, :google) do
get_in(body, ["contents"]) || []
end
end
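Each entry in `@checks` is assumed to follow the same contract; a behaviour (not in the original code, added here as a sketch) makes it explicit:

```elixir
defmodule LLMGateway.Check do
  # Hypothetical behaviour formalizing what PreFlight.run/3 assumes:
  # every check returns {:ok, body} (possibly rewritten) to continue
  # the pipeline, or {:rejected, reason} to halt it.
  @callback scan(user :: map(), provider :: atom(), body :: map(), messages :: list()) ::
              {:ok, map()} | {:rejected, term()}
end
```

With `@behaviour LLMGateway.Check` in each module, the compiler warns when a check's `scan/4` is missing or has the wrong arity.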
What Each Check Sees
This is the power of the gateway – it sees the full LLM request, not just tool calls:
SystemPromptGuard – inspects the system prompt for credential leaks, overly permissive instructions, or prompt injection vulnerabilities in the prompt itself.
defmodule LLMGateway.SystemPromptGuard do
@credential_patterns [
~r/api[_-]?key\s*[:=]\s*['"]\w{20,}/i,
~r/password\s*[:=]\s*['"]/i,
~r/secret\s*[:=]\s*['"]\w{20,}/i,
~r/Bearer\s+[A-Za-z0-9\-._~+\/]+=*/
]
def scan(_user, _provider, body, _messages) do
system = extract_system(body)
case Enum.find(@credential_patterns, &Regex.match?(&1, system || "")) do
nil -> {:ok, body}
pattern -> {:rejected, {:credential_in_system_prompt, Regex.source(pattern)}}
end
end
defp extract_system(%{"system" => s}) when is_binary(s), do: s
defp extract_system(%{"messages" => [%{"role" => "system", "content" => c} | _]}) when is_binary(c), do: c
defp extract_system(_), do: nil
end
ToolResultGuard – scans tool results in the conversation for injection patterns. This is the highest-risk vector: external data entering the LLM’s context as trusted content.
defmodule LLMGateway.ToolResultGuard do
@injection_patterns [
~r/ignore\s+(all\s+)?previous\s+instructions/i,
~r/IMPORTANT:\s*(?:ignore|forget|disregard)/i,
~r/you\s+are\s+now\s+(?:a|an)\s+/i,
~r/\[INST\]|\[\/INST\]|<<SYS>>|<\|im_start\|>/i
]
def scan(_user, _provider, body, messages) do
tool_results =
messages
|> Enum.flat_map(fn
# OpenAI-style tool/function result messages
%{"role" => role} = msg when role in ["tool", "function"] ->
extract_text(msg)
# Anthropic embeds tool results in user messages as "tool_result" content blocks
%{"role" => "user", "content" => parts} when is_list(parts) ->
for %{"type" => "tool_result", "content" => c} <- parts, is_binary(c), do: c
_ ->
[]
end)
case Enum.find(tool_results, &has_injection?/1) do
nil -> {:ok, body}
text -> {:rejected, {:tool_result_injection, String.slice(text, 0, 200)}}
end
end
defp extract_text(%{"content" => c}) when is_binary(c), do: [c]
defp extract_text(%{"content" => parts}) when is_list(parts) do
Enum.flat_map(parts, fn
%{"text" => t} -> [t]
_ -> []
end)
end
defp extract_text(_), do: []
defp has_injection?(text) do
Enum.any?(@injection_patterns, &Regex.match?(&1, text))
end
end
PiiScanner – detects PII in messages before they leave for the LLM provider. Doesn’t redact by default (that would break the agent’s conversation), but logs a warning and can be configured to block.
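A minimal sketch of what the scanner might match (the patterns and module name are illustrative, not the production rule set):

```elixir
defmodule PiiScannerSketch do
  # Illustrative patterns only; a real scanner needs a much
  # broader rule set (names, addresses, national ID formats, ...)
  @patterns [
    email: ~r/[\w.+-]+@[\w-]+\.[\w.-]+/,
    ssn: ~r/\b\d{3}-\d{2}-\d{4}\b/,
    phone: ~r/\b\+?\d{1,3}[ -]?\(?\d{3}\)?[ -]?\d{3}[ -]?\d{4}\b/
  ]

  # Returns the labels of every pattern that fires on the text
  def findings(text) do
    for {label, regex} <- @patterns, Regex.match?(regex, text), do: label
  end
end
```

A non-empty findings list triggers a warning in the audit record; flipping that to a `{:rejected, ...}` is the "configured to block" mode.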
CostEstimator – estimates token count from the request payload. If the user’s daily budget would be exceeded, blocks the call before it reaches the provider.
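A sketch of the budget gate, assuming a rough 4-characters-per-token heuristic (the helper names and ratio are assumptions, not the shipped estimator):

```elixir
defmodule CostEstimatorSketch do
  # Rough heuristic: ~4 characters of prompt text per token.
  # Good enough for budget gating; exact counts come back in
  # the provider's usage block after the call completes.
  @chars_per_token 4

  def estimate_tokens(messages) do
    messages
    |> Enum.map(fn
      %{"content" => c} when is_binary(c) -> byte_size(c)
      _other -> 0
    end)
    |> Enum.sum()
    |> div(@chars_per_token)
  end

  # Blocks the request before it reaches the provider if the
  # estimate would push the user past their daily budget
  def scan(body, messages, tokens_spent_today, daily_budget) do
    estimate = estimate_tokens(messages)

    if tokens_spent_today + estimate > daily_budget do
      {:rejected, {:budget_exceeded, estimate}}
    else
      {:ok, body}
    end
  end
end
```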
The Audit Module
The gateway’s audit captures what no other layer can see – the full conversation between agent and model:
defmodule LLMGateway.Audit do
def log_exchange(user, provider, request, response, status) do
messages = Map.get(request, "messages", [])
Sqler.insert(:llm_gateway_db, "llm_gateway_audit", %{
user_id: user.id,
username: user.username,
provider: to_string(provider),
model: Map.get(request, "model", "unknown"),
status: status,
message_count: length(messages),
# "tools" is a list in both the Anthropic and OpenAI APIs
has_tools: Map.get(request, "tools", []) != [],
has_system_prompt: has_system?(request),
request_hash: hash(request),
request_summary: summarize_request(request),
response_summary: summarize_response(response),
tokens_in: get_in_safe(response, ["usage", "input_tokens"]) ||
get_in_safe(response, ["usage", "prompt_tokens"]) || 0,
tokens_out: get_in_safe(response, ["usage", "output_tokens"]) ||
get_in_safe(response, ["usage", "completion_tokens"]) || 0,
created_at: System.system_time(:millisecond)
})
end
def log_streamed_exchange(user, provider, request, raw_sse_data) do
# Parse accumulated SSE data to extract final token counts
token_info = parse_sse_usage(raw_sse_data, provider)
Sqler.insert(:llm_gateway_db, "llm_gateway_audit", %{
user_id: user.id,
username: user.username,
provider: to_string(provider),
model: Map.get(request, "model", "unknown"),
status: 200,
message_count: length(Map.get(request, "messages", [])),
has_tools: Map.get(request, "tools", []) != [],
has_system_prompt: has_system?(request),
request_hash: hash(request),
request_summary: summarize_request(request),
response_summary: "[streamed] #{token_info.text_preview}",
tokens_in: token_info.input_tokens,
tokens_out: token_info.output_tokens,
created_at: System.system_time(:millisecond)
})
end
def log_rejection(user, provider, reason) do
Sqler.insert(:llm_gateway_db, "llm_gateway_audit", %{
user_id: user.id,
username: user.username,
provider: to_string(provider),
model: "rejected",
status: 403,
message_count: 0,
has_tools: false,
has_system_prompt: false,
request_hash: "",
request_summary: "REJECTED: #{inspect(reason)}",
response_summary: "",
tokens_in: 0,
tokens_out: 0,
created_at: System.system_time(:millisecond)
})
end
defp hash(data) do
:crypto.hash(:sha256, Jason.encode!(data)) |> Base.encode16(case: :lower)
end
defp has_system?(%{"system" => _}), do: true
defp has_system?(%{"messages" => [%{"role" => "system"} | _]}), do: true
defp has_system?(_), do: false
defp summarize_request(req) do
messages = Map.get(req, "messages", [])
last_user = messages |> Enum.filter(&(&1["role"] == "user")) |> List.last()
content = if last_user, do: last_user["content"], else: ""
text = if is_binary(content), do: content, else: inspect(content)
String.slice(text, 0, 500)
end
defp summarize_response(resp) when is_map(resp) do
inspect(resp) |> String.slice(0, 500)
end
defp summarize_response(other), do: inspect(other) |> String.slice(0, 200)
defp get_in_safe(map, keys) when is_map(map) do
Enum.reduce(keys, map, fn k, acc -> if is_map(acc), do: Map.get(acc, k), else: nil end)
end
defp get_in_safe(_, _), do: nil
defp parse_sse_usage(raw_data, _provider) do
# Anthropic reports usage in a "message_delta" event (not the final
# "message_stop"); OpenAI includes it in the last chunk before [DONE].
# So scan all decoded events for a usage payload, not only the last one.
decoded =
raw_data
|> String.split("\n")
|> Enum.filter(&String.starts_with?(&1, "data: "))
|> Enum.map(&String.trim_leading(&1, "data: "))
|> Enum.reject(&(&1 == "[DONE]"))
|> Enum.flat_map(fn d ->
case Jason.decode(d) do
{:ok, map} -> [map]
_ -> []
end
end)
usage =
decoded
|> Enum.reverse()
|> Enum.find_value(fn
%{"usage" => u} when is_map(u) -> u
_ -> nil
end)
# Accumulate text from delta chunks for the preview
text =
decoded
|> Enum.flat_map(fn
%{"delta" => %{"text" => t}} -> [t]
%{"choices" => [%{"delta" => %{"content" => t}} | _]} when is_binary(t) -> [t]
_ -> []
end)
|> Enum.join()
%{
input_tokens: (usage && (usage["input_tokens"] || usage["prompt_tokens"])) || 0,
output_tokens: (usage && (usage["output_tokens"] || usage["completion_tokens"])) || 0,
text_preview: String.slice(text, 0, 200)
}
end
end
Integration Into the Router
The LLM Gateway mounts as a forward in the existing router, behind authentication:
# In my_mcp_server_router.ex
# LLM Gateway -- proxy agent-to-LLM traffic
# Requires authentication (bearer token or cookie)
forward "/llm-gateway", to: LLMGateway
Authentication reuses the existing require_authentication plug. Agents provide the same bearer token they use for MCP tool access. The gateway resolves the user from the token and applies per-user audit logging and cost limits.
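As a sketch of the resolution step (the token store shape and module name here are assumptions; the real `require_authentication` plug lives elsewhere in WorkingAgents):

```elixir
defmodule BearerAuthSketch do
  # Resolves a user from an Authorization header against a
  # token -> user lookup; anything that is not a known Bearer
  # token is rejected before the proxy logic runs
  def resolve(authorization_header, token_store) do
    with "Bearer " <> token <- authorization_header || "",
         %{} = user <- Map.get(token_store, token) do
      {:ok, user}
    else
      _ -> {:error, :unauthorized}
    end
  end
end
```

The resolved user is what `resolve_user/1` returns, and everything downstream (audit rows, cost limits) keys off it.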
The Streaming Architecture in Detail
The streaming proxy is the trickiest part. Here’s what happens step by step:
1. Agent sends POST /llm-gateway/anthropic/v1/messages
Body: {model, messages, tools, stream: true}
2. LLM Gateway authenticates the agent (bearer token)
3. PreFlight pipeline runs:
- SystemPromptGuard scans the system prompt
- ToolResultGuard scans tool results in messages
- PiiScanner checks for PII in outbound messages
- CostEstimator checks the user's daily budget
4. Gateway opens a chunked response to the agent:
Content-Type: text/event-stream
5. Gateway opens a streaming Req connection to the provider:
POST https://api.anthropic.com/v1/messages
Same headers, same body
6. For each SSE chunk received from the provider:
a. Forward the chunk to the agent immediately (Plug.Conn.chunk)
b. Send a copy to the chunk collector process for logging
7. When the stream ends:
a. Chunk collector assembles the full response
b. Audit.log_streamed_exchange writes the complete record
c. PostFlight runs on the assembled response (optional)
8. Agent receives the response exactly as if it talked to the provider directly
The agent sees no difference. Same SSE format, same timing, same headers. The only observable change is the base URL.
What This Unlocks
With the LLM Gateway deployed, WorkingAgents can see:
| What | Without Gateway | With Gateway |
|---|---|---|
| Agent’s system prompt | Invisible | Logged and scannable |
| Full conversation history | Invisible | Logged per turn |
| Tool definitions sent to the model | Invisible | Logged and validatable |
| Tool results in context | Invisible | Scanned for injection |
| Model response | Invisible | Logged with token counts |
| Total tokens and cost | Unknown | Tracked per user per day |
| PII in outbound prompts | Undetected | Detected and alertable |
| Credentials in system prompts | Leaked silently | Caught at the gateway |
Combined with the MCP proxy (which sees tool calls) and the permission system (which gates tool access), the LLM Gateway completes the security picture:
LLM Gateway -- sees what the agent thinks (prompts, reasoning, context)
MCP Proxy -- sees what the agent does (tool calls, arguments, results)
Permissions -- controls what the agent can do (capability-based gates)
File Structure
lib/
llm_gateway.ex # Main Plug router
llm_gateway/
pre_flight.ex # PreFlight pipeline
post_flight.ex # PostFlight pipeline
system_prompt_guard.ex # Credential leak detection in system prompts
message_guard.ex # Injection scanning in user messages
tool_result_guard.ex # Injection scanning in tool results
pii_scanner.ex # PII detection in outbound payloads
cost_estimator.ex # Budget enforcement
audit.ex # Full payload audit logging
Implementation Phases
| Phase | What | Effort |
|---|---|---|
| 1 | Basic proxy: forward requests, forward responses, no streaming | Half day |
| 2 | Streaming: SSE pass-through with chunk accumulation | 1 day |
| 3 | Audit logging: full payload capture for both streaming and non-streaming | Half day |
| 4 | PreFlight: system prompt guard + tool result guard | Half day |
| 5 | PII scanner + cost estimator | Half day |
| 6 | Integration: mount in router, authentication, per-user tracking | Half day |
| Total | | ~3 days |
Phase 1 and 2 are the foundation. Once requests flow through and responses stream back, everything else is pipeline modules that slot in.
Dependencies
One new dependency: reverse_proxy_plug (optional, for Approach A proof-of-concept only). The custom Plug approach (Approach B) uses only existing dependencies:
- Plug – already in the project (router framework)
- Req – already in the project (HTTP client for provider calls)
- Jason – already in the project (JSON encoding/decoding)
- Sqler – already in the project (audit logging to SQLite)
Zero new dependencies for the production implementation.