Implementing the Two-Table Workflow Engine in Elixir

The design is simple: two tables, a ready flag, an executor that owns the table, and events that speak two messages. This article walks through the actual implementation – the modules, the schemas, the code that makes it run.

Everything here follows the conventions of the host application: GenServers registered by module name, SQLite via a private Sqler instance, millisecond timestamp IDs, optimistic locking with updated_at, and clean separation between the server process and the functional module.


The Tables

Two tables. The executor creates both on startup.

CREATE TABLE IF NOT EXISTS workflows (
  id           INTEGER PRIMARY KEY,
  updated_at   INTEGER NOT NULL,
  name         TEXT NOT NULL,
  flow_json    TEXT NOT NULL,
  input_json   TEXT,
  status       TEXT NOT NULL DEFAULT 'running',
  created_by   TEXT,
  completed_at INTEGER,
  cancelled_at INTEGER
);

CREATE TABLE IF NOT EXISTS workflow_steps (
  id           INTEGER PRIMARY KEY,
  updated_at   INTEGER NOT NULL,
  workflow_id  INTEGER NOT NULL,
  name         TEXT NOT NULL,
  tool         TEXT,
  args_json    TEXT,
  result_json  TEXT,
  status       TEXT NOT NULL DEFAULT 'pending',
  attempt      INTEGER NOT NULL DEFAULT 1,
  ready_at     INTEGER,
  started_at   INTEGER,
  completed_at INTEGER
);

CREATE INDEX IF NOT EXISTS idx_steps_ready
  ON workflow_steps (status, ready_at)
  WHERE status = 'ready';

The flow_json column in workflows stores the entire step graph – a JSON map of step name to transition spec. This is the only place workflow logic lives. Steps are dumb rows; the workflow definition answers “what comes next?”

The partial index on (status, ready_at), restricted to status = 'ready', is the only index that matters for the executor’s hot path. It keeps the poll query fast no matter how many total steps exist.
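To confirm the hot path actually stays on that index, ask SQLite for the query plan. The exact wording varies by SQLite version; the point is a SEARCH on idx_steps_ready rather than a full-table SCAN:

```sql
EXPLAIN QUERY PLAN
SELECT id FROM workflow_steps
WHERE status = 'ready'
ORDER BY ready_at ASC
LIMIT 50;
-- Expected shape: SEARCH workflow_steps USING COVERING INDEX idx_steps_ready
```

Because the index stores (status, ready_at) plus the rowid, the query is covered and the ORDER BY is satisfied by index order.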


The Workflow Module

Workflow is a pure functional module. No process, no state. It owns the schema and all SQL operations. The executor calls it.

defmodule Workflow do
  @fields ~w(id updated_at name flow_json input_json status
             created_by completed_at cancelled_at)a

  def setup_database(db) do
    Sqler.sql(db, """
      CREATE TABLE IF NOT EXISTS workflows ( ... )
    """)
    Sqler.sql(db, """
      CREATE TABLE IF NOT EXISTS workflow_steps ( ... )
    """)
    Sqler.sql(db, """
      CREATE INDEX IF NOT EXISTS idx_steps_ready
        ON workflow_steps (status, ready_at) WHERE status = 'ready'
    """)
  end

  def create(db, name, flow, input, user) do
    id = System.system_time(:millisecond)
    now = id
    Sqler.insert(db, "workflows", %{
      id: id, updated_at: now,
      name: name,
      flow_json: Jason.encode!(flow),
      input_json: Jason.encode!(input),
      status: "running",
      created_by: user
    })
    {:ok, id}
  end

  def next_steps(db, workflow_id, completed_step_name, result) do
    {:ok, row} = get(db, workflow_id)
    flow = Jason.decode!(row.flow_json)
    step_spec = find_spec(flow, completed_step_name)
    resolve_transitions(flow, step_spec, result, db, workflow_id)
  end

  # The first step is stored under the "start" key but carries its own
  # "name" (e.g. "search"), so fall back to a search by name when the
  # key lookup misses.
  defp find_spec(flow, name) do
    Map.get(flow, name) ||
      flow |> Map.values() |> Enum.find(fn spec -> spec["name"] == name end)
  end

  defp resolve_transitions(_flow, nil, _result, _db, _wf_id), do: []

  defp resolve_transitions(flow, %{"next" => name}, _result, _db, _wf_id) do
    [next_spec(flow, name)]
  end

  defp resolve_transitions(flow, %{"branch" => branches}, result, _db, _wf_id) do
    match = Enum.find(branches, fn %{"if" => condition} ->
      evaluate(condition, result)
    end)
    case match do
      %{"then" => name} -> [next_spec(flow, name)]
      nil -> []
    end
  end

  defp resolve_transitions(flow, %{"parallel" => names}, _result, _db, _wf_id) do
    Enum.map(names, fn name -> next_spec(flow, name) end)
  end

  defp resolve_transitions(flow, %{"join" => join_step}, _result, db, wf_id) do
    siblings = pending_parallel_steps(db, wf_id, join_step)
    if siblings == [] do
      [next_spec(flow, join_step)]
    else
      []
    end
  end

  defp resolve_transitions(_flow, %{"done" => true}, _result, _db, _wf_id), do: :done

  # The spec to create is the *next* step's own definition from the flow,
  # not the completed step's spec with a new name pasted on.
  defp next_spec(flow, name) do
    flow |> Map.get(name, %{}) |> Map.put("name", name)
  end

  defp evaluate("result == true", result),  do: result == "true" or result == true
  defp evaluate("result == false", result), do: result == "false" or result == false
  defp evaluate("result != nil", result),   do: result not in [nil, "", "nil"]
  defp evaluate("result == nil", result),   do: result in [nil, "", "nil"]
  defp evaluate(<<"result == ", v::binary>>, result), do: result == v
  defp evaluate(<<"result != ", v::binary>>, result), do: result != v
  defp evaluate(_condition, _result), do: false
end

next_steps/4 is the single function the executor calls after a step completes. It decodes the flow, looks up the completed step’s transition spec, and returns the definitions of the steps to create next – zero (no branch matched, or a join still waiting on siblings), one (sequential or branch), or many (parallel fan-out). The :done atom signals the workflow is finished.

Conditions are evaluated by explicit pattern matching. No Code.eval_string. The condition language is small and safe. If a new condition type is needed, a new function head is added.
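Because evaluate/2 is plain pattern matching, each new condition type is one function head, and the whole matcher can be exercised in isolation. A self-contained sketch – the "result contains" head is a hypothetical extension for illustration, not part of the engine above:

```elixir
defmodule ConditionDemo do
  # Mirrors the engine's matcher: one head per condition pattern.
  def evaluate("result == true", result),  do: result == "true" or result == true
  def evaluate("result == false", result), do: result == "false" or result == false
  def evaluate("result != nil", result),   do: result not in [nil, "", "nil"]
  def evaluate("result == nil", result),   do: result in [nil, "", "nil"]
  # Hypothetical extension: substring match on string results.
  def evaluate(<<"result contains ", v::binary>>, result) when is_binary(result),
    do: String.contains?(result, v)
  def evaluate(<<"result == ", v::binary>>, result), do: result == v
  def evaluate(<<"result != ", v::binary>>, result), do: result != v
  def evaluate(_condition, _result), do: false
end

ConditionDemo.evaluate("result contains err", "error: timeout")
# => true
```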


The Step Module

Workflow.Step handles the actual execution of a single step. It runs in a Task process spawned by the executor. It knows nothing about what comes before or after it.

defmodule Workflow.Step do
  # Approval gates have no tool: the step "succeeds" with a fixed result
  # so the flow's transitions can proceed (see the approval-gate example).
  def run(%{tool: nil}, _context), do: {:ok, "approved"}

  def run(step, context) do
    args = resolve_args(step.args_json, context)
    MyMCPServer.Manager.call_tool_as(step.tool, args, context.user)
  end

  defp resolve_args(nil, _context), do: %{}

  defp resolve_args(args_json, context) do
    args = Jason.decode!(args_json)
    input = context.input || %{}
    Enum.into(args, %{}, fn {k, v} ->
      {k, resolve_value(v, input)}
    end)
  end

  defp resolve_value("{{input." <> rest, input) do
    key = String.trim_trailing(rest, "}}")
    Map.get(input, key, "")
  end

  defp resolve_value(v, _input), do: v
end

{{input.key}} templates are resolved against the workflow’s original input map. The step calls MyMCPServer.Manager.call_tool_as/3 – the same permission-gated path as any direct MCP tool call. The step runs as the user who created the workflow. No privilege escalation.
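The resolution logic is pure, so its behavior is easy to pin down outside the engine. A self-contained sketch of the same logic (module name illustrative):

```elixir
defmodule TemplateDemo do
  # Resolve each arg value against the input map.
  def resolve_args(args, input) do
    Enum.into(args, %{}, fn {k, v} -> {k, resolve_value(v, input)} end)
  end

  # Only values that are entirely a "{{input.key}}" string are templates.
  defp resolve_value("{{input." <> rest, input) do
    key = String.trim_trailing(rest, "}}")
    Map.get(input, key, "")
  end

  defp resolve_value(v, _input), do: v
end

TemplateDemo.resolve_args(
  %{"query" => "{{input.topic}}", "limit" => 5},
  %{"topic" => "AI safety"}
)
# => %{"limit" => 5, "query" => "AI safety"}
```

Note the deliberate limitation: a value like "about {{input.topic}}" is not resolved – the template must be the entire string.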


The Executor

WorkflowExecutor is a GenServer registered by module name. It starts its own Sqler instance. It is the only process that touches the database. Everything goes through it.

defmodule WorkflowExecutor do
  use GenServer

  @poll_interval 1_000   # ms
  @stuck_timeout 120     # seconds -- steps stuck in 'running' longer than this

  def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  def init(_) do
    {:ok, db} = Sqler.start_link("workflow")
    Workflow.setup_database(db)
    schedule_poll()
    schedule_watchdog()
    {:ok, %{db: db}}
  end

  # --- Public API (all go through the GenServer) ---

  def start_workflow(name, flow, input, user) do
    GenServer.call(__MODULE__, {:start_workflow, name, flow, input, user})
  end

  def step_ready(step_id) do
    GenServer.cast(__MODULE__, {:step_ready, step_id})
  end

  def step_done(step_id, result) do
    GenServer.cast(__MODULE__, {:step_done, step_id, result})
  end

  def step_failed(step_id, reason) do
    GenServer.cast(__MODULE__, {:step_failed, step_id, reason})
  end

  # --- Handlers ---

  def handle_call({:start_workflow, name, flow, input, user}, _from, %{db: db} = state) do
    {:ok, workflow_id} = Workflow.create(db, name, flow, input, user)
    first_step = first_step_spec(flow)
    create_and_ready_step(db, workflow_id, first_step)
    {:reply, {:ok, workflow_id}, state}
  end

  def handle_cast({:step_ready, step_id}, %{db: db} = state) do
    step = Sqler.get(db, "workflow_steps", step_id)
    # Idempotence guard: a step can be announced ready more than once
    # (poll plus a direct cast, or a double-clicked approval button).
    # Only a pending or ready step may start running.
    if step && step.status in ["pending", "ready"] do
      now = System.system_time(:millisecond)
      Sqler.update(db, "workflow_steps", step_id, %{status: "running", started_at: now})
      workflow = Sqler.get(db, "workflows", step.workflow_id)
      context = %{user: workflow.created_by, input: Jason.decode!(workflow.input_json || "{}")}
      Task.start(fn ->
        case Workflow.Step.run(step, context) do
          {:ok, value}     -> WorkflowExecutor.step_done(step_id, value)
          {:error, reason} -> WorkflowExecutor.step_failed(step_id, to_string(reason))
        end
      end)
    end
    {:noreply, state}
  end

  def handle_cast({:step_done, step_id, result}, %{db: db} = state) do
    now = System.system_time(:millisecond)
    Sqler.update(db, "workflow_steps", step_id, %{
      status: "done",
      result_json: Jason.encode!(result),
      completed_at: now
    })
    step = Sqler.get(db, "workflow_steps", step_id)
    next = Workflow.next_steps(db, step.workflow_id, step.name, result)
    case next do
      :done ->
        Sqler.update(db, "workflows", step.workflow_id, %{
          status: "completed",
          completed_at: now
        })
      [] ->
        :ok  # parallel branch -- siblings still running
      specs when is_list(specs) ->
        Enum.each(specs, fn spec ->
          create_and_ready_step(db, step.workflow_id, spec)
        end)
    end
    {:noreply, state}
  end

  def handle_cast({:step_failed, step_id, reason}, %{db: db} = state) do
    now = System.system_time(:millisecond)
    step = Sqler.get(db, "workflow_steps", step_id)
    max_attempts = 3
    # The failed attempt is always recorded; encode the reason so
    # result_json stays valid JSON like every other row.
    Sqler.update(db, "workflow_steps", step_id, %{
      status: "failed",
      result_json: Jason.encode!(reason),
      completed_at: now
    })
    if step.attempt < max_attempts do
      # Schedule a retry via Alarm: a fresh row, attempt + 1.
      retry_at = div(now, 1000) + backoff_seconds(step.attempt)
      new_step_id = System.system_time(:millisecond)
      Sqler.insert(db, "workflow_steps", %{
        id: new_step_id, updated_at: now,
        workflow_id: step.workflow_id,
        name: step.name,
        tool: step.tool,
        args_json: step.args_json,
        status: "pending",
        attempt: step.attempt + 1
      })
      Alarm.set_timer(retry_at, WorkflowExecutor, {:step_ready, new_step_id})
    else
      Sqler.update(db, "workflows", step.workflow_id, %{status: "failed"})
    end
    {:noreply, state}
  end

  # Alarm retries arrive as GenServer.cast(__MODULE__, {:step_ready, id}) --
  # the same message shape, so the :step_ready clause above already handles
  # them. (A second identical head here would be unreachable dead code.)

  # --- Poll loop ---

  def handle_info(:poll, %{db: db} = state) do
    ready_steps = Sqler.query(db, """
      SELECT id FROM workflow_steps
      WHERE status = 'ready'
      ORDER BY ready_at ASC
      LIMIT 50
    """)
    Enum.each(ready_steps, fn %{id: id} ->
      GenServer.cast(__MODULE__, {:step_ready, id})
    end)
    schedule_poll()
    {:noreply, state}
  end

  def handle_info(:watchdog, %{db: db} = state) do
    # started_at is stored in milliseconds; compute the cutoff directly.
    cutoff = System.system_time(:millisecond) - @stuck_timeout * 1000
    stuck = Sqler.query(db, """
      SELECT id FROM workflow_steps
      WHERE status = 'running' AND started_at < ?
    """, [cutoff])
    Enum.each(stuck, fn %{id: id} ->
      GenServer.cast(__MODULE__, {:step_failed, id, "timeout"})
    end)
    schedule_watchdog()
    {:noreply, state}
  end

  # --- Helpers ---

  defp create_and_ready_step(db, workflow_id, spec) do
    now = System.system_time(:millisecond)
    # Millisecond ids collide when several steps are created within the
    # same millisecond (parallel fan-out), so keep ids strictly increasing.
    id = max(now, Process.get(:last_step_id, 0) + 1)
    Process.put(:last_step_id, id)
    # A step with no tool is an approval gate: it is created 'pending'
    # (the poll only selects 'ready') and waits for an external step_ready.
    status = if spec["tool"], do: "ready", else: "pending"
    Sqler.insert(db, "workflow_steps", %{
      id: id, updated_at: now,
      workflow_id: workflow_id,
      name: spec["name"],
      tool: spec["tool"],
      args_json: Jason.encode!(spec["args"] || %{}),
      status: status,
      ready_at: (if status == "ready", do: now),
      attempt: 1
    })
  end

  defp first_step_spec(flow) do
    # Map value order is undefined, so an arbitrary fallback is a trap;
    # require the "start" key the flow contract already names.
    Map.fetch!(flow, "start")
  end

  defp backoff_seconds(1), do: 30
  defp backoff_seconds(2), do: 120
  defp backoff_seconds(_), do: 300

  defp schedule_poll,     do: Process.send_after(self(), :poll, @poll_interval)
  defp schedule_watchdog, do: Process.send_after(self(), :watchdog, 30_000)
end

The executor’s handle_cast for :step_ready marks the step running before spawning the Task, which prevents the poll loop from picking it up again. The step runs in its own process. When it finishes, the Task calls WorkflowExecutor.step_done or WorkflowExecutor.step_failed – two messages back to the executor, which processes them serially in its single mailbox. No races.

The poll loop is a fallback. In normal operation, step completions chain directly via create_and_ready_step. The poll is there to pick up steps that were set ready by external events (alarms, UI) while no other step was running.


How Events Plug In

Every external event source sends one message to the executor. Nothing else.

Alarm:

# Alarm fires GenServer.cast(WorkflowExecutor, {:step_ready, step_id})
# The executor registered itself -- Alarm just needs the module name and message
Alarm.set_timer(unix_time, WorkflowExecutor, {:step_ready, step_id})

UI button / MCP tool:

def workflow_approve(step_id, _user) do
  WorkflowExecutor.step_ready(step_id)
  {:ok, "approved"}
end

Task completion:

# In NisTask, after marking a task completed:
if linked_step_id = task.workflow_step_id do
  WorkflowExecutor.step_ready(linked_step_id)
end

The alarm doesn’t import WorkflowExecutor. It takes a module name and a message and fires a cast at the scheduled time. The UI handler doesn’t know what the step does. The task completion handler doesn’t know it’s participating in a workflow. They know a step ID. That’s all.


Defining a Workflow

A workflow definition is a map. The flow_json column stores it. The start key names the first step.

flow = %{
  "start" => %{
    "name" => "search",
    "tool" => "knowledge_search",
    "args" => %{"query" => "{{input.topic}}"},
    "next" => "summarize"
  },
  "summarize" => %{
    "name" => "summarize",
    "tool" => "knowledge_get",
    "args" => %{"id" => "{{input.doc_id}}"},
    "next" => "notify"
  },
  "notify" => %{
    "name" => "notify",
    "tool" => "pushover_send",
    "args" => %{"message" => "Research complete"},
    "done" => true
  }
}

WorkflowExecutor.start_workflow("research", flow, %{"topic" => "AI safety"}, "james")

A branching workflow:

flow = %{
  "start" => %{
    "name" => "check_admin",
    "tool" => "is_admin",
    "args" => %{},
    "branch" => [
      %{"if" => "result == true",  "then" => "admin_action"},
      %{"if" => "result == false", "then" => "user_action"}
    ]
  },
  "admin_action" => %{"tool" => "knowledge_add", ..., "done" => true},
  "user_action"  => %{"tool" => "get_greeting",  ..., "done" => true}
}

An approval gate – a step with no tool that waits in pending until a human acts:

flow = %{
  "start" => %{
    "name" => "request_approval",
    "tool" => nil,
    "args" => %{},
    "next" => "execute_write"
    # executor creates this step as 'pending', not 'ready'
    # notification is sent; human calls workflow_approve(step_id)
    # executor receives step_ready, marks running, tool is nil so result is 'approved'
    # next step created and readied
  },
  "execute_write" => %{
    "name" => "execute_write",
    "tool" => "knowledge_add",
    "args" => %{"title" => "{{input.title}}", "body" => "{{input.body}}"},
    "done" => true
  }
}

Supervision Tree

One line in application.ex, after {Alarm, []}:

{WorkflowExecutor, []}

WorkflowExecutor.init/1 starts its Sqler, creates its tables, schedules the first poll and the first watchdog tick. The entire workflow engine is online. No external services, no additional configuration, no migration scripts. The tables are created idempotently on every startup via CREATE TABLE IF NOT EXISTS.
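For context, the surrounding child list might look like this – the supervisor name and strategy are assumptions, not from the host application:

```elixir
# application.ex (sketch)
children = [
  {Alarm, []},
  {WorkflowExecutor, []}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
```

Ordering matters: children start in list order, so Alarm is already running when the executor comes up.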


What the Implementation Does Not Include

This is a deliberate list.

No workflow versioning. If you change flow_json on a running workflow, the in-flight run sees the new definition on the next step. Version the flow yourself by naming it differently if you need immutability.

No SSE/WebSocket events. Add SsePush.broadcast("workflow_event", %{...}) in step_done and step_failed handlers when you want real-time UI updates. Two lines.

No per-step timeout. The watchdog catches steps stuck in running after a global threshold. Per-step timeouts can be added by scheduling an Alarm when a step starts and cancelling it when it finishes.

No parallel join tracking in DB. The join check (pending_parallel_steps) queries the steps table. If the executor crashes mid-fan-out, the check runs against current DB state on restart and correctly identifies which branches are still pending.
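The decision pending_parallel_steps/3 has to make reduces to a pure check over the sibling rows. A self-contained sketch of that check (module and field names are illustrative; the real version queries the steps table):

```elixir
defmodule JoinDemo do
  # A join may fire only when no sibling is still pending, ready, or running.
  @non_terminal ~w(pending ready running)

  def join_ready?(sibling_rows) do
    not Enum.any?(sibling_rows, fn row -> row.status in @non_terminal end)
  end
end

JoinDemo.join_ready?([%{status: "done"}, %{status: "running"}])
# => false
JoinDemo.join_ready?([%{status: "done"}, %{status: "done"}])
# => true
```

Because the check reads current row statuses rather than an in-memory counter, it gives the crash-recovery property described above for free.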

No rich condition language. Six condition patterns cover most cases. Add function heads to evaluate/2 for more.

Everything omitted is additive. The core is small on purpose. A working system is better than a complete specification.


The Full Picture

Five files. Two tables. One GenServer.

The rest of the system – Alarm, Pushover, WsHandler, NisTask, the MCP server – participates without modification. They send messages. The executor acts on them.

That is what minimal coupling looks like when you build it instead of describe it.