Skip to main content

Supervisor Pattern

A supervisor agent coordinates specialist agents, routing work based on classification:
import { flow, Agent, openai, createAgentStep } from '@runflow-ai/sdk';
import { z } from 'zod';

const supervisor = new Agent({
  name: 'Supervisor',
  instructions: `You are a supervisor that classifies incoming requests.
    Respond with JSON: { "department": "billing|technical|sales|hr", "urgency": "low|medium|high" }`,
  model: openai('gpt-4o'),
});

const billingAgent = new Agent({
  name: 'Billing Specialist',
  instructions: 'Handle billing, invoices, payments. Be precise with numbers.',
  model: openai('gpt-4o'),
});

const techAgent = new Agent({
  name: 'Tech Support',
  instructions: 'Solve technical problems. Ask clarifying questions if needed.',
  model: openai('gpt-4o'),
});

const salesAgent = new Agent({
  name: 'Sales Rep',
  instructions: 'Handle sales inquiries. Be consultative, not pushy.',
  model: openai('gpt-4o'),
});

const hrAgent = new Agent({
  name: 'HR Assistant',
  instructions: 'Handle HR questions about policies, benefits, time off.',
  model: openai('gpt-4o'),
});

const supervisorWorkflow = flow({
  id: 'supervisor-workflow',
  name: 'Supervisor Multi-Agent System',
  inputSchema: z.object({
    message: z.string(),
    userId: z.string(),
  }),
  outputSchema: z.any(),
})
  // Step 1: Supervisor classifies
  .agent('classify', supervisor, {
    promptTemplate: 'Classify this request: {{input.message}}',
  })

  // Step 2: Parse classification
  .step('parse', {
    outputSchema: z.object({
      department: z.enum(['billing', 'technical', 'sales', 'hr']),
      urgency: z.enum(['low', 'medium', 'high']),
    }),
    handler: async (input) => {
      try {
        return JSON.parse(input.text);
      } catch {
        return { department: 'technical', urgency: 'medium' };
      }
    },
  })

  // Step 3: Route to specialist
  .switch('route', {
    on: (ctx) => ctx.results.parse.department,
    cases: {
      billing:   [createAgentStep('billing', billingAgent, {
        promptTemplate: 'Customer request: {{input.message}}',
      })],
      technical: [createAgentStep('tech', techAgent, {
        promptTemplate: 'Technical issue: {{input.message}}',
      })],
      sales:     [createAgentStep('sales', salesAgent, {
        promptTemplate: 'Sales inquiry: {{input.message}}',
      })],
      hr:        [createAgentStep('hr', hrAgent, {
        promptTemplate: 'HR question: {{input.message}}',
      })],
    },
  })

  // Step 4: High-urgency notification
  .step('notify', {
    handler: async (input, ctx) => {
      console.log(`Alert: ${ctx.results.parse.urgency} urgency ticket routed to ${ctx.results.parse.department}`);
      return { ...input, notified: true };
    },
    when: (ctx) => ctx.results.parse.urgency === 'high',
  })

  .output((results, input) => ({
    userId: input.userId,
    department: results.parse.department,
    urgency: results.parse.urgency,
    response: results.route.text,
  }))
  .build();

Data Processing Pipeline

Process, enrich, and aggregate data with parallel steps and iteration:
const dataPipeline = flow({
  id: 'data-pipeline',
  name: 'Customer Data Pipeline',
  inputSchema: z.object({
    customerIds: z.array(z.string()),
  }),
  outputSchema: z.any(),
})
  // Extract customer IDs
  .map((input) => input.customerIds)

  // Fetch all customer data concurrently
  .foreach('fetch', {
    handler: async (customerId) => {
      const profile = await db.getCustomer(customerId);
      const orders = await db.getOrders(customerId);
      return {
        id: customerId,
        name: profile.name,
        totalOrders: orders.length,
        totalSpent: orders.reduce((sum, o) => sum + o.amount, 0),
        lastOrder: orders[0]?.date || null,
      };
    },
    concurrency: 20,
  })

  // Segment customers
  .step('segment', async (customers: any[]) => {
    const segments = {
      vip: customers.filter(c => c.totalSpent > 50000),
      active: customers.filter(c => c.totalSpent > 5000 && c.totalSpent <= 50000),
      dormant: customers.filter(c => c.totalSpent <= 5000),
    };
    return { segments, total: customers.length };
  })

  // Generate insights with AI for VIP segment
  .step('vip-insights', {
    handler: async (input, ctx) => {
      const vipSummary = input.segments.vip
        .map((c: any) => `${c.name}: $${c.totalSpent}, ${c.totalOrders} orders`)
        .join('\n');
      return {
        ...input,
        vipCount: input.segments.vip.length,
        summary: vipSummary,
      };
    },
    when: (ctx) => ctx.results.segment.segments.vip.length > 0,
  })

  .output((results) => ({
    segments: results.segment.segments,
    total: results.segment.total,
    vipInsights: results['vip-insights']?.summary || 'No VIP customers',
  }))
  .build();

Multi-Stage Approval Pipeline

A pipeline where each stage validates the previous one’s output:
const approvalPipeline = flow({
  id: 'content-approval',
  name: 'Content Approval Pipeline',
  inputSchema: z.object({
    topic: z.string(),
    audience: z.string(),
    tone: z.enum(['formal', 'casual', 'technical']),
  }),
  outputSchema: z.any(),
})
  // Stage 1: Draft
  .agent('draft', new Agent({
    name: 'Content Writer',
    instructions: 'Write engaging content. Follow brand guidelines.',
    model: openai('gpt-4o'),
  }), {
    promptTemplate: 'Write a {{input.tone}} article about {{input.topic}} for {{input.audience}}.',
  })

  // Stage 2: Review
  .agent('review', new Agent({
    name: 'Content Reviewer',
    instructions: `Review content for quality. Respond with JSON:
      { "approved": boolean, "score": 1-10, "issues": string[] }`,
    model: openai('gpt-4o'),
  }), {
    promptTemplate: 'Review this content:\n\n{{results.draft.text}}',
  })

  // Stage 3: Parse review
  .step('parse-review', async (input) => {
    try {
      return JSON.parse(input.text);
    } catch {
      return { approved: false, score: 0, issues: ['Failed to parse review'] };
    }
  })

  // Stage 4: Route based on approval
  .branch('approval', {
    condition: (ctx) => ctx.results['parse-review'].approved === true,
    onTrue: async (input, ctx) => ({
      status: 'approved',
      content: ctx.results.draft.text,
      score: input.score,
    }),
    onFalse: async (input, ctx) => ({
      status: 'needs-revision',
      content: ctx.results.draft.text,
      issues: input.issues,
      score: input.score,
    }),
  })

  .output((results, input) => ({
    topic: input.topic,
    ...results.approval,
  }))
  .build();

Enrichment + Scoring Pipeline

Combine data from multiple sources and score:
const leadScoring = flow({
  id: 'lead-scoring',
  name: 'Lead Scoring Pipeline',
  inputSchema: z.object({
    email: z.string().email(),
    company: z.string(),
    jobTitle: z.string(),
  }),
  outputSchema: z.any(),
})
  // Parallel enrichment from multiple sources
  .parallel('enrich', [
    createFunctionStep('clearbit', async (input) => {
      // Simulate enrichment API
      return { employees: 500, industry: 'SaaS', funding: '$50M' };
    }),
    createFunctionStep('linkedin', async (input) => {
      return { connections: 1200, seniority: 'Director' };
    }),
    createFunctionStep('crm-history', async (input) => {
      return { previousDeals: 2, lastContact: '2026-01-15', status: 'active' };
    }),
  ])

  // Calculate score
  .step('score', async (input) => {
    const data = input.results;
    let score = 0;

    // Company size
    if (data.clearbit.employees > 200) score += 30;
    else if (data.clearbit.employees > 50) score += 15;

    // Seniority
    if (['VP', 'Director', 'C-Level'].includes(data.linkedin.seniority)) score += 25;

    // Previous relationship
    if (data['crm-history'].previousDeals > 0) score += 30;
    if (data['crm-history'].status === 'active') score += 15;

    return {
      score,
      tier: score >= 70 ? 'hot' : score >= 40 ? 'warm' : 'cold',
      enrichment: data,
    };
  })

  // Route by tier
  .switch('action', {
    on: (ctx) => ctx.results.score.tier,
    cases: {
      hot:  async (input) => ({ action: 'schedule-demo', assignTo: 'senior-ae' }),
      warm: async (input) => ({ action: 'nurture-sequence', assignTo: 'sdr' }),
      cold: async (input) => ({ action: 'add-to-drip', assignTo: 'marketing' }),
    },
  })

  .output((results, input) => ({
    email: input.email,
    company: input.company,
    score: results.score.score,
    tier: results.score.tier,
    action: results.action,
  }))
  .build();

Migration from Legacy API

If you’re using the legacy createWorkflow() API, here’s how to migrate:
// BEFORE (legacy)
import { createWorkflow, createAgentStep } from '@runflow-ai/sdk';

const wf = createWorkflow({ id: 'my-wf', inputSchema, outputSchema })
  .function('classify', async (input) => ({ category: 'sales' }))
  .condition(
    'route',
    (ctx) => ctx.stepResults.get('classify')?.category === 'sales',
    [createAgentStep('sales', salesAgent)],
    [createAgentStep('support', supportAgent)],
  )
  .output((stepResults) => ({
    response: stepResults.route?.result?.text,
  }))
  .build();
// AFTER (V2)
import { flow, createAgentStep } from '@runflow-ai/sdk';

const wf = flow({ id: 'my-wf', inputSchema, outputSchema })
  .step('classify', async (input) => ({ category: 'sales' }))
  .switch('route', {
    on: (ctx) => ctx.results.classify.category,
    cases: {
      sales:   [createAgentStep('sales', salesAgent)],
      support: [createAgentStep('support', supportAgent)],
    },
  })
  .output((results) => ({
    response: results.route.text,
  }))
  .build();
Key differences:
  • createWorkflow() becomes flow()
  • .function() becomes .step()
  • .condition() becomes .branch() or .switch()
  • ctx.stepResults.get('name') becomes ctx.results.name
  • stepResults in .output() becomes results

Next Steps

Workflows

Core workflow concepts

Connectors

Integrate external services