The IEx console is the fastest way to try, debug, and inspect workflows. You talk directly to WorkflowExecutor – no HTTP, no auth tokens, no MCP client.
Starting a Simple Workflow
A workflow needs a name, a flow map, and optionally an input map. The flow must have a "start" key pointing to the first step spec.
flow = %{
  "start" => %{
    "name" => "greet",
    "tool" => "get_greeting",
    "args" => %{"name" => "World"},
    "next" => "count"
  },
  "count" => %{
    "name" => "count",
    "tool" => "get_counter",
    "args" => %{},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow("demo", flow, %{}, "james")
# => {:ok, 1773850841512}
The workflow starts immediately. The executor picks up the first step within one second (the poll interval). Each step runs in its own Task process.
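Since the executor works on a poll interval, a status check made right after start_workflow may still show the workflow in progress. The following is a minimal sketch of a polling helper, not part of WorkflowExecutor. The module name WorkflowPoll, the fetch-function argument, and the set of terminal statuses are assumptions made for illustration; in particular, "failed" as a workflow-level status is a guess.

```elixir
defmodule WorkflowPoll do
  # Hypothetical helper, not part of WorkflowExecutor.
  # Assumed terminal statuses; "failed" at the workflow level is a guess.
  @terminal ["completed", "cancelled", "failed"]

  # fetch is any 1-arity function returning {:ok, workflow_map},
  # for example &WorkflowExecutor.get_workflow/1 when used in IEx.
  def await(id, fetch, attempts \\ 10, interval_ms \\ 1_000)

  # Out of attempts: give up instead of looping forever.
  def await(_id, _fetch, 0, _interval_ms), do: {:error, :timeout}

  def await(id, fetch, attempts, interval_ms) do
    {:ok, wf} = fetch.(id)

    if wf["status"] in @terminal do
      {:ok, wf}
    else
      Process.sleep(interval_ms)
      await(id, fetch, attempts - 1, interval_ms)
    end
  end
end
```

In IEx this would be called as WorkflowPoll.await(id, &WorkflowExecutor.get_workflow/1). Passing the fetch function in keeps the helper testable without a running executor.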
Checking Status
{:ok, wf} = WorkflowExecutor.get_workflow(id)

wf["status"]
# => "completed"

wf["steps"]
# => [
#   %{"name" => "greet", "status" => "done", "result_json" => "\"Hello!\"", ...},
#   %{"name" => "count", "status" => "done", "result_json" => "42", ...}
# ]
The get_workflow/1 call returns the workflow row with a "steps" list appended – all steps in order of creation.
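For workflows with many steps, the full rows can be noisy. As a small sketch, the helper below reduces the "steps" list to name/status pairs. The module name StepSummary is made up for this example, and it relies only on the "name" and "status" keys that appear in the output above.

```elixir
defmodule StepSummary do
  # Hypothetical helper: reduce a workflow map to {name, status} pairs,
  # preserving the order of the "steps" list.
  def summarize(wf) do
    Enum.map(wf["steps"] || [], fn step -> {step["name"], step["status"]} end)
  end
end

# Sample data shaped like the get_workflow/1 result
wf = %{
  "status" => "completed",
  "steps" => [
    %{"name" => "greet", "status" => "done"},
    %{"name" => "count", "status" => "done"}
  ]
}

StepSummary.summarize(wf)
# => [{"greet", "done"}, {"count", "done"}]
```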
Listing All Workflows
WorkflowExecutor.list_workflows()
# => [%{"id" => ..., "name" => "demo", "status" => "completed", ...}, ...]
WorkflowExecutor.list_workflows(limit: 5)
Using Input Variables
Step args can reference workflow input via {{input.key}} templates:
flow = %{
  "start" => %{
    "name" => "search",
    "tool" => "knowledge_search",
    "args" => %{"query" => "{{input.topic}}"},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow(
  "knowledge-lookup",
  flow,
  %{"topic" => "access control"},
  "james"
)
The executor interpolates {{input.topic}} to "access control" before calling the tool.
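To make the substitution concrete, here is a rough sketch of how {{input.key}} interpolation could work. It is not the executor's actual code: the module name TemplateSketch is hypothetical, and the choices to leave unknown keys untouched and to convert values with to_string/1 are assumptions.

```elixir
defmodule TemplateSketch do
  # Illustrative only: the real executor's interpolation rules are not
  # shown in this guide.

  # Maps are walked recursively so nested args are handled too.
  def interpolate(args, input) when is_map(args) do
    Map.new(args, fn {k, v} -> {k, interpolate(v, input)} end)
  end

  # Strings get every {{input.key}} replaced with the matching input value.
  # Unknown keys are left as-is (an assumption). Values are assumed to be
  # strings or numbers, since to_string/1 is used for conversion.
  def interpolate(value, input) when is_binary(value) do
    Regex.replace(~r/\{\{input\.(\w+)\}\}/, value, fn whole, key ->
      case Map.fetch(input, key) do
        {:ok, found} -> to_string(found)
        :error -> whole
      end
    end)
  end

  # Anything else (numbers, booleans, nil, lists) passes through unchanged.
  def interpolate(value, _input), do: value
end

TemplateSketch.interpolate(%{"query" => "{{input.topic}}"}, %{"topic" => "access control"})
# => %{"query" => "access control"}
```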
Branching Workflow
flow = %{
  "start" => %{
    "name" => "check",
    "tool" => "is_admin",
    "args" => %{},
    "branch" => [
      %{"if" => "result == true", "then" => "admin_path"},
      %{"if" => "result == false", "then" => "user_path"}
    ]
  },
  "admin_path" => %{
    "name" => "admin_path",
    "tool" => "get_greeting",
    "args" => %{},
    "done" => true
  },
  "user_path" => %{
    "name" => "user_path",
    "tool" => "get_counter",
    "args" => %{},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow("branch-demo", flow, %{}, "james")
The executor calls is_admin, evaluates the result, and creates only one of the two next steps.
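How the executor evaluates the "if" strings is not documented here. Purely as an illustration of first-match branch selection, the sketch below handles only the "result == true" and "result == false" forms used in this flow. BranchSketch and its functions are hypothetical names, and the first-match-wins rule is an assumption.

```elixir
defmodule BranchSketch do
  # Illustrative only. Supports just the "result == <literal>" form with
  # true/false literals; the real executor may accept richer expressions.

  # Returns the "then" target of the first matching branch, or nil.
  def pick(branches, result) do
    Enum.find_value(branches, fn %{"if" => condition, "then" => target} ->
      if matches?(condition, result), do: target
    end)
  end

  defp matches?("result == " <> literal, result), do: parse(literal) == {:ok, result}
  defp matches?(_other, _result), do: false

  defp parse("true"), do: {:ok, true}
  defp parse("false"), do: {:ok, false}
  defp parse(_), do: :error
end

branches = [
  %{"if" => "result == true", "then" => "admin_path"},
  %{"if" => "result == false", "then" => "user_path"}
]

BranchSketch.pick(branches, true)
# => "admin_path"
```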
Approval Gate
A step with no "tool" (that is, "tool" => nil) is an approval gate. The executor marks it running but has no tool to call, so the step stays in that state until you manually call step_ready/1.
flow = %{
  "start" => %{
    "name" => "await_approval",
    "tool" => nil,
    "args" => %{},
    "next" => "do_write"
  },
  "do_write" => %{
    "name" => "do_write",
    "tool" => "knowledge_add",
    "args" => %{
      "title" => "{{input.title}}",
      "content" => "{{input.content}}",
      "scope" => "general"
    },
    "done" => true
  }
}

{:ok, wf_id} = WorkflowExecutor.start_workflow(
  "write-with-approval",
  flow,
  %{"title" => "My Doc", "content" => "Some content"},
  "james"
)
# Workflow pauses at the approval gate. Find the step ID:
{:ok, wf} = WorkflowExecutor.get_workflow(wf_id)
approval_step = Enum.find(wf["steps"], &(&1["name"] == "await_approval"))
step_id = approval_step["id"]
# When you're ready to approve:
WorkflowExecutor.step_ready(step_id)
# Executor picks it up, marks it done, creates do_write, runs it.
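If a workflow contains several gates, looking each one up by name gets tedious. The sketch below collects the IDs of all steps that look like waiting gates. ApprovalSketch is a hypothetical module; it assumes each step row carries "tool" and "status" keys and that a waiting gate has a nil tool and "running" status, as described above.

```elixir
defmodule ApprovalSketch do
  # Hypothetical helper, not part of WorkflowExecutor.
  # Caveat: any other nil-tool step that is still running would also be
  # reported, so review the list before approving everything.
  def pending_gate_ids(wf) do
    for step <- wf["steps"] || [],
        is_nil(step["tool"]),
        step["status"] == "running",
        do: step["id"]
  end
end

# Sample data; real rows come from WorkflowExecutor.get_workflow/1
wf = %{
  "steps" => [
    %{"id" => 1, "name" => "await_approval", "tool" => nil, "status" => "running"},
    %{"id" => 2, "name" => "do_write", "tool" => "knowledge_add", "status" => "running"}
  ]
}

ApprovalSketch.pending_gate_ids(wf)
# => [1]
```

Each returned ID can then be passed to WorkflowExecutor.step_ready/1.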
Parallel Steps
flow = %{
  "start" => %{
    "name" => "fan_out",
    "tool" => nil,
    "args" => %{},
    "parallel" => ["branch_a", "branch_b"]
  },
  "branch_a" => %{
    "name" => "branch_a",
    "tool" => "get_greeting",
    "args" => %{},
    "join" => "merge"
  },
  "branch_b" => %{
    "name" => "branch_b",
    "tool" => "get_counter",
    "args" => %{},
    "join" => "merge"
  },
  "merge" => %{
    "name" => "merge",
    "tool" => "get_greeting",
    "args" => %{},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow("parallel-demo", flow, %{}, "james")
Both branch_a and branch_b run concurrently. merge is created only after both complete.
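The join condition can be pictured as a check that every branch pointing at the join target has finished. The sketch below is one way to express that rule and is not taken from the executor's source. JoinSketch is a hypothetical name, and matching steps to flow entries by "name" is an assumption.

```elixir
defmodule JoinSketch do
  # Illustrative only: true when every flow entry whose "join" is `target`
  # has a corresponding step with status "done".
  def ready?(flow, steps, target) do
    # Names of all steps that have finished
    done = for %{"name" => name, "status" => "done"} <- steps, into: MapSet.new(), do: name

    # Names of all branches that join into the target
    branches = for {_key, spec} <- flow, spec["join"] == target, do: spec["name"]

    branches != [] and Enum.all?(branches, &MapSet.member?(done, &1))
  end
end

flow = %{
  "branch_a" => %{"name" => "branch_a", "join" => "merge"},
  "branch_b" => %{"name" => "branch_b", "join" => "merge"},
  "merge" => %{"name" => "merge", "done" => true}
}

steps = [
  %{"name" => "branch_a", "status" => "done"},
  %{"name" => "branch_b", "status" => "running"}
]

JoinSketch.ready?(flow, steps, "merge")
# => false
```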
Cancelling a Workflow
{:ok, _} = WorkflowExecutor.cancel_workflow(id)
{:ok, wf} = WorkflowExecutor.get_workflow(id)
wf["status"]
# => "cancelled"
Inspecting the Database Directly
The executor owns :workflow_db. You can query it directly in IEx for debugging:
# All workflows
Sqler.sql(:workflow_db, "SELECT id, name, status FROM workflows ORDER BY id DESC LIMIT 10")
# Steps for a specific workflow
Sqler.sql(:workflow_db, "SELECT id, name, status, tool FROM workflow_steps WHERE workflow_id = ?", [id])
# Any stuck steps
Sqler.sql(:workflow_db, "SELECT id, name, started_at FROM workflow_steps WHERE status = 'running'")
Triggering a Retry Manually
If a step is stuck in "failed" and you want to requeue it without waiting for the automatic retry, start by locating it:
# Find the failed step
{:ok, wf} = WorkflowExecutor.get_workflow(id)
step = Enum.find(wf["steps"], &(&1["status"] == "failed"))

# Fire the step. Note: step_ready/1 only works if the step is in "ready" status.
# For a failed step, create a fresh one via the executor's retry mechanism
# or restart the workflow instead.
WorkflowExecutor.step_ready(step["id"])