Building the LLM Gateway in Elixir: A Reverse Proxy for Agent-to-Model Traffic

This is the implementation guide for Technique 1 from Proxying Agent-to-LLM Traffic – the API URL redirect approach where external agents point their base URL at WorkingAgents instead of calling the LLM provider directly.

This is the most natural fit for WorkingAgents because it gives real-time visibility into everything an agent sends to and receives from its model – system prompts, conversation history, tool definitions, tool results, the full context window – without requiring TLS interception or agent source code modification.

What We’re Building

A Plug-based reverse proxy that:

  1. Speaks the Anthropic Messages API (POST /v1/messages)
  2. Speaks the OpenAI Chat Completions API (POST /v1/chat/completions)
  3. Speaks the Google Gemini API (POST /v1beta/...)
  4. Forwards requests to the real provider after inspection
  5. Streams responses back in real-time (SSE pass-through)
  6. Logs full payloads (request and response) to the audit trail
  7. Scans for injection patterns, PII, and policy violations
  8. Enforces cost and rate limits per user
  9. Integrates into the existing WorkingAgents router at /llm-gateway

External agents configure one environment variable and all their LLM traffic flows through WorkingAgents:

# Claude Code
export ANTHROPIC_BASE_URL=https://your-workingagents.com/llm-gateway/anthropic

# OpenAI Codex
export OPENAI_BASE_URL=https://your-workingagents.com/llm-gateway/openai

# Any OpenAI-compatible agent
export OPENAI_API_BASE=https://your-workingagents.com/llm-gateway/openai

The Streaming Challenge

This is the hardest part and the reason a naive Plug implementation won’t work. LLM APIs stream responses as Server-Sent Events (SSE):

Anthropic streaming format:

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: message_stop
data: {"type":"message_stop"}

OpenAI streaming format:

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-...","choices":[{"delta":{"content":" world"},"index":0}]}

data: [DONE]

The proxy must:

  1. Forward each SSE chunk to the agent the moment it arrives – buffering the whole response would destroy the streaming experience
  2. Keep a copy of every chunk so the complete response can be written to the audit trail after the stream ends
  3. Extract token usage from the stream for cost tracking, even though providers report it in different events
  4. Survive upstream errors and client disconnects without losing the audit record

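The SSE payloads themselves can be teased apart with plain string operations. A minimal sketch, assuming the buffer holds complete lines – real network chunks can split mid-line, so a production parser must carry a partial trailing line over to the next chunk:

```elixir
# Minimal sketch: pull the `data:` payloads out of a complete SSE buffer.
# Assumes whole lines; a production parser must buffer a partial trailing
# line between chunks.
defmodule SSESketch do
  def data_payloads(buffer) do
    buffer
    |> String.split("\n")
    |> Enum.filter(&String.starts_with?(&1, "data: "))
    |> Enum.map(&String.trim_leading(&1, "data: "))
    |> Enum.reject(&(&1 == "[DONE]"))
  end
end
```

Each payload is then JSON-decoded (the gateway below uses Jason) to reach the deltas and usage fields.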
Two Approaches

Approach A: reverse_proxy_plug (Fastest to Ship)

The reverse_proxy_plug library is purpose-built for this. It handles upstream forwarding and streamed (chunked) responses out of the box:

# mix.exs
defp deps do
  [
    {:reverse_proxy_plug, "~> 3.0"},
    # ... existing deps
  ]
end

Integration into the router:

# In my_mcp_server_router.ex

# Anthropic API proxy
forward "/llm-gateway/anthropic",
  to: ReverseProxyPlug,
  upstream: "https://api.anthropic.com",
  response_mode: :stream

# OpenAI API proxy
forward "/llm-gateway/openai",
  to: ReverseProxyPlug,
  upstream: "https://api.openai.com",
  response_mode: :stream

# Google Gemini API proxy
forward "/llm-gateway/google",
  to: ReverseProxyPlug,
  upstream: "https://generativelanguage.googleapis.com",
  response_mode: :stream

This gets a working proxy in 10 minutes. But it’s a dumb pipe – no inspection, no logging, no guardrails. The library supports callbacks for request/response modification, but building a full inspection pipeline on top of callbacks is awkward.

Verdict: Good for a proof-of-concept. Not sufficient for production governance.

Approach B: Custom Plug + Req Streaming (Full Control)

Build the proxy as a custom Plug module that handles request parsing, inspection, forwarding, streaming, and logging explicitly.

This is more code but gives complete control over every byte that passes through.

defmodule LLMGateway do
  @moduledoc """
  Reverse proxy for agent-to-LLM traffic.
  Inspects requests, forwards to the real provider,
  streams responses back while logging everything.
  """

  use Plug.Router
  require Logger

  plug Plug.Parsers,
    parsers: [:json],
    json_decoder: Jason,
    body_reader: {__MODULE__, :cache_body_reader, []}

  plug :match
  plug :dispatch

  # ── Anthropic Messages API ──────────────────────────────────────────

  match "/anthropic/*path" do
    proxy_request(conn, :anthropic, path)
  end

  # ── OpenAI Chat Completions API ─────────────────────────────────────

  match "/openai/*path" do
    proxy_request(conn, :openai, path)
  end

  # ── Google Gemini API ───────────────────────────────────────────────

  match "/google/*path" do
    proxy_request(conn, :google, path)
  end

  match _ do
    send_resp(conn, 404, "Unknown LLM provider")
  end

  # ── Core Proxy Logic ────────────────────────────────────────────────

  defp proxy_request(conn, provider, path) do
    user = resolve_user(conn)
    body = conn.body_params
    is_streaming = streaming_requested?(body, provider)

    # ── PRE-FLIGHT ──
    case LLMGateway.PreFlight.run(user, provider, body) do
      {:ok, body} ->
        if is_streaming do
          stream_proxy(conn, provider, path, body, user)
        else
          buffer_proxy(conn, provider, path, body, user)
        end

      {:rejected, reason} ->
        LLMGateway.Audit.log_rejection(user, provider, reason)
        conn
        |> put_resp_content_type("application/json")
        |> send_resp(403, Jason.encode!(%{error: %{message: "Request rejected: #{inspect(reason)}"}}))
    end
  end

  # ── Non-Streaming: Buffer Full Response ─────────────────────────────

  defp buffer_proxy(conn, provider, path, body, user) do
    url = provider_url(provider) <> "/" <> Enum.join(path, "/")
    headers = forward_headers(conn, provider)

    case Req.post(url, json: body, headers: headers, receive_timeout: 120_000) do
      {:ok, %Req.Response{status: status, body: resp_body, headers: resp_headers}} ->
        # ── POST-FLIGHT ──
        LLMGateway.PostFlight.run(user, provider, resp_body)
        LLMGateway.Audit.log_exchange(user, provider, body, resp_body, status)

        # Pass through only the request-id headers; copying everything
        # (content-length, transfer-encoding, ...) would clash with the
        # response we build ourselves
        conn =
          Enum.reduce(resp_headers, conn, fn
            {k, [v | _]}, c when k in ["x-request-id", "request-id"] ->
              put_resp_header(c, k, v)

            _, c ->
              c
          end)

        conn
        |> put_resp_content_type("application/json")
        |> send_resp(status, Jason.encode!(resp_body))

      {:error, reason} ->
        Logger.error("LLMGateway: forward failed: #{inspect(reason)}")
        send_resp(conn, 502, Jason.encode!(%{error: %{message: "Gateway error"}}))
    end
  end

  # ── Streaming: SSE Pass-Through with Accumulation ───────────────────

  defp stream_proxy(conn, provider, path, body, user) do
    url = provider_url(provider) <> "/" <> Enum.join(path, "/")
    headers = forward_headers(conn, provider)

    # Start chunked response to the client
    conn =
      conn
      |> put_resp_content_type("text/event-stream")
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      |> send_chunked(200)

    # Spawn the chunk collector first so the streaming callback below
    # can copy every chunk to it for audit logging
    chunk_collector = spawn_link(fn -> collect_chunks(user, provider, body) end)

    # Use Req with a streaming callback
    req_opts = [
      method: :post,
      url: url,
      json: body,
      headers: headers,
      receive_timeout: 120_000,
      into: fn {:data, data}, {req, resp} ->
        # Copy the chunk to the collector for logging
        send(chunk_collector, {:chunk, data})

        # Forward the SSE chunk to the client immediately
        case Plug.Conn.chunk(conn, data) do
          {:ok, _conn} -> {:cont, {req, resp}}
          {:error, _} -> {:halt, {req, resp}}
        end
      end
    ]

    case Req.request(req_opts) do
      {:ok, _response} ->
        send(chunk_collector, :done)
        conn

      {:error, reason} ->
        send(chunk_collector, {:error, reason})
        Logger.error("LLMGateway: streaming failed: #{inspect(reason)}")
        conn
    end
  end

  defp collect_chunks(user, provider, request_body) do
    do_collect(user, provider, request_body, [])
  end

  defp do_collect(user, provider, request_body, chunks) do
    receive do
      {:chunk, data} ->
        do_collect(user, provider, request_body, [data | chunks])

      :done ->
        full_response = chunks |> Enum.reverse() |> Enum.join()
        LLMGateway.Audit.log_streamed_exchange(user, provider, request_body, full_response)

      {:error, reason} ->
        LLMGateway.Audit.log_error(user, provider, request_body, reason)
    after
      180_000 -> :timeout
    end
  end

  # ── Helpers ─────────────────────────────────────────────────────────

  # Anthropic and OpenAI signal streaming with "stream": true in the body
  defp streaming_requested?(body, provider) when provider in [:anthropic, :openai],
    do: Map.get(body, "stream", false) == true

  # Gemini actually signals streaming via the URL path (:streamGenerateContent
  # instead of :generateContent); a body flag is treated as a fallback here
  defp streaming_requested?(body, :google), do: Map.get(body, "stream", false) == true

  defp provider_url(:anthropic), do: "https://api.anthropic.com"
  defp provider_url(:openai), do: "https://api.openai.com"
  defp provider_url(:google), do: "https://generativelanguage.googleapis.com"

  defp forward_headers(conn, _provider) do
    conn.req_headers
    |> Enum.filter(fn {k, _} ->
      k in [
        "authorization", "x-api-key", "anthropic-version",
        "content-type", "accept", "anthropic-beta",
        "openai-organization", "x-goog-api-key"
      ]
    end)
  end

  # Named copy_resp_headers: a local merge_resp_headers/2 would clash with
  # Plug.Conn.merge_resp_headers/2, which `use Plug.Router` imports, and
  # the module would not compile
  defp copy_resp_headers(conn, headers) do
    Enum.reduce(headers, conn, fn
      {k, [v | _]}, c when k in ["x-request-id", "request-id"] ->
        put_resp_header(c, k, v)

      _, c ->
        c
    end)
  end

  defp resolve_user(conn) do
    # Reuse the existing get_user logic from the router
    # The LLM gateway is authenticated -- agents must provide a bearer token
    conn.assigns[:current_user] || %{id: 0, username: "unknown", permissions: %{}}
  end

  # Custom body reader that caches the raw body for forwarding
  def cache_body_reader(conn, opts) do
    {:ok, body, conn} = Plug.Conn.read_body(conn, opts)
    conn = Plug.Conn.put_private(conn, :raw_body, body)
    {:ok, body, conn}
  end
end

The PreFlight Pipeline

The LLM Gateway PreFlight inspects the full LLM request – system prompt, messages, tool definitions, everything:

defmodule LLMGateway.PreFlight do
  @checks [
    LLMGateway.SystemPromptGuard,
    LLMGateway.MessageGuard,
    LLMGateway.ToolResultGuard,
    LLMGateway.PiiScanner,
    LLMGateway.CostEstimator
  ]

  def run(user, provider, body) do
    messages = extract_messages(body, provider)

    Enum.reduce_while(@checks, {:ok, body}, fn check, {:ok, b} ->
      case check.scan(user, provider, b, messages) do
        {:ok, b} -> {:cont, {:ok, b}}
        {:rejected, reason} -> {:halt, {:rejected, reason}}
      end
    end)
  end

  defp extract_messages(body, :anthropic) do
    Map.get(body, "messages", [])
  end

  defp extract_messages(body, :openai) do
    Map.get(body, "messages", [])
  end

  defp extract_messages(body, :google) do
    get_in(body, ["contents"]) || []
  end
end
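Any module that implements the same scan/4 contract slots straight into @checks. As an illustration – this guard is hypothetical, not part of the design above – a cap on conversation length:

```elixir
# Hypothetical example of a check that plugs into the @checks pipeline:
# scan/4 takes the user, provider, request body, and extracted messages,
# and returns {:ok, body} to continue or {:rejected, reason} to block.
defmodule LLMGateway.MaxMessagesGuard do
  @max_messages 200  # illustrative limit

  def scan(_user, _provider, body, messages) do
    if length(messages) > @max_messages do
      {:rejected, {:too_many_messages, length(messages)}}
    else
      {:ok, body}
    end
  end
end
```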

What Each Check Sees

This is the power of the gateway – it sees the full LLM request, not just tool calls:

SystemPromptGuard – inspects the system prompt for credential leaks, overly permissive instructions, or prompt injection vulnerabilities in the prompt itself.

defmodule LLMGateway.SystemPromptGuard do
  @credential_patterns [
    ~r/api[_-]?key\s*[:=]\s*['"]\w{20,}/i,
    ~r/password\s*[:=]\s*['"]/i,
    ~r/secret\s*[:=]\s*['"]\w{20,}/i,
    ~r/Bearer\s+[A-Za-z0-9\-._~+\/]+=*/
  ]

  def scan(_user, _provider, body, _messages) do
    system = extract_system(body)

    case Enum.find(@credential_patterns, &Regex.match?(&1, system || "")) do
      nil -> {:ok, body}
      pattern -> {:rejected, {:credential_in_system_prompt, Regex.source(pattern)}}
    end
  end

  defp extract_system(%{"system" => s}) when is_binary(s), do: s
  defp extract_system(%{"messages" => [%{"role" => "system", "content" => c} | _]}), do: c
  defp extract_system(_), do: nil
end

ToolResultGuard – scans tool results in the conversation for injection patterns. This is the highest-risk vector: external data entering the LLM’s context as trusted content.

defmodule LLMGateway.ToolResultGuard do
  @injection_patterns [
    ~r/ignore\s+(all\s+)?previous\s+instructions/i,
    ~r/IMPORTANT:\s*(?:ignore|forget|disregard)/i,
    ~r/you\s+are\s+now\s+(?:a|an)\s+/i,
    ~r/\[INST\]|\[\/INST\]|<<SYS>>|<\|im_start\|>/i
  ]

  def scan(_user, _provider, body, messages) do
    tool_results =
      messages
      |> Enum.flat_map(fn msg ->
        cond do
          # OpenAI-style tool/function result messages
          Map.get(msg, "role") in ["tool", "function"] ->
            extract_text(msg)

          # Anthropic-style: tool results arrive inside user messages as
          # content blocks of type "tool_result"
          is_list(msg["content"]) ->
            msg["content"]
            |> Enum.filter(&(is_map(&1) and &1["type"] == "tool_result"))
            |> Enum.flat_map(&extract_text/1)

          true ->
            []
        end
      end)

    case Enum.find(tool_results, &has_injection?/1) do
      nil -> {:ok, body}
      text -> {:rejected, {:tool_result_injection, String.slice(text, 0, 200)}}
    end
  end

  defp extract_text(%{"content" => c}) when is_binary(c), do: [c]
  defp extract_text(%{"content" => parts}) when is_list(parts) do
    Enum.flat_map(parts, fn
      %{"text" => t} -> [t]
      _ -> []
    end)
  end
  defp extract_text(_), do: []

  defp has_injection?(text) do
    Enum.any?(@injection_patterns, &Regex.match?(&1, text))
  end
end

PiiScanner – detects PII in messages before they leave for the LLM provider. Doesn’t redact by default (that would break the agent’s conversation), but logs a warning and can be configured to block.
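A minimal version of such a scanner, with two illustrative regexes (email and US SSN) – a real deployment would use a dedicated PII/DLP library:

```elixir
# Sketch of PII detection with illustrative regexes (email, US SSN).
# These patterns both over- and under-match; a production scanner
# would use a dedicated PII/DLP library.
defmodule LLMGateway.PiiScannerSketch do
  @patterns [
    email: ~r/[\w.+-]+@[\w-]+\.[\w.-]+/,
    ssn: ~r/\b\d{3}-\d{2}-\d{4}\b/
  ]

  # Returns the kinds of PII found, e.g. [:email]
  def findings(text) do
    for {kind, re} <- @patterns, Regex.match?(re, text), do: kind
  end
end
```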

CostEstimator – estimates token count from the request payload. If the user’s daily budget would be exceeded, blocks the call before it reaches the provider.
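A workable first cut of the estimator uses the rough four-characters-per-token heuristic; the per-user budget itself would come from configuration:

```elixir
# Sketch of request-size estimation using the rough ~4 chars/token
# heuristic. A real estimator would use the provider's tokenizer;
# this is only an upper-bound sanity check.
defmodule LLMGateway.CostEstimatorSketch do
  @chars_per_token 4

  def estimate_tokens(messages) do
    messages
    |> Enum.map(&text_of/1)
    |> Enum.join()
    |> String.length()
    |> div(@chars_per_token)
  end

  defp text_of(%{"content" => c}) when is_binary(c), do: c

  defp text_of(%{"content" => parts}) when is_list(parts) do
    parts
    |> Enum.flat_map(fn
      %{"text" => t} when is_binary(t) -> [t]
      _ -> []
    end)
    |> Enum.join()
  end

  defp text_of(_), do: ""
end
```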

The Audit Module

The gateway’s audit captures what no other layer can see – the full conversation between agent and model:

defmodule LLMGateway.Audit do
  def log_exchange(user, provider, request, response, status) do
    messages = Map.get(request, "messages", [])

    Sqler.insert(:llm_gateway_db, "llm_gateway_audit", %{
      user_id: user.id,
      username: user.username,
      provider: to_string(provider),
      model: Map.get(request, "model", "unknown"),
      status: status,
      message_count: length(messages),
      has_tools: Map.get(request, "tools", []) != [],  # "tools" is a list in these APIs
      has_system_prompt: has_system?(request),
      request_hash: hash(request),
      request_summary: summarize_request(request),
      response_summary: summarize_response(response),
      tokens_in: get_in_safe(response, ["usage", "input_tokens"]) ||
                 get_in_safe(response, ["usage", "prompt_tokens"]) || 0,
      tokens_out: get_in_safe(response, ["usage", "output_tokens"]) ||
                  get_in_safe(response, ["usage", "completion_tokens"]) || 0,
      created_at: System.system_time(:millisecond)
    })
  end

  def log_streamed_exchange(user, provider, request, raw_sse_data) do
    # Parse accumulated SSE data to extract final token counts
    token_info = parse_sse_usage(raw_sse_data, provider)

    Sqler.insert(:llm_gateway_db, "llm_gateway_audit", %{
      user_id: user.id,
      username: user.username,
      provider: to_string(provider),
      model: Map.get(request, "model", "unknown"),
      status: 200,
      message_count: length(Map.get(request, "messages", [])),
      has_tools: Map.get(request, "tools", []) != [],  # "tools" is a list in these APIs
      has_system_prompt: has_system?(request),
      request_hash: hash(request),
      request_summary: summarize_request(request),
      response_summary: "[streamed] #{token_info.text_preview}",
      tokens_in: token_info.input_tokens,
      tokens_out: token_info.output_tokens,
      created_at: System.system_time(:millisecond)
    })
  end

  def log_rejection(user, provider, reason) do
    Sqler.insert(:llm_gateway_db, "llm_gateway_audit", %{
      user_id: user.id,
      username: user.username,
      provider: to_string(provider),
      model: "rejected",
      status: 403,
      message_count: 0,
      has_tools: false,
      has_system_prompt: false,
      request_hash: "",
      request_summary: "REJECTED: #{inspect(reason)}",
      response_summary: "",
      tokens_in: 0,
      tokens_out: 0,
      created_at: System.system_time(:millisecond)
    })
  end

  defp hash(data) do
    :crypto.hash(:sha256, Jason.encode!(data)) |> Base.encode16(case: :lower)
  end

  defp has_system?(%{"system" => _}), do: true
  defp has_system?(%{"messages" => [%{"role" => "system"} | _]}), do: true
  defp has_system?(_), do: false

  defp summarize_request(req) do
    messages = Map.get(req, "messages", [])
    last_user = messages |> Enum.filter(&(&1["role"] == "user")) |> List.last()
    content = if last_user, do: last_user["content"], else: ""
    text = if is_binary(content), do: content, else: inspect(content)
    String.slice(text, 0, 500)
  end

  defp summarize_response(resp) when is_map(resp) do
    inspect(resp) |> String.slice(0, 500)
  end
  defp summarize_response(other), do: inspect(other) |> String.slice(0, 200)

  defp get_in_safe(map, keys) when is_map(map) do
    Enum.reduce(keys, map, fn k, acc -> if is_map(acc), do: Map.get(acc, k), else: nil end)
  end
  defp get_in_safe(_, _), do: nil

  defp parse_sse_usage(raw_data, _provider) do
    # Usage does not always appear in the final event: Anthropic reports
    # input_tokens in message_start and output_tokens in message_delta
    # (message_stop carries no usage), and OpenAI only includes usage in
    # the last pre-[DONE] chunk when stream_options.include_usage is set.
    # So scan every decoded event rather than just the last one.
    events =
      raw_data
      |> String.split("\n")
      |> Enum.filter(&String.starts_with?(&1, "data: "))
      |> Enum.map(&String.trim_leading(&1, "data: "))
      |> Enum.reject(&(&1 == "[DONE]"))
      |> Enum.flat_map(fn d ->
        case Jason.decode(d) do
          {:ok, event} -> [event]
          _ -> []
        end
      end)

    usage =
      events
      |> Enum.flat_map(fn
        %{"usage" => u} when is_map(u) -> [u]
        # Anthropic message_start nests usage under "message"
        %{"message" => %{"usage" => u}} when is_map(u) -> [u]
        _ -> []
      end)
      |> Enum.reduce(%{}, &Map.merge(&2, &1))

    # Accumulate delta text for the audit preview
    text =
      events
      |> Enum.flat_map(fn
        %{"delta" => %{"text" => t}} -> [t]
        %{"choices" => [%{"delta" => %{"content" => t}} | _]} when is_binary(t) -> [t]
        _ -> []
      end)
      |> Enum.join()

    %{
      input_tokens: usage["input_tokens"] || usage["prompt_tokens"] || 0,
      output_tokens: usage["output_tokens"] || usage["completion_tokens"] || 0,
      text_preview: String.slice(text, 0, 200)
    }
  end
end

Integration Into the Router

The LLM Gateway mounts as a forward in the existing router, behind authentication:

# In my_mcp_server_router.ex

# LLM Gateway -- proxy agent-to-LLM traffic
# Requires authentication (bearer token or cookie)
forward "/llm-gateway", to: LLMGateway

Authentication reuses the existing require_authentication plug. Agents provide the same bearer token they use for MCP tool access. The gateway resolves the user from the token and applies per-user audit logging and cost limits.
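For reference, the shape of the header handling – the verification step itself is whatever require_authentication already does, so only the parsing of the header value is sketched here:

```elixir
# Sketch: pull the token out of an Authorization header value.
# Token verification is delegated to the existing
# require_authentication logic, which is not shown here.
defmodule LLMGateway.BearerSketch do
  def parse_bearer("Bearer " <> token) when byte_size(token) > 0, do: {:ok, token}
  def parse_bearer(_), do: {:error, :missing_bearer_token}
end
```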

The Streaming Architecture in Detail

The streaming proxy is the trickiest part. Here’s what happens step by step:

1. Agent sends POST /llm-gateway/anthropic/v1/messages
   Body: {model, messages, tools, stream: true}

2. LLM Gateway authenticates the agent (bearer token)

3. PreFlight pipeline runs:
   - SystemPromptGuard scans the system prompt
   - ToolResultGuard scans tool results in messages
   - PiiScanner checks for PII in outbound messages
   - CostEstimator checks the user's daily budget

4. Gateway opens a chunked response to the agent:
   Content-Type: text/event-stream

5. Gateway opens a streaming Req connection to the provider:
   POST https://api.anthropic.com/v1/messages
   Same headers, same body

6. For each SSE chunk received from the provider:
   a. Forward the chunk to the agent immediately (Plug.Conn.chunk)
   b. Send a copy to the chunk collector process for logging

7. When the stream ends:
   a. Chunk collector assembles the full response
   b. Audit.log_streamed_exchange writes the complete record
   c. PostFlight runs on the assembled response (optional)

8. Agent receives the response exactly as if it talked to the provider directly

The agent sees no difference. Same SSE format, same timing, same headers. The only observable change is the base URL.

What This Unlocks

With the LLM Gateway deployed, WorkingAgents can see:

What | Without Gateway | With Gateway
Agent’s system prompt | Invisible | Logged and scannable
Full conversation history | Invisible | Logged per turn
Tool definitions sent to the model | Invisible | Logged and validatable
Tool results in context | Invisible | Scanned for injection
Model response | Invisible | Logged with token counts
Total tokens and cost | Unknown | Tracked per user per day
PII in outbound prompts | Undetected | Detected and alertable
Credentials in system prompts | Leaked silently | Caught at the gateway

Combined with the MCP proxy (which sees tool calls) and the permission system (which gates tool access), the LLM Gateway completes the security picture:

LLM Gateway    -- sees what the agent thinks (prompts, reasoning, context)
MCP Proxy      -- sees what the agent does (tool calls, arguments, results)
Permissions    -- controls what the agent can do (capability-based gates)

File Structure

lib/
  llm_gateway.ex                    # Main Plug router
  llm_gateway/
    pre_flight.ex                   # PreFlight pipeline
    post_flight.ex                  # PostFlight pipeline
    system_prompt_guard.ex          # Credential leak detection in system prompts
    message_guard.ex                # Injection scanning in user messages
    tool_result_guard.ex            # Injection scanning in tool results
    pii_scanner.ex                  # PII detection in outbound payloads
    cost_estimator.ex               # Budget enforcement
    audit.ex                        # Full payload audit logging

Implementation Phases

Phase | What | Effort
1 | Basic proxy: forward requests, forward responses, no streaming | Half day
2 | Streaming: SSE pass-through with chunk accumulation | 1 day
3 | Audit logging: full payload capture for both streaming and non-streaming | Half day
4 | PreFlight: system prompt guard + tool result guard | Half day
5 | PII scanner + cost estimator | Half day
6 | Integration: mount in router, authentication, per-user tracking | Half day

Total: ~3 days

Phase 1 and 2 are the foundation. Once requests flow through and responses stream back, everything else is pipeline modules that slot in.

Dependencies

One new dependency: reverse_proxy_plug – and only for the Approach A proof-of-concept. The custom Plug approach (Approach B) uses nothing beyond what the code above already relies on: Plug, Req, and Jason.

Zero new dependencies for the production implementation.
