Dark Mode

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

wavezync/durable

Repository files navigation

Durable

A durable, resumable workflow engine for Elixir. Similar to Temporal/Inngest.

Features

  • Pipeline Model - Context flows from step to step, simple and explicit
  • Resumability - Sleep, wait for events, wait for human input
  • Branching - Pattern-matched conditional flow control
  • Parallel - Run steps concurrently with result collection
  • Compensations - Saga pattern with automatic rollback
  • Cron Scheduling - Recurring workflows with cron expressions
  • Reliability - Automatic retries with exponential/linear/constant backoff
  • Orchestration - Parent/child workflow composition
  • Persistence - PostgreSQL-backed execution state

Installation

def deps do
[{:durable, "~> 0.0.0-alpha"}]
end

Quick Start

1. Create Migration

defmodule MyApp.Repo.Migrations.AddDurable do
use Ecto.Migration
def up, do: Durable.Migration.up()
def down, do: Durable.Migration.down()
end

2. Add to Supervision Tree

children = [
MyApp.Repo,
{Durable, repo: MyApp.Repo, queues: %{default: [concurrency: 10]}}
]

3. Define & Run

total = ctx.items |> Enum.map(& &1["price"]) |> Enum.sum() {:ok, assign(ctx, :total, total)} end step :charge_payment, [retry: [max_attempts: 3, backoff: :exponential]], fn ctx -> {:ok, charge} = PaymentService.charge(ctx.order_id, ctx.total) {:ok, assign(ctx, :charge_id, charge.id)} end step :send_confirmation, fn ctx -> EmailService.send_confirmation(ctx.order_id) {:ok, ctx} end end end # Start it {:ok, id} = Durable.start(MyApp.OrderWorkflow, %{"id" => "order_123", "items" => items})">defmodule MyApp.OrderWorkflow do
use Durable
use Durable.Helpers

workflow "process_order", timeout: hours(2) do
# First step receives workflow input
step :validate, fn input ->
{:ok, %{
order_id: input["id"],
items: input["items"],
customer_id: input["customer_id"]
}}
end

# Each step receives previous step's output as context
step :calculate_total, fn ctx ->
total = ctx.items |> Enum.map(& &1["price"]) |> Enum.sum()
{:ok, assign(ctx, :total, total)}
end

step :charge_payment, [retry: [max_attempts: 3, backoff: :exponential]], fn ctx ->
{:ok, charge} = PaymentService.charge(ctx.order_id, ctx.total)
{:ok, assign(ctx, :charge_id, charge.id)}
end

step :send_confirmation, fn ctx ->
EmailService.send_confirmation(ctx.order_id)
{:ok, ctx}
end
end
end

# Start it
{:ok, id} = Durable.start(MyApp.OrderWorkflow, %{"id" => "order_123", "items" => items})

Examples

Approval Workflow

Wait for human approval with timeout fallback.

defmodule MyApp.ExpenseApproval do
use Durable
use Durable.Helpers
use Durable.Wait

workflow "expense_approval" do
step :request_approval, fn ctx ->
result = wait_for_approval("manager",
prompt: "Approve $#{ctx["amount"]} expense?",
timeout: days(3),
timeout_value: :auto_rejected
)
{:ok, assign(ctx, :decision, result)}
end

branch on: fn ctx -> ctx.decision end do
:approved ->
step :process, fn ctx ->
Expenses.reimburse(ctx["employee_id"], ctx["amount"])
{:ok, assign(ctx, :status, :reimbursed)}
end

_ ->
step :notify_rejection, fn ctx ->
Mailer.send_rejection(ctx["employee_id"])
{:ok, assign(ctx, :status, :rejected)}
end
end
end
end

# Approve externally
Durable.provide_input(workflow_id, "manager", :approved)

Parallel Data Fetch

Fetch data concurrently, then combine results.

defmodule MyApp.DashboardBuilder do
use Durable
use Durable.Helpers

workflow "build_dashboard" do
step :init, fn input ->
{:ok, %{user_id: input["user_id"]}}
end

# Parallel steps produce results in __results__ map
parallel do
step :user, fn ctx ->
{:ok, %{user: Users.get(ctx.user_id)}}
end

step :orders, fn ctx ->
{:ok, %{orders: Orders.recent(ctx.user_id)}}
end

step :notifications, fn ctx ->
{:ok, %{notifs: Notifications.unread(ctx.user_id)}}
end
end

# Access results from __results__ map
step :render, fn ctx ->
results = ctx[:__results__]

# Results are tagged tuples: ["ok", data] or ["error", reason]
user = case results["user"] do
["ok", data] -> data.user
_ -> nil
end

orders = case results["orders"] do
["ok", data] -> data.orders
_ -> []
end

notifs = case results["notifications"] do
["ok", data] -> data.notifs
_ -> []
end

dashboard = Dashboard.build(user, orders, notifs)
{:ok, assign(ctx, :dashboard, dashboard)}
end
end
end

# Or use into: to transform results directly
defmodule MyApp.DashboardBuilderWithInto do
use Durable
use Durable.Helpers

workflow "build_dashboard_v2" do
step :init, fn input ->
{:ok, %{user_id: input["user_id"]}}
end

parallel into: fn ctx, results ->
# results contains tuples: %{user: {:ok, data}, orders: {:ok, data}, ...}
case {results[:user], results[:orders], results[:notifications]} do
{{:ok, user_data}, {:ok, orders_data}, {:ok, notifs_data}} ->
{:ok, Map.merge(ctx, %{
user: user_data.user,
orders: orders_data.orders,
notifs: notifs_data.notifs
})}

_ ->
{:error, "Failed to fetch dashboard data"}
end
end do
step :user, fn ctx -> {:ok, %{user: Users.get(ctx.user_id)}} end
step :orders, fn ctx -> {:ok, %{orders: Orders.recent(ctx.user_id)}} end
step :notifications, fn ctx -> {:ok, %{notifs: Notifications.unread(ctx.user_id)}} end
end

step :render, fn ctx ->
dashboard = Dashboard.build(ctx.user, ctx.orders, ctx.notifs)
{:ok, assign(ctx, :dashboard, dashboard)}
end
end
end

Batch Processing

Process items with controlled concurrency using Task.async_stream.

defmodule MyApp.BulkEmailer do
use Durable
use Durable.Helpers

workflow "send_campaign" do
step :load, fn input ->
recipients = Subscribers.active(input["campaign_id"])
{:ok, %{campaign_id: input["campaign_id"], recipients: recipients}}
end

step :send_emails, fn ctx ->
results =
ctx.recipients
|> Task.async_stream(
fn recipient ->
case Mailer.send_campaign(recipient, ctx.campaign_id) do
:ok -> {:ok, recipient}
{:error, reason} -> {:error, {recipient, reason}}
end
end,
max_concurrency: 10,
timeout: :infinity
)
|> Enum.map(fn {:ok, r} -> r end)

sent = for {:ok, _} <- results, do: 1
failed = for {:error, _} <- results, do: 1

{:ok, ctx
|> assign(:sent_count, length(sent))
|> assign(:failed_count, length(failed))}
end
end
end

Trip Booking (Saga)

Book multiple services with automatic rollback on failure.

defmodule MyApp.TripBooking do
use Durable
use Durable.Helpers

workflow "book_trip" do
step :book_flight, [compensate: :cancel_flight], fn ctx ->
booking = Flights.book(ctx["flight"])
{:ok, assign(ctx, :flight, booking)}
end

step :book_hotel, [compensate: :cancel_hotel], fn ctx ->
booking = Hotels.book(ctx["hotel"])
{:ok, assign(ctx, :hotel, booking)}
end

step :charge, fn ctx ->
total = ctx.flight.price + ctx.hotel.price
Payments.charge(ctx["card"], total)
{:ok, assign(ctx, :charged, true)}
end

compensate :cancel_flight, fn ctx ->
Flights.cancel(ctx.flight.id)
{:ok, ctx}
end

compensate :cancel_hotel, fn ctx ->
Hotels.cancel(ctx.hotel.id)
{:ok, ctx}
end
end
end

Scheduled Reports

Run daily at 9am.

defmodule MyApp.DailyReport do
use Durable
use Durable.Helpers
use Durable.Scheduler.DSL

@schedule cron: "0 9 * * *", timezone: "America/New_York"
workflow "daily_sales_report" do
step :generate, fn _input ->
report = Reports.sales_summary(Date.utc_today())
{:ok, %{report: report}}
end

step :distribute, fn ctx ->
Mailer.send_report(ctx.report, to: "team@company.com")
Slack.post_summary(ctx.report, channel: "#sales")
{:ok, ctx}
end
end
end

# Register in supervision tree
{Durable, repo: MyApp.Repo, scheduled_modules: [MyApp.DailyReport]}

Delayed & Scheduled Execution

Sleep, schedule for specific times, and wait for events.

defmodule MyApp.TrialReminder do
use Durable
use Durable.Helpers
use Durable.Wait

workflow "trial_reminder" do
step :welcome, fn ctx ->
Mailer.send_welcome(ctx["user_id"])
{:ok, %{user_id: ctx["user_id"], trial_started_at: ctx["trial_started_at"]}}
end

step :wait_3_days, fn ctx ->
sleep(days(3))
{:ok, ctx}
end

step :check_in, fn ctx ->
Mailer.send_tips(ctx.user_id)
{:ok, ctx}
end

step :wait_until_trial_ends, fn ctx ->
trial_end = DateTime.add(ctx.trial_started_at, 14, :day)
schedule_at(trial_end)
{:ok, ctx}
end

step :convert_or_remind, fn ctx ->
if Subscriptions.active?(ctx.user_id) do
{:ok, assign(ctx, :converted, true)}
else
Mailer.send_upgrade_reminder(ctx.user_id)
{:ok, assign(ctx, :converted, false)}
end
end
end
end

Event-Driven Workflow

Wait for external webhook events.

defmodule MyApp.PaymentFlow do
use Durable
use Durable.Helpers
use Durable.Wait

workflow "payment_flow" do
step :create_invoice, fn ctx ->
invoice = Invoices.create(ctx["order_id"], ctx["amount"])
{:ok, %{order_id: ctx["order_id"], invoice_id: invoice.id}}
end

step :await_payment, fn ctx ->
{event, _payload} = wait_for_any(["payment.success", "payment.failed"],
timeout: days(7),
timeout_value: {"payment.expired", nil}
)
{:ok, assign(ctx, :result, event)}
end

branch on: fn ctx -> ctx.result end do
"payment.success" ->
step :fulfill, fn ctx ->
Orders.fulfill(ctx.order_id)
{:ok, assign(ctx, :status, :fulfilled)}
end

_ ->
step :cancel, fn ctx ->
Orders.cancel(ctx.order_id)
{:ok, assign(ctx, :status, :cancelled)}
end
end
end
end

# Webhook handler sends event
Durable.send_event(workflow_id, "payment.success", %{transaction_id: "txn_123"})

Reference

Helper Functions

use Durable.Helpers

assign(ctx, :key, value) # Set a value
assign(ctx, %{a: 1, b: 2}) # Merge multiple values
update(ctx, :key, default, fn old -> new end)
append(ctx, :list, item) # Append to list
increment(ctx, :count) # Increment by 1
increment(ctx, :count, 5) # Increment by 5

Time Helpers

seconds(30) # 30_000 ms
minutes(5) # 300_000 ms
hours(2) # 7_200_000 ms
days(7) # 604_800_000 ms

Orchestration

use Durable.Orchestration

# Synchronous: call child and wait for result
case call_workflow(MyApp.PaymentWorkflow, %{"amount" => 100}, timeout: hours(1)) do
{:ok, result} -> {:ok, assign(data, :payment, result)}
{:error, reason} -> {:error, reason}
end

# Fire-and-forget: start child and continue
{:ok, child_id} = start_workflow(MyApp.EmailWorkflow, %{"to" => email}, ref: :welcome)

# call_workflow also works inside parallel blocks (executed inline)
parallel do
step :payment, fn data ->
case call_workflow(MyApp.PaymentWorkflow, %{"amount" => data.total}, ref: :pay) do
{:ok, result} -> {:ok, assign(data, :payment, result)}
{:error, reason} -> {:error, reason}
end
end

step :shipping, fn data ->
case call_workflow(MyApp.ShippingWorkflow, %{"id" => data.order_id}, ref: :ship) do
{:ok, result} -> {:ok, assign(data, :shipping, result)}
{:error, reason} -> {:error, reason}
end
end
end

API

Durable.start(Module, input)
Durable.start(Module, input, queue: :priority, scheduled_at: datetime)
Durable.get_execution(id)
Durable.list_executions(workflow: Module, status: :running)
Durable.cancel(id, "reason")
Durable.send_event(id, "event", payload)
Durable.provide_input(id, "input_name", data)
Durable.list_children(parent_id)

Mix Tasks

Durable includes mix tasks for managing workflows from the command line.

mix durable.cancel --reason "no longer needed" # Clean up old executions mix durable.cleanup --older-than 30d # completed/failed older than 30 days mix durable.cleanup --older-than 7d --status completed # only completed, older than 7 days mix durable.cleanup --older-than 24h --dry-run # preview what would be deleted"># Show queue status and workflow summary
mix durable.status

# List workflow executions (with filters)
mix durable.list # all executions
mix durable.list --status running # filter by status
mix durable.list --workflow MyApp.OrderWorkflow # filter by workflow
mix durable.list --limit 20 --format json # limit results, JSON output

# Start a workflow
mix durable.run MyApp.OrderWorkflow # no input
mix durable.run MyApp.OrderWorkflow --input '{"id": 123}' # with JSON input
mix durable.run MyApp.OrderWorkflow --queue high_priority # specific queue

# Cancel a workflow
mix durable.cancel <execution_id>
mix durable.cancel <execution_id> --reason "no longer needed"

# Clean up old executions
mix durable.cleanup --older-than 30d # completed/failed older than 30 days
mix durable.cleanup --older-than 7d --status completed # only completed, older than 7 days
mix durable.cleanup --older-than 24h --dry-run # preview what would be deleted

Guides

Coming Soon

  • Phoenix LiveView dashboard

License

MIT

About

A durable workflow execution engine for Elixir

Topics

Resources

Readme

License

MIT license

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors

Languages