The design is simple: two tables, a ready flag, an executor that owns the tables, and events that speak two messages. This article walks through the actual implementation – the modules, the schemas, the code that makes it run.
Everything here follows the conventions of the host application: GenServers registered by module name, SQLite via a private Sqler instance, millisecond timestamp IDs, optimistic locking with updated_at, and clean separation between the server process and the functional module.
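The optimistic-locking convention is worth spelling out, since it shapes every write in what follows. This is a pure sketch of the rule only – not the host's actual Sqler code: a write carries the updated_at it read, and a concurrent write invalidates stale writers.

```elixir
# Sketch of updated_at-based optimistic locking (illustrative only).
# A write succeeds only if the row still has the timestamp we read.
update = fn row, read_at, changes ->
  if row.updated_at == read_at do
    {:ok, row |> Map.merge(changes) |> Map.put(:updated_at, read_at + 1)}
  else
    {:error, :stale}
  end
end

row = %{id: 1, status: "running", updated_at: 100}
{:ok, row2} = update.(row, 100, %{status: "done"})
{:error, :stale} = update.(row2, 100, %{status: "failed"})  # read is now stale
```

In SQL terms the same rule is an `UPDATE ... WHERE id = ? AND updated_at = ?` that matches zero rows when the read is stale.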
The Tables
Two tables. The executor creates both on startup.
CREATE TABLE IF NOT EXISTS workflows (
id INTEGER PRIMARY KEY,
updated_at INTEGER NOT NULL,
name TEXT NOT NULL,
flow_json TEXT NOT NULL,
input_json TEXT,
status TEXT NOT NULL DEFAULT 'running',
created_by TEXT,
completed_at INTEGER,
cancelled_at INTEGER
);
CREATE TABLE IF NOT EXISTS workflow_steps (
id INTEGER PRIMARY KEY,
updated_at INTEGER NOT NULL,
workflow_id INTEGER NOT NULL,
name TEXT NOT NULL,
tool TEXT,
args_json TEXT,
result_json TEXT,
status TEXT NOT NULL DEFAULT 'pending',
attempt INTEGER NOT NULL DEFAULT 1,
ready_at INTEGER,
started_at INTEGER,
completed_at INTEGER
);
CREATE INDEX IF NOT EXISTS idx_steps_ready
ON workflow_steps (status, ready_at)
WHERE status = 'ready';
The flow_json column in workflows stores the entire step graph – a JSON map of step name to transition spec. This is the only place workflow logic lives. Steps are dumb rows; the workflow definition answers “what comes next?”
The partial index on (status, ready_at), restricted to rows where status = 'ready', is the only index the executor's hot path needs. It keeps the poll query fast regardless of how many total steps exist.
The Workflow Module
Workflow is a pure functional module. No process, no state. It owns the schema and all SQL operations. The executor calls it.
defmodule Workflow do
@fields ~w(id updated_at name flow_json input_json status
created_by completed_at cancelled_at)a
def setup_database(db) do
Sqler.sql(db, """
CREATE TABLE IF NOT EXISTS workflows ( ... )
""")
Sqler.sql(db, """
CREATE TABLE IF NOT EXISTS workflow_steps ( ... )
""")
Sqler.sql(db, """
CREATE INDEX IF NOT EXISTS idx_steps_ready
ON workflow_steps (status, ready_at) WHERE status = 'ready'
""")
end
def create(db, name, flow, input, user) do
id = System.system_time(:millisecond)
now = id
Sqler.insert(db, "workflows", %{
id: id, updated_at: now,
name: name,
flow_json: Jason.encode!(flow),
input_json: Jason.encode!(input),
status: "running",
created_by: user
})
{:ok, id}
end
# Row lookup used by next_steps/4 (not shown above; thin wrapper over Sqler.get).
def get(db, id) do
case Sqler.get(db, "workflows", id) do
nil -> {:error, :not_found}
row -> {:ok, row}
end
end
def next_steps(db, workflow_id, completed_step_name, result) do
{:ok, row} = get(db, workflow_id)
flow = Jason.decode!(row.flow_json)
# The "start" key may hold a step with a different "name" (e.g. "search"),
# so fall back to matching on the spec's own name.
step_spec =
Map.get(flow, completed_step_name) ||
flow |> Map.values() |> Enum.find(&(&1["name"] == completed_step_name))
resolve_transitions(flow, step_spec, result, db, workflow_id)
end
defp resolve_transitions(_flow, nil, _result, _db, _wf_id), do: []
defp resolve_transitions(flow, %{"next" => name}, _result, _db, _wf_id) do
# Look the next step up in the flow; its own spec carries its tool and args.
[Map.put(Map.fetch!(flow, name), "name", name)]
end
defp resolve_transitions(flow, %{"branch" => branches}, result, _db, _wf_id) do
match = Enum.find(branches, fn %{"if" => condition} ->
evaluate(condition, result)
end)
case match do
%{"then" => name} -> [Map.put(Map.fetch!(flow, name), "name", name)]
nil -> []
end
end
defp resolve_transitions(flow, %{"parallel" => names}, _result, _db, _wf_id) do
Enum.map(names, fn name -> Map.put(Map.fetch!(flow, name), "name", name) end)
end
defp resolve_transitions(flow, %{"join" => join_step}, _result, db, wf_id) do
# The join fires only once every sibling branch has finished.
siblings = pending_parallel_steps(db, wf_id, join_step)
if siblings == [] do
[Map.put(Map.fetch!(flow, join_step), "name", join_step)]
else
[]
end
end
defp resolve_transitions(_flow, %{"done" => true}, _result, _db, _wf_id), do: :done
defp pending_parallel_steps(db, workflow_id, _join_step) do
# Sibling branches that have not finished yet. The completing step is
# already marked 'done' before next_steps/4 runs, so it is excluded.
Sqler.query(db, """
SELECT id FROM workflow_steps
WHERE workflow_id = ? AND status NOT IN ('done', 'failed')
""", [workflow_id])
end
defp evaluate("result == true", result), do: result == "true" or result == true
defp evaluate("result == false", result), do: result == "false" or result == false
defp evaluate("result != nil", result), do: result not in [nil, "", "nil"]
defp evaluate("result == nil", result), do: result in [nil, "", "nil"]
defp evaluate(<<"result == ", v::binary>>, result), do: result == v
defp evaluate(<<"result != ", v::binary>>, result), do: result != v
defp evaluate(_condition, _result), do: false
end
next_steps/4 is the single function the executor calls after a step completes. It decodes the flow, finds the completed step’s transition spec, and returns a list of step specs to create – zero (waiting on parallel siblings, or no matching branch), one (sequential or branch), or many (parallel fan-out). The :done atom – distinct from an empty list – signals the workflow is finished.
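The return shapes are easy to see with a throwaway flow map. A pure sketch of the two simplest cases – sequential and done – with no DB involved (the branch/parallel/join cases follow the same pattern):

```elixir
flow = %{
  "start" => %{"name" => "search", "tool" => "knowledge_search", "next" => "summarize"},
  "summarize" => %{"name" => "summarize", "tool" => "knowledge_get", "done" => true}
}

# Mirrors the "next" and "done" transition clauses: look the named step
# up in the flow, or signal completion.
resolve = fn
  %{"next" => name} -> [Map.put(flow[name], "name", name)]
  %{"done" => true} -> :done
  nil -> []
end

[next_spec] = resolve.(flow["start"])   # one sequential step: "summarize"
:done = resolve.(flow["summarize"])     # workflow finished
[] = resolve.(nil)                      # unknown step: nothing to create
```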
Conditions are evaluated by explicit pattern matching. No Code.eval_string. The condition language is small and safe. If a new condition type is needed, a new function head is added.
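For instance, a numeric comparison could be added as one more head. ConditionDemo and the `result > N` syntax are illustrative extensions, not part of the engine:

```elixir
defmodule ConditionDemo do
  # Hypothetical extension: numeric "result > N", alongside the
  # string-equality heads the engine already has.
  def evaluate(<<"result > ", n::binary>>, result) when is_binary(result) do
    case {Integer.parse(result), Integer.parse(n)} do
      {{left, ""}, {right, ""}} -> left > right
      _ -> false
    end
  end
  def evaluate(<<"result == ", v::binary>>, result), do: to_string(result) == v
  # Unknown conditions stay safe: they never match.
  def evaluate(_condition, _result), do: false
end

ConditionDemo.evaluate("result > 10", "42")   # true
ConditionDemo.evaluate("result == ok", "ok")  # true
```

The non-matching fallback means a typo in a condition fails closed rather than executing an unintended branch.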
The Step Module
Workflow.Step handles the actual execution of a single step. It runs in a Task process spawned by the executor. It knows nothing about what comes before or after it.
defmodule Workflow.Step do
def run(%{tool: nil}, _context) do
# Tool-less steps are human gates; readying one yields a fixed result.
{:ok, "approved"}
end
def run(step, context) do
args = resolve_args(step.args_json, context)
MyMCPServer.Manager.call_tool_as(step.tool, args, context.user)
end
defp resolve_args(nil, _context), do: %{}
defp resolve_args(args_json, context) do
args = Jason.decode!(args_json)
input = context.input || %{}
Enum.into(args, %{}, fn {k, v} ->
{k, resolve_value(v, input)}
end)
end
defp resolve_value("{{input." <> rest, input) do
key = String.trim_trailing(rest, "}}")
Map.get(input, key, "")
end
defp resolve_value(v, _input), do: v
end
{{input.key}} templates are resolved against the workflow’s original input map. The step calls MyMCPServer.Manager.call_tool_as/3 – the same permission-gated path as any direct MCP tool call. The step runs as the user who created the workflow. No privilege escalation.
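The resolution rule is small enough to exercise standalone. TemplateDemo mirrors the resolve_value/2 heads above: only whole-value templates are substituted, and missing keys become the empty string.

```elixir
defmodule TemplateDemo do
  # Whole-value "{{input.key}}" templates are swapped for input entries;
  # anything else passes through untouched. Missing keys become "".
  def resolve_value("{{input." <> rest, input) do
    key = String.trim_trailing(rest, "}}")
    Map.get(input, key, "")
  end
  def resolve_value(v, _input), do: v
end

input = %{"topic" => "AI safety"}
TemplateDemo.resolve_value("{{input.topic}}", input)    # "AI safety"
TemplateDemo.resolve_value("literal text", input)       # "literal text"
TemplateDemo.resolve_value("{{input.missing}}", input)  # ""
```

Note the deliberate limitation: `"find {{input.topic}} now"` is not a whole-value template, so it passes through unchanged.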
The Executor
WorkflowExecutor is a GenServer registered by module name. It starts its own Sqler instance. It is the only process that touches the database. Everything goes through it.
defmodule WorkflowExecutor do
use GenServer
@poll_interval 1_000 # ms
@stuck_timeout 120 # seconds -- steps stuck in 'running' longer than this
def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
def init(_) do
{:ok, db} = Sqler.start_link("workflow")
Workflow.setup_database(db)
schedule_poll()
schedule_watchdog()
{:ok, %{db: db}}
end
# --- Public API (all go through the GenServer) ---
def start_workflow(name, flow, input, user) do
GenServer.call(__MODULE__, {:start_workflow, name, flow, input, user})
end
def step_ready(step_id) do
GenServer.cast(__MODULE__, {:step_ready, step_id})
end
def step_done(step_id, result) do
GenServer.cast(__MODULE__, {:step_done, step_id, result})
end
def step_failed(step_id, reason) do
GenServer.cast(__MODULE__, {:step_failed, step_id, reason})
end
# --- Handlers ---
def handle_call({:start_workflow, name, flow, input, user}, _from, %{db: db} = state) do
{:ok, workflow_id} = Workflow.create(db, name, flow, input, user)
first_step = first_step_spec(flow)
create_and_ready_step(db, workflow_id, first_step)
{:reply, {:ok, workflow_id}, state}
end
def handle_cast({:step_ready, step_id}, %{db: db} = state) do
step = Sqler.get(db, "workflow_steps", step_id)
# Ignore duplicate ready events -- poll, Alarm, and UI can overlap.
# 'pending' is accepted so approval gates can be readied externally.
if step && step.status in ["pending", "ready"] do
now = System.system_time(:millisecond)
Sqler.update(db, "workflow_steps", step_id, %{status: "running", started_at: now})
workflow = Sqler.get(db, "workflows", step.workflow_id)
context = %{user: workflow.created_by, input: Jason.decode!(workflow.input_json || "{}")}
Task.start(fn ->
case Workflow.Step.run(step, context) do
{:ok, value} -> WorkflowExecutor.step_done(step_id, value)
{:error, reason} -> WorkflowExecutor.step_failed(step_id, to_string(reason))
end
end)
end
{:noreply, state}
end
def handle_cast({:step_done, step_id, result}, %{db: db} = state) do
now = System.system_time(:millisecond)
Sqler.update(db, "workflow_steps", step_id, %{
status: "done",
result_json: Jason.encode!(result),
completed_at: now
})
step = Sqler.get(db, "workflow_steps", step_id)
next = Workflow.next_steps(db, step.workflow_id, step.name, result)
case next do
:done ->
Sqler.update(db, "workflows", step.workflow_id, %{
status: "completed",
completed_at: now
})
[] ->
:ok # parallel branch -- siblings still running
specs when is_list(specs) ->
Enum.each(specs, fn spec ->
create_and_ready_step(db, step.workflow_id, spec)
end)
end
{:noreply, state}
end
def handle_cast({:step_failed, step_id, reason}, %{db: db} = state) do
now = System.system_time(:millisecond)
step = Sqler.get(db, "workflow_steps", step_id)
max_attempts = 3
Sqler.update(db, "workflow_steps", step_id, %{status: "failed", result_json: reason})
if step.attempt < max_attempts do
# Insert a fresh attempt as 'pending'; an Alarm readies it after backoff.
retry_at = div(now, 1000) + backoff_seconds(step.attempt)
new_step_id = System.system_time(:millisecond)
Sqler.insert(db, "workflow_steps", %{
id: new_step_id, updated_at: now,
workflow_id: step.workflow_id,
name: step.name,
tool: step.tool,
args_json: step.args_json,
status: "pending",
attempt: step.attempt + 1
})
Alarm.set_timer(retry_at, WorkflowExecutor, {:step_ready, new_step_id})
else
Sqler.update(db, "workflows", step.workflow_id, %{status: "failed"})
end
{:noreply, state}
end
# Alarm-fired {:step_ready, step_id} casts land in the handle_cast clause above;
# no extra clause is needed.
# --- Poll loop ---
def handle_info(:poll, %{db: db} = state) do
ready_steps = Sqler.query(db, """
SELECT id FROM workflow_steps
WHERE status = 'ready'
ORDER BY ready_at ASC
LIMIT 50
""")
Enum.each(ready_steps, fn %{id: id} ->
GenServer.cast(__MODULE__, {:step_ready, id})
end)
schedule_poll()
{:noreply, state}
end
def handle_info(:watchdog, %{db: db} = state) do
cutoff = System.system_time(:millisecond) - @stuck_timeout * 1000
stuck = Sqler.query(db, """
SELECT id FROM workflow_steps
WHERE status = 'running' AND started_at < ?
""", [cutoff])
Enum.each(stuck, fn %{id: id} ->
GenServer.cast(__MODULE__, {:step_failed, id, "timeout"})
end)
schedule_watchdog()
{:noreply, state}
end
# --- Helpers ---
defp create_and_ready_step(db, workflow_id, spec) do
now = System.system_time(:millisecond)
# Steps without a tool are human gates: created 'pending', readied only
# when an external event casts {:step_ready, id}.
status = if spec["tool"], do: "ready", else: "pending"
Sqler.insert(db, "workflow_steps", %{
# NOTE: ms-timestamp IDs can collide when fanning out several
# parallel steps within the same millisecond.
id: now, updated_at: now,
workflow_id: workflow_id,
name: spec["name"],
tool: spec["tool"],
args_json: Jason.encode!(spec["args"] || %{}),
status: status,
ready_at: if(status == "ready", do: now),
attempt: 1
})
end
defp first_step_spec(flow) do
# Prefer the explicit "start" key. The fallback is arbitrary, since
# Elixir maps are unordered -- always define "start".
flow["start"] || flow |> Map.values() |> List.first()
end
defp backoff_seconds(1), do: 30
defp backoff_seconds(2), do: 120
defp backoff_seconds(_), do: 300
defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval)
defp schedule_watchdog, do: Process.send_after(self(), :watchdog, 30_000)
end
The executor’s handle_cast for :step_ready marks the step running before spawning the Task, which prevents the poll loop from picking it up again. The step runs in its own process. When it finishes, the Task calls WorkflowExecutor.step_done or WorkflowExecutor.step_failed – two messages back to the executor, which processes them serially through its single mailbox. No races.
The poll loop is a fallback. In normal operation, step completions chain directly via create_and_ready_step. The poll is there to pick up steps that were set ready by external events (alarms, UI) while no other step was running.
How Events Plug In
Every external event source sends one message to the executor. Nothing else.
Alarm:
# Alarm fires GenServer.cast(WorkflowExecutor, {:step_ready, step_id})
# The executor registered itself -- Alarm just needs the module name and message
Alarm.set_timer(unix_time, WorkflowExecutor, {:step_ready, step_id})
UI button / MCP tool:
def workflow_approve(step_id, _user) do
WorkflowExecutor.step_ready(step_id)
{:ok, "approved"}
end
Task completion:
# In NisTask, after marking a task completed:
if linked_step_id = task.workflow_step_id do
WorkflowExecutor.step_ready(linked_step_id)
end
Alarm has no compile-time dependency on WorkflowExecutor. It takes a registered module name and a message and fires a cast at the scheduled time. The UI handler doesn’t know what the step does. The task completion handler doesn’t know it’s participating in a workflow. They know a step ID. That’s all.
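That decoupling is just GenServer.cast/2 against a registered name. A toy sketch – ToyExecutor is illustrative, not the real executor – shows that a caller needs only the name and the message shape:

```elixir
defmodule ToyExecutor do
  use GenServer
  def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
  def init(state), do: {:ok, state}
  # The only contract a caller needs: the registered name and this tuple.
  def handle_cast({:step_ready, step_id}, seen), do: {:noreply, [step_id | seen]}
  def handle_call(:seen, _from, seen), do: {:reply, Enum.reverse(seen), seen}
end

{:ok, _pid} = ToyExecutor.start_link([])
# An alarm, a UI handler, or a task hook all do exactly this:
GenServer.cast(ToyExecutor, {:step_ready, 42})
GenServer.call(ToyExecutor, :seen)  # [42]
```

Because the cast and the later call travel through the same mailbox, the call is guaranteed to observe the cast’s effect.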
Defining a Workflow
A workflow definition is a map. The flow_json column stores it. The start key names the first step.
flow = %{
"start" => %{
"name" => "search",
"tool" => "knowledge_search",
"args" => %{"query" => "{{input.topic}}"},
"next" => "summarize"
},
"summarize" => %{
"name" => "summarize",
"tool" => "knowledge_get",
"args" => %{"id" => "{{input.doc_id}}"},
"next" => "notify"
},
"notify" => %{
"name" => "notify",
"tool" => "pushover_send",
"args" => %{"message" => "Research complete"},
"done" => true
}
}
WorkflowExecutor.start_workflow("research", flow, %{"topic" => "AI safety"}, "james")
A branching workflow:
flow = %{
"start" => %{
"name" => "check_admin",
"tool" => "is_admin",
"args" => %{},
"branch" => [
%{"if" => "result == true", "then" => "admin_action"},
%{"if" => "result == false", "then" => "user_action"}
]
},
"admin_action" => %{"tool" => "knowledge_add", ..., "done" => true},
"user_action" => %{"tool" => "get_greeting", ..., "done" => true}
}
An approval gate – a step with no tool that waits in pending until a human acts:
flow = %{
"start" => %{
"name" => "request_approval",
"tool" => nil,
"args" => %{},
"next" => "execute_write"
# executor creates this step as 'pending', not 'ready'
# notification is sent; human calls workflow_approve(step_id)
# executor receives step_ready, marks running, tool is nil so result is 'approved'
# next step created and readied
},
"execute_write" => %{
"name" => "execute_write",
"tool" => "knowledge_add",
"args" => %{"title" => "{{input.title}}", "body" => "{{input.body}}"},
"done" => true
}
}
Supervision Tree
One line in application.ex, after {Alarm, []}:
{WorkflowExecutor, []}
WorkflowExecutor.init/1 starts its Sqler, creates its tables, schedules the first poll and the first watchdog tick. The entire workflow engine is online. No external services, no additional configuration, no migration scripts. The tables are created idempotently on every startup via CREATE TABLE IF NOT EXISTS.
What the Implementation Does Not Include
This is a deliberate list.
No workflow versioning. If you change flow_json on a running workflow, the in-flight run sees the new definition on the next step. Version the flow yourself by naming it differently if you need immutability.
No SSE/WebSocket events. Add SsePush.broadcast("workflow_event", %{...}) in step_done and step_failed handlers when you want real-time UI updates. Two lines.
No per-step timeout. The watchdog catches steps stuck in running after a global threshold. Per-step timeouts can be added by scheduling an Alarm when a step starts and cancelling it when it finishes.
No parallel join tracking in DB. The join check (pending_parallel_steps) queries the steps table. If the executor crashes mid-fan-out, the check runs against current DB state on restart and correctly identifies which branches are still pending.
No rich condition language. Six condition patterns cover most cases. Add function heads to evaluate/2 for more.
Everything omitted is additive. The core is small on purpose. A working system is better than a complete specification.
The Full Picture
Five files. Two tables. One GenServer.
- lib/workflow.ex – schema, CRUD, next_steps/4, condition evaluation
- lib/workflow/step.ex – single step execution, template resolution, tool call
- lib/workflow_executor.ex – the GenServer: owns the DB, handles the step messages, runs the poll loop
- lib/permissions/workflow.ex – MCP tool definitions and permission wrappers
- One entry in application.ex
The rest of the system – Alarm, Pushover, WsHandler, NisTask, the MCP server – participates without modification. They send messages. The executor acts on them.
That is what minimal coupling looks like when you build it instead of describe it.