# Workflow Engine Durable, database-driven workflow orchestrator for chaining MCP tool calls with branching, parallel execution, scheduling, retry logic, and reusable templates. --- ## Table of Contents 1. [Overview](#overview) 2. [Features](#features) 3. [Architecture](#architecture) 4. [Data Model](#data-model) 5. [Step Lifecycle](#step-lifecycle) 6. [Flow Definition](#flow-definition) 7. [Usage](#usage) 8. [API Reference -- Workflow](#api-reference----workflow) 9. [API Reference -- WorkflowExecutor](#api-reference----workflowexecutor) 10. [API Reference -- Permissions.Workflow](#api-reference----permissionsworkflow) 11. [API Reference -- Router.WorkflowApi](#api-reference----routerworkflowapi) 12. [API Reference -- WorkflowWeb](#api-reference----workflowweb) 13. [Templates](#templates) 14. [Scheduling](#scheduling) 15. [Retry Logic](#retry-logic) 16. [Input Interpolation](#input-interpolation) 17. [Troubleshooting](#troubleshooting) 18. [Related Documentation](#related-documentation) --- ## Overview The workflow engine automates multi-step MCP tool invocations. You define a flow as a JSON map of step specifications, provide optional input data, and the engine executes each step in sequence, branches based on results, fans out in parallel, waits for joins, and retries on failure. The engine follows the project's separation of concerns: - **Workflow** -- pure functional module with SQL queries and business logic - **WorkflowExecutor** -- GenServer that owns the database and drives execution - **Permissions.Workflow** -- access control wrappers (permission key `130_001`) - **Router.WorkflowApi** -- REST API endpoints - **WorkflowWeb** -- self-contained SPA web interface All database writes go through WorkflowExecutor (single-writer pattern), eliminating race conditions. ## Features | Feature | Description | |---------|-------------| | Sequential steps | Chain steps via `"next"` | | Branching | Conditional paths via `"branch"` with safe pattern-matched conditions | | Parallel execution | Fan-out via `"parallel"`, converge with `"join"` | | Approval gates | Steps with `"tool": null` pause until manually unblocked | | Input interpolation | `{{input.key}}` placeholders resolved in step args, including nested lists and maps | | Retry with backoff | 3 attempts with exponential backoff (5s, 30s, 60s) via Alarm | | Watchdog | Detects stuck steps (>120s) every 30 seconds | | Templates | Save and load reusable workflow definitions | | Scheduling | Start workflows at a future time via natural language or unix timestamp | | Permission gating | All operations require the workflow permission key | ## Architecture ``` Web UI (/workflow) ──┐ REST API (/api/workflow) ──┤──> Permissions.Workflow ──> WorkflowExecutor (GenServer) MCP Tools ──┘ │ ├── :workflow_db (Sqler/SQLite) ├── Alarm (retry scheduling) └── MyMCPServer.Manager (tool execution) ``` WorkflowExecutor is the sole owner of `:workflow_db`. It runs a 1-second poll loop for ready steps and a 30-second watchdog for stuck steps. Steps execute in spawned Tasks that cast results back to the GenServer. ## Data Model ### workflows | Column | Type | Description | |--------|------|-------------| | id | INTEGER PK | Millisecond timestamp (creation time) | | updated_at | INTEGER | Optimistic locking timestamp | | name | TEXT | Workflow label | | flow_json | TEXT | JSON map of step specifications | | input_json | TEXT | JSON map of input data for interpolation | | status | TEXT | `running`, `completed`, `failed`, `cancelled`, `scheduled` | | created_by | TEXT | Username of the creator | | completed_at | INTEGER | Completion timestamp | | cancelled_at | INTEGER | Cancellation timestamp | ### workflow_steps | Column | Type | Description | |--------|------|-------------| | id | INTEGER PK | Millisecond timestamp | | updated_at | INTEGER | Optimistic locking timestamp | | workflow_id | INTEGER | Foreign key to workflows | | name | TEXT | Step identifier | | tool | TEXT | MCP tool name (null for approval gates) | | args_json | TEXT | JSON map of tool arguments | | result_json | TEXT | JSON output on completion | | status | TEXT | `pending`, `ready`, `running`, `done`, `failed` | | attempt | INTEGER | Retry counter (1, 2, 3) | | ready_at | INTEGER | When marked ready (orders poll) | | started_at | INTEGER | When executor picked it up | | completed_at | INTEGER | Completion timestamp | ### workflow_templates | Column | Type | Description | |--------|------|-------------| | id | INTEGER PK | Millisecond timestamp | | updated_at | INTEGER | Optimistic locking timestamp | | name | TEXT | Template label | | flow_json | TEXT | JSON flow definition | | input_json | TEXT | Default input values | | description | TEXT | What this template does | | created_by | TEXT | Username | | cancelled_at | INTEGER | Soft-delete timestamp | ## Step Lifecycle ``` pending ──> ready ──> running ──> done │ └──> failed ──> retry (new step, pending) └──> failed (max attempts) ──> workflow failed ``` | Transition | Trigger | |-----------|---------| | pending -> ready | `step_ready` cast (from Alarm, UI, or external event) | | ready -> running | Poll loop picks up step, marks running, spawns Task | | running -> done | Task completes, casts `step_done` | | running -> failed | Task fails or watchdog timeout (>120s) | | failed -> retry | Attempt < 3: new step created as pending, Alarm schedules `step_ready` | ## Flow Definition A flow is a JSON map where keys are step names. The `"start"` key defines the first step. ### Step Spec Fields | Field | Type | Description | |-------|------|-------------| | `name` | string | Step identifier (required) | | `tool` | string/null | MCP tool to call. Null for approval gates | | `args` | object | Arguments passed to the tool | | `next` | string | Name of the next step (sequential) | | `done` | boolean | `true` marks workflow as complete | | `branch` | array | Conditional paths (see below) | | `parallel` | array | Step names to run concurrently | | `join` | string | Step that waits for all parallel siblings | | `attempt` | integer | Current retry attempt (set internally) | ### Branch Conditions ```json "branch": [ {"if": "result == true", "then": "approve"}, {"if": "result == false", "then": "reject"} ] ``` Supported conditions (safe pattern matching, no code eval): | Condition | Matches when | |-----------|-------------| | `"result == true"` | Result is boolean `true` | | `"result == false"` | Result is boolean `false` | | `"result == nil"` | Result is `nil` | | `"result != nil"` | Result is not `nil` | | `"result == \"string\""` | Result equals the quoted string | First matching branch wins. No match returns empty (workflow stays running). ## Usage ### IEx -- Start a Workflow ```elixir WorkflowExecutor.start_workflow( "notify", %{ "start" => %{ "name" => "send", "tool" => "pushover_send", "args" => %{"message" => "Hello from workflow"}, "done" => true } }, nil, "james" ) # => {:ok, 1774093092892} ``` ### IEx -- Two-Step Sequential ```elixir WorkflowExecutor.start_workflow( "greet-then-count", %{ "start" => %{"name" => "greet", "tool" => "get_greeting", "args" => %{"name" => "World"}, "next" => "count"}, "count" => %{"name" => "count", "tool" => "get_counter", "args" => %{}, "done" => true} }, nil, "james" ) ``` ### IEx -- Scheduled Workflow ```elixir WorkflowExecutor.start_workflow( "morning-notify", %{"start" => %{"name" => "send", "tool" => "pushover_send", "args" => %{"message" => "Good morning"}, "done" => true}}, nil, "james", schedule: "tomorrow at 9am" ) ``` ### IEx -- Templates ```elixir # Save WorkflowExecutor.create_template("pushover", flow_json, nil, "Send a push notification", "james") # List WorkflowExecutor.list_templates() # Load and run {:ok, tmpl} = WorkflowExecutor.get_template(id) flow = Jason.decode!(tmpl["flow_json"]) WorkflowExecutor.start_workflow("from-template", flow, %{"msg" => "Hello"}, "james") ``` --- ## API Reference -- Workflow Pure functional module. All functions take a `db` (Sqler process) as the first argument. ### `setup_database/1` Creates the `workflows`, `workflow_steps`, and `workflow_templates` tables with indexes. Idempotent. ```elixir Workflow.setup_database(:workflow_db) # => :ok ``` ### `create/5` Creates a workflow and its first step atomically in a transaction. The first step is created as `"ready"`. **Parameters:** - `db` -- Sqler process - `name` (string) -- workflow label - `flow` (map) -- flow definition with `"start"` key - `input` (map/nil) -- input data for interpolation - `user` (string) -- creator username **Returns:** `{:ok, workflow_id}` or `{:error, reason}` ```elixir {:ok, id} = Workflow.create(db, "my-workflow", flow, %{"key" => "value"}, "alice") ``` ### `create_scheduled/5` Like `create/5` but sets status to `"scheduled"` and first step to `"pending"`. Returns the step ID for alarm binding. **Returns:** `{:ok, workflow_id, step_id}` or `{:error, reason}` ```elixir {:ok, wf_id, step_id} = Workflow.create_scheduled(db, "deferred", flow, nil, "alice") ``` ### `get/2` Gets a workflow by ID. **Returns:** `{:ok, map}` or `{:error, :not_found}` ```elixir {:ok, wf} = Workflow.get(db, workflow_id) ``` ### `list/2` Lists non-cancelled workflows, newest first. **Options:** - `:limit` -- max results (default 50) - `:status` -- filter by status. `nil` excludes cancelled, `"all"` includes everything ```elixir workflows = Workflow.list(db, limit: 10, status: "running") ``` ### `cancel/3`, `complete/3`, `fail/4` Transition workflow status. All use optimistic locking via `updated_at`. ```elixir {:ok, wf} = Workflow.get(db, id) Workflow.cancel(db, id, wf["updated_at"]) Workflow.complete(db, id, wf["updated_at"]) Workflow.fail(db, id, wf["updated_at"], "reason") ``` ### `next_steps/4` Determines what to create after a step completes. The core orchestration logic. **Returns:** - `:done` -- workflow is complete - `[]` -- no new steps (parallel siblings still running) - `[spec, ...]` -- list of step specs to create ```elixir case Workflow.next_steps(db, wf_id, "step_a", result) do :done -> # complete the workflow [] -> # nothing to do, parallel branch waiting specs -> # create these steps as ready end ``` ### `evaluate_condition/2` Evaluates a branch condition string against a result value. No code eval. ```elixir Workflow.evaluate_condition("result == true", true) # => true Workflow.evaluate_condition("result == nil", nil) # => true Workflow.evaluate_condition(~s(result == "yes"), "yes") # => true ``` ### `step_create/4`, `step_get/2`, `step_mark_running/3`, `step_mark_done/4`, `step_mark_failed/4` Step CRUD and state transitions. All use optimistic locking. ```elixir {:ok, step_id} = Workflow.step_create(db, wf_id, spec, "ready") {:ok, step} = Workflow.step_get(db, step_id) {:ok, _} = Workflow.step_mark_running(db, step_id, step["updated_at"]) {:ok, _} = Workflow.step_mark_done(db, step_id, updated_at, %{"result" => "ok"}) {:ok, _} = Workflow.step_mark_failed(db, step_id, updated_at, "timeout") ``` ### `template_create/6`, `template_get/2`, `template_list/1`, `template_delete/3` Template CRUD. Soft-delete via `cancelled_at`. ```elixir {:ok, id} = Workflow.template_create(db, "name", flow_json, input_json, "description", "alice") {:ok, tmpl} = Workflow.template_get(db, id) templates = Workflow.template_list(db) # sorted by name, excludes deleted Workflow.template_delete(db, id, tmpl["updated_at"]) ``` ### `row_to_map/2` Zips field name list with a row value list into a string-keyed map. ```elixir Workflow.row_to_map(["id", "name"], [42, "test"]) # => %{"id" => 42, "name" => "test"} ``` --- ## API Reference -- WorkflowExecutor GenServer that owns `:workflow_db`. All writes go through this process. ### `start_workflow/5` Starts a new workflow. Optional `schedule` in opts. ```elixir {:ok, id} = WorkflowExecutor.start_workflow("name", flow, input, "user") {:ok, id} = WorkflowExecutor.start_workflow("name", flow, input, "user", schedule: "tomorrow at 9am") ``` ### `get_workflow/1` Returns workflow with all steps attached. ```elixir {:ok, wf} = WorkflowExecutor.get_workflow(id) wf["steps"] # => [%{"name" => "step_a", "status" => "done", ...}, ...] ``` ### `list_workflows/1` Lists workflows. Accepts same options as `Workflow.list/2`. ```elixir workflows = WorkflowExecutor.list_workflows(limit: 10) ``` ### `cancel_workflow/1` Cancels a running workflow. ```elixir {:ok, _} = WorkflowExecutor.cancel_workflow(id) ``` ### `step_ready/1`, `step_done/2`, `step_failed/2` Async casts for step state transitions. ```elixir WorkflowExecutor.step_ready(step_id) # unblock approval gate WorkflowExecutor.step_done(step_id, result) # report completion WorkflowExecutor.step_failed(step_id, reason) # report failure ``` ### `create_template/5`, `list_templates/0`, `get_template/1`, `delete_template/1` Template operations via GenServer. ```elixir {:ok, id} = WorkflowExecutor.create_template("name", flow_json, input_json, "desc", "user") templates = WorkflowExecutor.list_templates() {:ok, tmpl} = WorkflowExecutor.get_template(id) {:ok, _} = WorkflowExecutor.delete_template(id) ``` --- ## API Reference -- Permissions.Workflow Permission key: `130_001` (from `Permissions.Keys.workflow()`). All functions take a permissions map as the first argument. Returns `{:not_allowed, "Workflow"}` without the key. | Function | Delegates to | |----------|-------------| | `create/5`, `create/6` | `WorkflowExecutor.start_workflow` | | `list/2` | `WorkflowExecutor.list_workflows` | | `get/2` | `WorkflowExecutor.get_workflow` | | `cancel/2` | `WorkflowExecutor.cancel_workflow` | | `step_ready/2` | `WorkflowExecutor.step_ready` | | `create_template/6` | `WorkflowExecutor.create_template` | | `list_templates/1` | `WorkflowExecutor.list_templates` | | `get_template/2` | `WorkflowExecutor.get_template` | | `delete_template/2` | `WorkflowExecutor.delete_template` | ### MCP Tools (8 total) | Tool | Description | |------|-------------| | `workflow_create` | Create and start a workflow (optional `schedule` param) | | `workflow_list` | List workflows | | `workflow_get` | Get workflow with steps | | `workflow_cancel` | Cancel a running workflow | | `workflow_step_ready` | Unblock an approval gate | | `workflow_template_list` | List saved templates | | `workflow_template_save` | Save a template | | `workflow_template_delete` | Delete a template | --- ## API Reference -- Router.WorkflowApi Base path: `/api/workflow` | Method | Path | Description | |--------|------|-------------| | GET | `/` | List workflows (`?limit=50`) | | GET | `/:id` | Get workflow with steps | | POST | `/` | Create workflow (`name`, `flow`, `input`, optional `schedule`) | | DELETE | `/:id` | Cancel workflow | | POST | `/:step_id/ready` | Unblock approval gate | | GET | `/templates` | List templates | | GET | `/templates/:id` | Get template | | POST | `/templates` | Save template (`name`, `flow_json`, `input_json`, `description`) | | DELETE | `/templates/:id` | Soft-delete template | All routes require the workflow permission key. Returns 403 JSON without it. --- ## API Reference -- WorkflowWeb Renders two SPA pages: | Function | Route | Description | |----------|-------|-------------| | `render/0` | `/workflow` | Creation form + last workflow (auto-expanded). Template dropdown, schedule field. | | `render_list/0` | `/workflows` | Full list of all workflows with expand/cancel/approve controls. | Both pages auto-refresh every 5 seconds via `setInterval` fetching from the REST API. --- ## Templates Templates store reusable flow + input definitions. The web UI provides: - **Dropdown** on the creation form -- selecting a template auto-loads name, flow, and input into the form fields - **Save as Template** button -- saves the current form as a new template with an optional description - **Delete** button -- soft-deletes the selected template Templates are also available via MCP tools and REST API for programmatic use. ## Scheduling Workflows can be scheduled for future execution: ```elixir WorkflowExecutor.start_workflow("name", flow, input, "user", schedule: "tomorrow at 9am") ``` The `schedule` parameter accepts: - Natural language strings parsed by Chronic (e.g., "in 30 minutes", "tomorrow at 2pm", "next monday 9am") - Unix timestamps as integers Under the hood: workflow is created with status `"scheduled"`, first step as `"pending"`. An Alarm fires at the scheduled time, transitions status to `"running"`, and casts `step_ready`. ## Retry Logic When a step fails: | Attempt | Backoff | Action | |---------|---------|--------| | 1 | 5 seconds | New step created as pending, Alarm schedules `step_ready` | | 2 | 30 seconds | Same | | 3 | -- | Workflow marked as failed | Retries use the Alarm system for durability -- they survive process restarts. ## Input Interpolation Step args support `{{input.key}}` placeholders resolved from the workflow's input JSON: ```json { "args": { "message": "Hello {{input.name}}", "recipients": ["{{input.to}}"], "config": {"region": "{{input.region}}"} } } ``` Interpolation recurses into lists and maps. Missing keys resolve to empty string. Non-string values pass through unchanged. ## Troubleshooting | Problem | Cause | Fix | |---------|-------|-----| | "Unknown tool: X" | Tool name doesn't match any handler | Check exact tool name (case-sensitive, camelCase for some params) | | Tool arg mismatch | Wrong parameter names (e.g., `inbox_id` vs `inboxId`) | Error message now shows expected vs provided params | | Step stuck in running | Tool call taking >120s | Watchdog will fail it; increase timeout or break into smaller steps | | Scheduled workflow not starting | Alarm delivered while executor was restarting | Check Alarm history; re-schedule if needed | | Template not loading | Stale page | Hard refresh (Cmd+Shift+R) | | `{{input.key}}` appearing literally | Input not provided or key missing | Check input JSON has the key; missing keys resolve to "" | ## Related Documentation - [Alarm](alarm.md) -- scheduling and retry timer system - [AccessControl](access_control.md) -- permission management - [Sqler](sqler.md) -- SQLite database wrapper --- *Source: `lib/workflow.ex`, `lib/workflow_executor.ex`, `lib/workflow_web.ex`, `lib/permissions/workflow.ex`, `lib/router/workflow_api.ex` -- Last updated: 2026-03-21*