Workflows let you chain multiple agents, functions, and connectors into a single execution pipeline with branching, parallel execution, iteration, and full observability.

Quick Start

import { flow, Agent, openai } from '@runflow-ai/sdk';
import { z } from 'zod';

const workflow = flow({
  id: 'support-ticket',
  name: 'Support Ticket Workflow',
  inputSchema: z.object({
    email: z.string().email(),
    issue: z.string(),
  }),
  outputSchema: z.any(),
})
  .step('classify', async (input) => ({
    category: input.issue.toLowerCase().includes('billing') ? 'billing' : 'technical',
    priority: input.issue.toLowerCase().includes('urgent') ? 'high' : 'normal',
  }))
  .step('respond', async (input, ctx) => ({
    ticket: `TICKET-${Date.now()}`,
    category: ctx.results.classify.category,
    message: `We received your ${input.priority} priority issue.`,
  }))
  .build();

const result = await workflow.execute({
  email: 'customer@example.com',
  issue: 'Urgent billing problem',
});
The flow() API is the recommended way to create workflows. The legacy createWorkflow() still works but is deprecated.

Core Concepts

Steps

Each step receives the previous step’s output as input, plus a ctx object with access to all previous results:
.step('enrich', async (input, ctx) => {
  // `input` = output from previous step
  // `ctx.results.classify` = output from the 'classify' step
  // `ctx.results.validate` = output from the 'validate' step
  // `ctx.input` = original workflow input
  return { enriched: true, ...input };
})
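To make the data flow concrete, here is a minimal sketch of a sequential runner that threads `input` and `ctx.results` the way the description above implies. It is an illustration only, not the SDK's implementation: `runSteps`, `Ctx`, and `Handler` are hypothetical names, and the sketch ignores guards, retries, and validation.

```typescript
// Illustrative only: a tiny sequential runner, not the SDK's implementation.
type Ctx<TInput> = { input: TInput; results: Record<string, any> };
type Handler<TInput> = (input: any, ctx: Ctx<TInput>) => Promise<any>;

async function runSteps<TInput>(
  workflowInput: TInput,
  steps: Array<{ id: string; handler: Handler<TInput> }>,
): Promise<{ output: any; ctx: Ctx<TInput> }> {
  const ctx: Ctx<TInput> = { input: workflowInput, results: {} };
  let current: any = workflowInput; // the first step receives the workflow input
  for (const step of steps) {
    current = await step.handler(current, ctx);
    ctx.results[step.id] = current; // later steps can read this result by ID
  }
  return { output: current, ctx };
}

// Two chained steps: the second reads both its input and the original input.
const demo = runSteps({ issue: 'Urgent billing problem' }, [
  {
    id: 'classify',
    handler: async (input) => ({
      priority: input.issue.includes('Urgent') ? 'high' : 'normal',
    }),
  },
  {
    id: 'respond',
    handler: async (input, ctx) => ({
      message: `${input.priority}: ${ctx.input.issue}`,
    }),
  },
]);
```

Each handler's return value becomes both the next step's `input` and an entry in `ctx.results`, which is why the two are interchangeable for the immediately preceding step.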

Accessing Previous Step Results

Every step receives two arguments: input (output from the previous step) and ctx (the full workflow context). Use ctx to access any previous step’s result by name.
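Use kebab-case IDs for steps (e.g., fetch-user, generate-report) and avoid spaces, accents, or special characters. The step ID becomes the key in ctx.results, so ctx.results['fetch-user'] is much cleaner than ctx.results['Buscar Usuário']. Hyphenated IDs need bracket access; dot access such as ctx.results.classify works only when the ID is a valid JavaScript identifier.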
const workflow = flow({ id: 'pipeline', inputSchema, outputSchema })
  .step('fetch-user', async (input) => {
    const user = await db.getUser(input.userId);
    return { name: user.name, email: user.email, plan: user.plan };
  })
  .step('fetch-orders', async (input, ctx) => {
    // input = output from 'fetch-user' (previous step)
    // ctx.results['fetch-user'] = same thing, but accessible by name
    // ctx.input = original workflow input
    const orders = await db.getOrders(input.email);
    return { orders, userName: input.name };
  })
  .step('generate-report', async (input, ctx) => {
    // Access ANY previous step, not just the immediate one
    const user = ctx.results['fetch-user'];     // { name, email, plan }
    const orders = ctx.results['fetch-orders'];  // { orders, userName }
    const originalInput = ctx.input;              // original workflow input

    return {
      report: `${user.name} (${user.plan}): ${orders.orders.length} orders`,
      requestedBy: originalInput.requestedBy,
    };
  })
  .build();
What’s available in ctx:
| Property | Type | Description |
| --- | --- | --- |
| ctx.input | TInput | The original workflow input |
| ctx.results | Record<string, any> | All completed step results, keyed by step ID |
| ctx.results['step-id'] | any | Result of a specific step |
| ctx.workflowId | string | Workflow ID |
| ctx.executionId | string | Current execution ID |
| ctx.currentStep | string | Current step ID |
| ctx.metadata | object | Execution metadata (startTime, stepCount, totalSteps) |
You can only access results from steps that have already executed. Accessing a step that hasn’t run yet (or was skipped by a when guard) returns undefined.
Common patterns:
// Access original input from any step
.step('final', async (input, ctx) => {
  const email = ctx.input.email;  // original workflow input
})

// Combine results from multiple previous steps
.step('summary', async (input, ctx) => {
  const classification = ctx.results.classify;
  const enrichment = ctx.results.enrich;
  const validation = ctx.results.validate;
  return { ...classification, ...enrichment, valid: validation.isValid };
})

// Use in conditional guards
.step('notify', {
  handler: async (input, ctx) => { /* ... */ },
  when: (ctx) => ctx.results.classify.priority === 'high',
})

// Use in branch/switch conditions
.switch('route', {
  on: (ctx) => ctx.results.classify.category,
  cases: { /* ... */ },
})
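Because a skipped or not-yet-run step leaves its entry undefined, an expression like `ctx.results.classify.priority` throws a TypeError if `classify` never ran. The sketch below shows one defensive way to read such a result. The `GuardCtx` type and the `priorityOf` helper are hypothetical, and the `'normal'` default is an assumption chosen for the example.

```typescript
type StepClassification = { category: string; priority: 'high' | 'normal' };
type GuardCtx = { results: Record<string, any> };

// Returns a safe default when 'classify' has not run or was skipped.
function priorityOf(ctx: GuardCtx): 'high' | 'normal' {
  const classification = ctx.results['classify'] as StepClassification | undefined;
  return classification?.priority ?? 'normal';
}

// 'classify' missing: optional chaining yields undefined, so the default applies.
const whenSkipped = priorityOf({ results: {} });

// 'classify' present: its priority is returned unchanged.
const whenRan = priorityOf({
  results: { classify: { category: 'billing', priority: 'high' } },
});
```

A guard written as `when: (ctx) => priorityOf(ctx) === 'high'` would then evaluate to false instead of throwing when the upstream step was skipped.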

Schema Validation

Steps can declare an outputSchema for runtime validation. If the output doesn’t match, execution stops immediately with a ZodError:
.step('classify', {
  outputSchema: z.object({
    category: z.enum(['billing', 'technical', 'sales']),
    confidence: z.number().min(0).max(1),
  }),
  handler: async (input) => ({
    category: 'billing',
    confidence: 0.95,
  }),
})

Conditional Guards

Skip steps based on runtime conditions:
.step('notify-manager', {
  handler: async (input, ctx) => {
    await notifySlack(ctx.results.classify.category);
    return { notified: true };
  },
  when: (ctx) => ctx.results.classify.priority === 'high',
})

Step Types

.step() — Function Step

Transform data, call APIs, run business logic:
.step('transform', async (input) => ({
  normalized: input.text.trim().toLowerCase(),
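  // Split on runs of whitespace so repeated or leading spaces are not counted as words
  wordCount: input.text.trim().split(/\s+/).filter(Boolean).length,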
}))

.agent() — Agent Step

Execute an AI agent. The output is always { text, metadata: { agent, model, stepId } }:
const analyzer = new Agent({
  name: 'Analyzer',
  instructions: 'Analyze customer issues and extract key information.',
  model: openai('gpt-4o'),
});

flow({ id: 'analysis', inputSchema, outputSchema })
  .agent('analyze', analyzer, {
    promptTemplate: 'Analyze this issue: {{input.issue}}',
  })
  .step('extract', async (input) => ({
    // input.text = agent's response
    // input.metadata.agent = 'Analyzer'
    summary: input.text,
  }))
  .build();

.connector() — Connector Step (simple)

Call an external service with template interpolation:
.connector('create-ticket', 'hubspot', 'tickets', 'create', {
  subject: 'Support Request',
  content: '{{input.issue}}',
  priority: 'medium',
})

Connectors inside .step() (dynamic)

For dynamic connector calls with logic, use the connector() function inside a .step():
import { flow, connector } from '@runflow-ai/sdk';

flow({ id: 'pipeline', inputSchema, outputSchema })
  .step('check-eligibility', async (input) => {
    const result = await connector(
      'api-elegibilidade',            // connector instance slug
      'consulta-por-cpf',             // resource slug
      { path: { cpf: input.cpf } }    // request data
    );
    return { eligible: result.status === 'active', plan: result.plan };
  })
  .step('create-contact', {
    handler: async (input, ctx) => {
      return await connector('hubspot', 'create-contact', {
        email: ctx.input.email,
        properties: { plan: input.plan },
      });
    },
    when: (ctx) => ctx.results['check-eligibility'].eligible,
  })
  .build();
connector() is a function, not a client factory. Always pass all 3 arguments: connector(slug, resource, data). See Connectors for details.

Routing

.branch() — Binary Routing (if/else)

Route to one of two paths based on a condition:
.branch('route', {
  condition: (ctx) => ctx.results.classify.priority === 'high',
  onTrue: async (input, ctx) => ({
    handler: 'priority-queue',
    escalated: true,
  }),
  onFalse: async (input, ctx) => ({
    handler: 'normal-queue',
    escalated: false,
  }),
})
For complex paths with multiple steps, pass arrays:
.branch('route', {
  condition: (ctx) => ctx.results.classify.priority === 'high',
  onTrue: [
    createAgentStep('urgent-agent', urgentAgent, {
      promptTemplate: 'Handle urgent: {{input.issue}}',
    }),
    createConnectorStep('notify', 'slack', 'messages', 'send', {
      channel: '#urgent',
      message: 'Urgent ticket created',
    }),
  ],
  onFalse: [
    createAgentStep('normal-agent', normalAgent),
  ],
})

.switch() — Multi-way Routing

Route to one of N paths based on a value:
.switch('department', {
  on: (ctx) => ctx.results.classify.category,
  cases: {
    billing:   async (input) => ({ agent: 'billing-team', response: '...' }),
    technical: async (input) => ({ agent: 'tech-team', response: '...' }),
    sales:     async (input) => ({ agent: 'sales-team', response: '...' }),
  },
  default: async (input) => ({ agent: 'general', response: 'Forwarded to support.' }),
})
With agent steps per case:
const billingAgent = new Agent({ name: 'Billing', instructions: '...', model: openai('gpt-4o') });
const techAgent = new Agent({ name: 'Tech Support', instructions: '...', model: openai('gpt-4o') });
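const salesAgent = new Agent({ name: 'Sales', instructions: '...', model: openai('gpt-4o') });
const generalAgent = new Agent({ name: 'General', instructions: '...', model: openai('gpt-4o') }); // used by the default case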

.switch('department', {
  on: (ctx) => ctx.results.classify.category,
  cases: {
    billing:   [createAgentStep('billing', billingAgent, { promptTemplate: '...' })],
    technical: [createAgentStep('tech', techAgent, { promptTemplate: '...' })],
    sales:     [createAgentStep('sales', salesAgent, { promptTemplate: '...' })],
  },
  default: [createAgentStep('fallback', generalAgent)],
})
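Conceptually, `.switch()` evaluates `on`, looks the value up in `cases`, and falls back to `default` when nothing matches; `.branch()` is the two-case version of the same idea. The sketch below models only that lookup in plain TypeScript. `dispatch` and `CaseHandler` are hypothetical names, and the own-property check is a design choice of this sketch rather than documented SDK behavior.

```typescript
type CaseHandler<TIn, TOut> = (input: TIn) => TOut;

function dispatch<TIn, TOut>(
  key: string,
  cases: Record<string, CaseHandler<TIn, TOut>>,
  fallback: CaseHandler<TIn, TOut>,
  input: TIn,
): TOut {
  // Own-property check so keys like 'toString' do not match inherited members.
  const matched = Object.prototype.hasOwnProperty.call(cases, key) ? cases[key] : fallback;
  return matched(input);
}

const departments: Record<string, CaseHandler<string, { agent: string }>> = {
  billing: () => ({ agent: 'billing-team' }),
  technical: () => ({ agent: 'tech-team' }),
};
const general: CaseHandler<string, { agent: string }> = () => ({ agent: 'general' });

// A known category hits its case; an unknown one falls through to the default.
const billingRoute = dispatch('billing', departments, general, 'charged twice');
const unknownRoute = dispatch('legal', departments, general, 'contract question');
```

Providing a `default` matters whenever the routing value comes from a model or from user input, since any unexpected string would otherwise have no handler.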

Parallel & Iteration

.parallel() — Concurrent Execution

Run multiple steps at the same time:
.parallel('enrich', [
  createFunctionStep('load-profile', async (input) => {
    return await db.getProfile(input.customerId);
  }),
  createFunctionStep('load-orders', async (input) => {
    return await db.getOrders(input.customerId);
  }),
  createFunctionStep('load-tickets', async (input) => {
    return await db.getTickets(input.customerId);
  }),
])
// Next step receives: { type: 'parallel', results: { 'load-profile': ..., 'load-orders': ..., 'load-tickets': ... } }
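The result shape in the comment above can be reproduced with `Promise.all`, as in this minimal sketch. It is not the SDK's implementation: `runParallel` and `ParallelStep` are hypothetical, and the sketch assumes that one failing handler rejects the whole group, which may differ from how the SDK treats partial failures.

```typescript
type ParallelStep<TIn> = { id: string; handler: (input: TIn) => Promise<unknown> };

async function runParallel<TIn>(input: TIn, steps: ParallelStep<TIn>[]) {
  // Start every handler before awaiting any of them.
  const settled = await Promise.all(
    steps.map(async (step) => [step.id, await step.handler(input)] as const),
  );
  // Key each result by its step ID, matching the documented output shape.
  const results: Record<string, unknown> = {};
  for (const [id, value] of settled) results[id] = value;
  return { type: 'parallel' as const, results };
}

const enriched = runParallel({ customerId: 'cust_123' }, [
  { id: 'load-profile', handler: async (i) => ({ id: i.customerId, plan: 'pro' }) },
  { id: 'load-orders', handler: async () => [101, 102] },
]);
```

Every handler receives the same input, so parallel steps should not depend on each other's results; anything that needs the combined data belongs in the step that follows.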

.foreach() — Array Iteration

Process each item in an array, with optional concurrency:
.step('get-leads', async () => {
  return await db.getUnprocessedLeads();  // returns Lead[]
})
.foreach('qualify', {
  handler: async (lead) => ({
    id: lead.id,
    score: await qualifyLead(lead),
    qualified: lead.revenue > 10000,
  }),
  concurrency: 5,  // Process 5 leads at a time
})
// Output: QualifiedLead[]
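The `concurrency` option caps how many items are in flight at once. One common way to implement such a cap is a fixed pool of workers pulling from a shared index, sketched below. This is an assumption about the general technique, not a description of the SDK's internals, and `mapWithConcurrency` is a hypothetical helper.

```typescript
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  handler: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++; // claimed synchronously, before any await
      results[index] = await handler(items[index], index);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  const workers: Promise<void>[] = [];
  for (let i = 0; i < workerCount; i++) workers.push(worker());
  await Promise.all(workers);
  return results; // same order as the input, regardless of completion order
}

// Track how many handlers run at once to confirm the cap holds.
let inFlight = 0;
let peak = 0;
const doubled = mapWithConcurrency([1, 2, 3, 4, 5], 2, async (n) => {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  inFlight--;
  return n * 2;
});
```

Results are written by index, so the output array keeps the input order even when later items finish first.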

.map() — Data Transformation

Transform output between steps when shapes don’t match:
.step('fetch', async () => ({
  items: [1, 2, 3, 4, 5],
  metadata: { total: 5 },
}))
.map((output) => output.items)  // Extract just the array
.foreach('double', {
  handler: async (item) => item * 2,
})
// Output: [2, 4, 6, 8, 10]

Output Transform

Define how to build the final workflow output from all step results:
flow({ id: 'pipeline', inputSchema, outputSchema })
  .step('classify', async (input) => ({ category: 'billing' }))
  .step('process', async (input) => ({ handled: true, response: 'Done' }))
  .output((results, input) => ({
    requestId: `REQ-${Date.now()}`,
    category: results.classify.category,
    response: results.process.response,
    originalInput: input,
  }))
  .build();

Retry Configuration

Add retry logic to any step:
.step('external-call', {
  handler: async (input) => {
    const response = await fetch('https://api.example.com/data');
    return response.json();
  },
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',  // 'fixed' | 'linear' | 'exponential'
    delay: 1000,             // Base delay in ms
    retryableErrors: ['ETIMEDOUT', 'ECONNREFUSED'],
  },
})
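The three `backoff` strategies differ in how the wait grows between attempts. The sketch below uses the conventional formulas for each name; the SDK's exact schedule (for example whether it adds jitter or caps the delay) is not stated here, so treat the numbers as an assumption. `backoffDelay` is a hypothetical helper.

```typescript
type Backoff = 'fixed' | 'linear' | 'exponential';

// `attempt` is 1-based: the wait before retry number `attempt`.
function backoffDelay(strategy: Backoff, baseMs: number, attempt: number): number {
  switch (strategy) {
    case 'fixed':
      return baseMs; // always the base delay
    case 'linear':
      return baseMs * attempt; // base, 2x base, 3x base, ...
    case 'exponential':
      return baseMs * 2 ** (attempt - 1); // base, 2x base, 4x base, ...
  }
}

// Waits before the first three retries with a 1000 ms base delay.
const fixedWaits = [1, 2, 3].map((n) => backoffDelay('fixed', 1000, n));
const linearWaits = [1, 2, 3].map((n) => backoffDelay('linear', 1000, n));
const exponentialWaits = [1, 2, 3].map((n) => backoffDelay('exponential', 1000, n));
```

Exponential backoff suits rate-limited or overloaded services, since it backs off quickly; fixed delays are usually enough for brief network blips.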

Real-time Events

Workflows emit events during execution for monitoring:
const workflow = flow({ id: 'monitored', inputSchema, outputSchema })
  .step('a', async (input) => ({ done: true }))
  .step('b', async (input) => ({ done: true }))
  .build();

workflow.on('workflow:start', ({ workflowId, executionId }) => {
  console.log(`Workflow ${workflowId} started: ${executionId}`);
});

workflow.on('step:start', ({ stepId, stepType }) => {
  console.log(`Step ${stepId} (${stepType}) started`);
});

workflow.on('step:complete', ({ stepId, durationMs }) => {
  console.log(`Step ${stepId} completed in ${durationMs}ms`);
});

workflow.on('step:skip', ({ stepId, reason }) => {
  console.log(`Step ${stepId} skipped: ${reason}`);
});

workflow.on('workflow:complete', ({ executionId, durationMs }) => {
  console.log(`Workflow completed in ${durationMs}ms`);
});

workflow.on('workflow:error', ({ executionId, error }) => {
  console.error(`Workflow failed: ${error}`);
});
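Event payloads can feed simple monitoring helpers. The sketch below records the slowest step using only the `stepId` and `durationMs` fields shown in the `step:complete` handler above. The `StepEvents` interface and the stand-in emitter are hypothetical; a built workflow is assumed to satisfy the same `on` signature.

```typescript
type StepComplete = { stepId: string; durationMs: number };

interface StepEvents {
  on(event: 'step:complete', listener: (payload: StepComplete) => void): void;
}

// Subscribes once and returns a getter for the slowest step seen so far.
function trackSlowestStep(source: StepEvents): () => StepComplete | undefined {
  let slowest: StepComplete | undefined;
  source.on('step:complete', (payload) => {
    if (!slowest || payload.durationMs > slowest.durationMs) slowest = payload;
  });
  return () => slowest;
}

// Stand-in emitter for the demo; a built workflow would take its place.
const listeners: Array<(payload: StepComplete) => void> = [];
const fakeWorkflow: StepEvents = {
  on: (_event, listener) => {
    listeners.push(listener);
  },
};

const slowestStep = trackSlowestStep(fakeWorkflow);
listeners.forEach((fn) => fn({ stepId: 'a', durationMs: 12 }));
listeners.forEach((fn) => fn({ stepId: 'b', durationMs: 340 }));
```

Register listeners before calling `execute()` so that no early events are missed.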

Graph Serialization

Get the workflow structure as a serializable DAG for visualization:
const graph = workflow.toGraph();
// {
//   id: 'support-ticket',
//   name: 'Support Ticket Workflow',
//   nodes: [
//     { id: 'classify', type: 'step', label: 'classify' },
//     { id: 'route', type: 'switch', label: 'route' },
//     { id: 'route:billing', type: 'step', label: 'billing' },
//     { id: 'route:technical', type: 'step', label: 'technical' },
//   ],
//   edges: [
//     { source: 'classify', target: 'route' },
//     { source: 'route', target: 'route:billing', label: 'billing' },
//     { source: 'route', target: 'route:technical', label: 'technical' },
//   ],
// }
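One use for the serialized graph is rendering it as a diagram. The sketch below converts the node and edge shape shown above into Mermaid flowchart text. The `WorkflowGraph` types are inferred from the example output rather than taken from the SDK's type definitions, and `toMermaid` is a hypothetical helper.

```typescript
type GraphNode = { id: string; type: string; label: string };
type GraphEdge = { source: string; target: string; label?: string };
type WorkflowGraph = { id: string; name: string; nodes: GraphNode[]; edges: GraphEdge[] };

function toMermaid(graph: WorkflowGraph): string {
  // Sanitize IDs conservatively: characters such as ':' can clash with Mermaid syntax.
  // Note that two IDs differing only in punctuation would collide after this step.
  const safe = (id: string) => id.replace(/[^A-Za-z0-9_]/g, '_');
  const lines = ['flowchart TD'];
  for (const node of graph.nodes) {
    lines.push(`  ${safe(node.id)}["${node.label} (${node.type})"]`);
  }
  for (const edge of graph.edges) {
    const arrow = edge.label ? `-->|${edge.label}|` : '-->';
    lines.push(`  ${safe(edge.source)} ${arrow} ${safe(edge.target)}`);
  }
  return lines.join('\n');
}

const diagram = toMermaid({
  id: 'support-ticket',
  name: 'Support Ticket Workflow',
  nodes: [
    { id: 'classify', type: 'step', label: 'classify' },
    { id: 'route', type: 'switch', label: 'route' },
    { id: 'route:billing', type: 'step', label: 'billing' },
  ],
  edges: [
    { source: 'classify', target: 'route' },
    { source: 'route', target: 'route:billing', label: 'billing' },
  ],
});
```

In real use the argument would be the object returned by `workflow.toGraph()`.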

Full Example: Multi-Agent Customer Service

import { flow, Agent, openai, connector, createAgentStep, createFunctionStep } from '@runflow-ai/sdk';
import { z } from 'zod';

// Agents
const classifier = new Agent({
  name: 'Classifier',
  instructions: `Classify customer issues into categories: billing, technical, sales, general.
    Return JSON with { category, priority, summary }.`,
  model: openai('gpt-4o'),
});

const billingAgent = new Agent({
  name: 'Billing Specialist',
  instructions: 'Handle billing inquiries. Be precise about amounts and dates.',
  model: openai('gpt-4o'),
});

const techAgent = new Agent({
  name: 'Tech Support',
  instructions: 'Solve technical problems step by step.',
  model: openai('gpt-4o'),
});

// Workflow
const customerService = flow({
  id: 'customer-service',
  name: 'Multi-Agent Customer Service',
  inputSchema: z.object({
    customerId: z.string(),
    message: z.string(),
    channel: z.enum(['email', 'chat', 'phone']),
  }),
  outputSchema: z.any(),
})
  // 1. Classify the issue
  .agent('classify', classifier, {
    promptTemplate: 'Classify this customer message: {{input.message}}',
  })

  // 2. Load customer data in parallel
  .parallel('load-data', [
    createFunctionStep('profile', async (input, ctx) => {
      return await connector('crm', 'contacts', {
        action: 'get',
        id: ctx.input.customerId,
      });
    }),
    createFunctionStep('history', async (input, ctx) => {
      return await connector('crm', 'tickets', {
        action: 'list',
        contactId: ctx.input.customerId,
        limit: 5,
      });
    }),
  ])

  // 3. Route to specialist agent
  .switch('route', {
    on: (ctx) => {
      try {
        return JSON.parse(ctx.results.classify.text).category;
      } catch {
        return 'general';
      }
    },
    cases: {
      billing:   [createAgentStep('billing-handler', billingAgent, {
        promptTemplate: 'Customer: {{input.message}}\nHistory: {{results.load-data}}',
      })],
      technical: [createAgentStep('tech-handler', techAgent, {
        promptTemplate: 'Issue: {{input.message}}\nProfile: {{results.load-data}}',
      })],
    },
    default: async (input, ctx) => ({
      text: 'Your request has been forwarded to our support team.',
      metadata: { agent: 'fallback', model: 'none', stepId: 'route' },
    }),
  })

  // 4. Create ticket and send response
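  .step('finalize', async (input, ctx) => {
    const ticketId = `TICKET-${Date.now()}`;

    // classify returns JSON text, so parse it to get the category
    // (same 'general' fallback as the switch above).
    let category = 'general';
    try {
      category = JSON.parse(ctx.results.classify.text).category ?? 'general';
    } catch {
      // keep the fallback
    }

    await connector('crm', 'tickets', {
      action: 'create',
      contactId: ctx.input.customerId,
      subject: ctx.results.classify.text,
      response: input.text,
    });

    return {
      ticketId,
      response: input.text,
      channel: ctx.input.channel,
      category,
    };
  })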

  // 5. Build final output
  .output((results, input) => ({
    ticketId: results.finalize.ticketId,
    response: results.finalize.response,
    channel: input.channel,
  }))
  .build();

// Execute
const result = await customerService.execute({
  customerId: 'cust_123',
  message: 'I was charged twice for my subscription',
  channel: 'chat',
});
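The classifier in this example is instructed to return JSON, and the switch parses its text with a fallback to 'general'. Models do not always return bare JSON, so a small helper along these lines could centralize the parsing. `parseClassification` and `TicketClassification` are hypothetical, and the fallback values other than 'general' are assumptions made for this sketch.

```typescript
type TicketClassification = { category: string; priority: string; summary: string };

function parseClassification(text: string): TicketClassification {
  const fallback: TicketClassification = { category: 'general', priority: 'normal', summary: text };
  // Models sometimes wrap JSON in prose, so take the outermost braces.
  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end <= start) return fallback;
  try {
    const parsed = JSON.parse(text.slice(start, end + 1));
    if (typeof parsed?.category !== 'string') return fallback;
    return {
      category: parsed.category,
      priority: typeof parsed.priority === 'string' ? parsed.priority : fallback.priority,
      summary: typeof parsed.summary === 'string' ? parsed.summary : fallback.summary,
    };
  } catch {
    return fallback; // malformed JSON
  }
}

const wrapped = parseClassification(
  'Here is the result: {"category":"billing","priority":"high","summary":"double charge"}',
);
const unparseable = parseClassification('I could not classify this message.');
```

With such a helper, the `on` function of the switch reduces to `(ctx) => parseClassification(ctx.results.classify.text).category`.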

Full Example: Lead Qualification Pipeline

import { flow } from '@runflow-ai/sdk';
import { z } from 'zod';

const leadPipeline = flow({
  id: 'lead-qualification',
  name: 'Lead Qualification Pipeline',
  inputSchema: z.object({
    leads: z.array(z.object({
      id: z.string(),
      company: z.string(),
      email: z.string(),
      revenue: z.number(),
    })),
  }),
  outputSchema: z.any(),
})
  // Extract leads array
  .map((input) => input.leads)

  // Qualify each lead concurrently
  .foreach('qualify', {
    handler: async (lead) => {
      const score = lead.revenue > 100000 ? 'enterprise'
        : lead.revenue > 10000 ? 'mid-market'
        : 'smb';

      return { ...lead, score, qualified: score !== 'smb' };
    },
    concurrency: 10,
  })

  // Filter qualified leads
  .step('filter', async (leads: any[]) => ({
    qualified: leads.filter(l => l.qualified),
    disqualified: leads.filter(l => !l.qualified),
    total: leads.length,
  }))

  // Enrich qualified leads with AI
  .step('enrich', {
    handler: async (input, ctx) => ({
      ...input,
      enrichedCount: input.qualified.length,
    }),
    when: (ctx) => ctx.results.filter.qualified.length > 0,
  })

  .output((results) => ({
    qualified: results.filter.qualified,
    disqualified: results.filter.disqualified,
    total: results.filter.total,
  }))
  .build();

Common Mistakes

.parallel() — Must receive an array of steps

// WRONG -- passing an object of handlers
.parallel('fetch', {
  tasks: {
    profile: async () => db.getProfile(),
    orders: async () => db.getOrders(),
  },
})

// CORRECT -- pass an array of WorkflowStep
.parallel('fetch', [
  createFunctionStep('profile', async () => db.getProfile()),
  createFunctionStep('orders', async () => db.getOrders()),
])

.switch() — Use on, not key or evaluate

// WRONG
.switch('route', { key: (ctx) => ctx.results.classify.dept, ... })
.switch('route', { evaluate: (ctx) => ctx.results.classify.dept, ... })

// CORRECT
.switch('route', { on: (ctx) => ctx.results.classify.dept, cases: { ... } })

.foreach() — Input must be an array

The previous step must return an array. Use .map() to extract it if needed:
// WRONG -- passing an `items` resolver
.foreach('process', {
  items: (ctx) => ctx.results.data.list,  // "items" does not exist
  handler: async (item) => item,
})

// CORRECT -- chain .map() before .foreach()
.step('fetch', async () => ({ list: [1, 2, 3], total: 3 }))
.map((output) => output.list)          // extract the array
.foreach('process', {
  handler: async (item) => item * 2,   // receives each item
})

.map() — No ID, no ctx, just a transform function

// WRONG -- passing an ID as first argument
.map('transform-name', (input, ctx) => ({ ... }))

// CORRECT -- just the function, no ID
.map((output) => output.items)
.map((output) => ({ ...output, extra: true }))

connector() — Function call, not a client

// WRONG
const client = connector('hubspot');
await client.execute('contacts', data);

// CORRECT
const result = await connector('hubspot', 'contacts', data);

Method Reference

| Method | Description | Output |
| --- | --- | --- |
| .step(id, handler) | Function step | Handler return type |
| .step(id, opts) | Function step with options | Handler return type |
| .agent(id, agent, opts?) | AI agent step | AgentStepResult |
| .connector(id, ...) | External service call | Connector response |
| .branch(id, opts) | Binary routing (if/else) | onTrue or onFalse return |
| .switch(id, opts) | Multi-way routing | Matched case return |
| .parallel(id, steps) | Concurrent execution | { results: Record } |
| .foreach(id, opts) | Array iteration | TOut[] |
| .map(transform) | Data transformation | Transform return |
| .output(transform) | Final output builder | |
| .build() | Create Workflow instance | Workflow |

Observability

Workflows share the same observability controls as Agents.

Trace Hierarchy

Every workflow execution automatically generates a hierarchical trace tree:
workflow_execution > "Order Validation"          12.4s
  ├── workflow_step > "parse-request"    [func]    45ms
  ├── workflow_step > "validate-cpf"     [func]   120ms
  ├── workflow_step > "check-eligibility" [api]   2.1s
  │   └── connector_call > "eligibility-api"     2.0s
  ├── workflow_step > "route-by-type"    [switch]  15ms
  ├── workflow_step > "check-prereqs"    [if]     890ms
  ├── workflow_step > "validate-items"   [loop]   3.2s
  │   ├── workflow_step > "item-1"       [func]  1.5s
  │   └── workflow_step > "item-2"       [func]  1.7s
  ├── workflow_step > "parallel-checks"  [parallel] 1.8s
  │   ├── workflow_step > "fraud-check"  [func]  1.2s
  │   └── connector_call > "network-api"         950ms
  └── workflow_step > "generate-report"  [agent]  4.2s
      └── agent_execution
          ├── memory_operation > "load"            65ms
          ├── llm_call > "chat"                   1.3s
          │   └── rag_search > "knowledge"        1.2s
          ├── llm_call > "chat"                   1.8s
          └── memory_operation > "save"            70ms

Controlling Verbosity

// Disable tracing entirely
const fastPipeline = flow({
  id: 'fast-pipeline',
  inputSchema, outputSchema,
  observability: 'minimal',    // No traces sent
}).step('a', handler).build();

// Truncate large payloads
const dataPipeline = flow({
  id: 'data-pipeline',
  inputSchema, outputSchema,
  observability: {
    mode: 'standard',
    maxInputLength: 5000,     // Truncate step inputs
    maxOutputLength: 5000,    // Truncate step outputs
  },
}).step('a', handler).build();

Sanitizing Traces

Use onTrace to remove sensitive data or cancel specific traces:
const workflow = flow({
  id: 'medical-workflow',
  inputSchema, outputSchema,
  observability: {
    onTrace: (trace) => {
      // Remove patient data from traces
      if (trace.input?.cpf) trace.input.cpf = '***';
      if (trace.input?.name) trace.input.name = '***';

      // Skip LLM traces (only keep step-level)
      if (trace.type === 'llm_call') return null;

      return trace;
    }
  },
}).step('a', handler).build();
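The sanitizing logic is easier to unit-test when it lives in a standalone function. The sketch below is a non-mutating variant of the callback above. The `Trace` type is a simplified assumption about the payload, `sanitizeTrace` and `SENSITIVE_KEYS` are hypothetical names, and it assumes `onTrace` accepts a new object as its return value, just as it accepts the mutated original.

```typescript
type Trace = { type: string; input?: Record<string, unknown> };

const SENSITIVE_KEYS = ['cpf', 'name'];

// Returns a redacted copy, or null to drop the trace entirely.
function sanitizeTrace(trace: Trace): Trace | null {
  if (trace.type === 'llm_call') return null; // keep only step-level traces
  if (!trace.input) return trace;
  const input: Record<string, unknown> = { ...trace.input };
  for (const key of SENSITIVE_KEYS) {
    if (key in input) input[key] = '***';
  }
  return { ...trace, input };
}

const original: Trace = {
  type: 'workflow_step',
  input: { cpf: '123.456.789-00', name: 'Ana', plan: 'basic' },
};
const redacted = sanitizeTrace(original);
const dropped = sanitizeTrace({ type: 'llm_call', input: { prompt: 'hello' } });
```

It could then be passed directly as `observability: { onTrace: sanitizeTrace }`. Copying instead of mutating keeps the step's real input untouched in case the trace object shares references with it.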
See Observability for the complete reference on tracing modes, interceptors, and custom logging.

Next Steps

Observability: Tracing, interceptors, and metrics
Complex Workflows: Advanced patterns and real-world examples
Connectors: Integrate external services