Using Workflows from IEx

The IEx console is the fastest way to try, debug, and inspect workflows. You talk directly to WorkflowExecutor – no HTTP, no auth tokens, no MCP client.

Starting a Simple Workflow

A workflow needs a name, a flow map, and optionally an input map. The flow must have a "start" key pointing to the first step spec.

flow = %{
  "start" => %{
    "name" => "greet",
    "tool" => "get_greeting",
    "args" => %{"name" => "World"},
    "next" => "count"
  },
  "count" => %{
    "name" => "count",
    "tool" => "get_counter",
    "args" => %{},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow("demo", flow, %{}, "james")
# => {:ok, 1773850841512}

The workflow starts immediately. The executor picks up the first step within one second (the poll interval). Each step runs in its own Task process.

Checking Status

{:ok, wf} = WorkflowExecutor.get_workflow(id)

wf["status"]
# => "completed"

wf["steps"]
# => [
#   %{"name" => "greet", "status" => "done", "result_json" => "\"Hello!\"", ...},
#   %{"name" => "count", "status" => "done", "result_json" => "42", ...}
# ]

The get_workflow/1 call returns the workflow row with a "steps" list appended – all steps in order of creation.

Listing All Workflows

WorkflowExecutor.list_workflows()
# => [%{"id" => ..., "name" => "demo", "status" => "completed", ...}, ...]

WorkflowExecutor.list_workflows(limit: 5)

Using Input Variables

Step args can reference workflow input via {{input.key}} templates:

flow = %{
  "start" => %{
    "name" => "search",
    "tool" => "knowledge_search",
    "args" => %{"query" => "{{input.topic}}"},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow(
  "knowledge-lookup",
  flow,
  %{"topic" => "access control"},
  "james"
)

The executor interpolates {{input.topic}} to "access control" before calling the tool.

Branching Workflow

flow = %{
  "start" => %{
    "name" => "check",
    "tool" => "is_admin",
    "args" => %{},
    "branch" => [
      %{"if" => "result == true",  "then" => "admin_path"},
      %{"if" => "result == false", "then" => "user_path"}
    ]
  },
  "admin_path" => %{
    "name" => "admin_path",
    "tool" => "get_greeting",
    "args" => %{},
    "done" => true
  },
  "user_path" => %{
    "name" => "user_path",
    "tool" => "get_counter",
    "args" => %{},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow("branch-demo", flow, %{}, "james")

The executor calls is_admin, evaluates the result, and creates only one of the two next steps.

Approval Gate

A step with no "tool" is an approval gate. The executor marks it running, runs nil (no-op), and the step stays there until you manually call step_ready/1.

flow = %{
  "start" => %{
    "name" => "await_approval",
    "tool" => nil,
    "args" => %{},
    "next" => "do_write"
  },
  "do_write" => %{
    "name" => "do_write",
    "tool" => "knowledge_add",
    "args" => %{
      "title" => "{{input.title}}",
      "content" => "{{input.content}}",
      "scope" => "general"
    },
    "done" => true
  }
}

{:ok, wf_id} = WorkflowExecutor.start_workflow(
  "write-with-approval",
  flow,
  %{"title" => "My Doc", "content" => "Some content"},
  "james"
)

# Workflow pauses at the approval gate. Find the step ID:
{:ok, wf} = WorkflowExecutor.get_workflow(wf_id)
approval_step = Enum.find(wf["steps"], &(&1["name"] == "await_approval"))
step_id = approval_step["id"]

# When you're ready to approve:
WorkflowExecutor.step_ready(step_id)

# Executor picks it up, marks it done, creates do_write, runs it.

Parallel Steps

flow = %{
  "start" => %{
    "name" => "fan_out",
    "tool" => nil,
    "args" => %{},
    "parallel" => ["branch_a", "branch_b"]
  },
  "branch_a" => %{
    "name" => "branch_a",
    "tool" => "get_greeting",
    "args" => %{},
    "join" => "merge"
  },
  "branch_b" => %{
    "name" => "branch_b",
    "tool" => "get_counter",
    "args" => %{},
    "join" => "merge"
  },
  "merge" => %{
    "name" => "merge",
    "tool" => "get_greeting",
    "args" => %{},
    "done" => true
  }
}

{:ok, id} = WorkflowExecutor.start_workflow("parallel-demo", flow, %{}, "james")

Both branch_a and branch_b run concurrently. merge is created only after both complete.

Cancelling a Workflow

{:ok, _} = WorkflowExecutor.cancel_workflow(id)

{:ok, wf} = WorkflowExecutor.get_workflow(id)
wf["status"]
# => "cancelled"

Inspecting the Database Directly

The executor owns :workflow_db. You can query it directly in IEx for debugging:

# All workflows
Sqler.sql(:workflow_db, "SELECT id, name, status FROM workflows ORDER BY id DESC LIMIT 10")

# Steps for a specific workflow
Sqler.sql(:workflow_db, "SELECT id, name, status, tool FROM workflow_steps WHERE workflow_id = ?", [id])

# Any stuck steps
Sqler.sql(:workflow_db, "SELECT id, name, started_at FROM workflow_steps WHERE status = 'running'")

Triggering a Retry Manually

If a step is stuck in failed and you want to requeue it without waiting for the automatic retry:

# Find the failed step
{:ok, wf} = WorkflowExecutor.get_workflow(id)
step = Enum.find(wf["steps"], &(&1["status"] == "failed"))

# Create a new ready step with the same spec and fire it
WorkflowExecutor.step_ready(step["id"])
# Note: only works if the step is in "ready" status -- for failed steps,
# create a fresh one via the executor's retry mechanism or restart the workflow.