Supervisor Pattern
A supervisor agent coordinates specialist agents, routing work based on classification:
import { flow, Agent, openai, createAgentStep, createFunctionStep } from '@runflow-ai/sdk';
import { z } from 'zod';
/**
 * Classifier agent. Its instructions ask for a JSON reply naming the target
 * department and the urgency of the request.
 */
const supervisor = new Agent({
  name: 'Supervisor',
  // Two instruction lines joined by a newline, so the prompt text does not
  // depend on how this file is indented.
  instructions: [
    'You are a supervisor that classifies incoming requests.',
    'Respond with JSON: { "department": "billing|technical|sales|hr", "urgency": "low|medium|high" }',
  ].join('\n'),
  model: openai('gpt-4o'),
});
/**
 * Builds one specialist agent. Each call creates its own `openai('gpt-4o')`
 * model handle; specialists differ only in display name and instructions.
 */
const createSpecialist = (name: string, instructions: string) =>
  new Agent({ name, instructions, model: openai('gpt-4o') });

/** Specialist for billing, invoice and payment requests. */
const billingAgent = createSpecialist(
  'Billing Specialist',
  'Handle billing, invoices, payments. Be precise with numbers.',
);

/** Specialist for technical problems. */
const techAgent = createSpecialist(
  'Tech Support',
  'Solve technical problems. Ask clarifying questions if needed.',
);

/** Specialist for sales inquiries. */
const salesAgent = createSpecialist(
  'Sales Rep',
  'Handle sales inquiries. Be consultative, not pushy.',
);

/** Specialist for HR policy, benefits and time-off questions. */
const hrAgent = createSpecialist(
  'HR Assistant',
  'Handle HR questions about policies, benefits, time off.',
);
/**
 * Contract for the supervisor's JSON reply. Used both as the `parse` step's
 * declared output schema and to validate the model's reply inside the handler.
 */
const classificationSchema = z.object({
  department: z.enum(['billing', 'technical', 'sales', 'hr']),
  urgency: z.enum(['low', 'medium', 'high']),
});

/**
 * Supervisor workflow: classify the message, validate the classification,
 * hand the message to the matching specialist, and log an alert for
 * high-urgency requests.
 */
const supervisorWorkflow = flow({
  id: 'supervisor-workflow',
  name: 'Supervisor Multi-Agent System',
  inputSchema: z.object({
    message: z.string(),
    userId: z.string(),
  }),
  outputSchema: z.any(),
})
  // Step 1: Supervisor classifies
  .agent('classify', supervisor, {
    promptTemplate: 'Classify this request: {{input.message}}',
  })
  // Step 2: Parse and validate classification
  .step('parse', {
    outputSchema: classificationSchema,
    handler: async (input) => {
      try {
        // Model output is untrusted: syntactically valid JSON can still name
        // a department or urgency that no route handles, so check the shape
        // before passing it on.
        const checked = classificationSchema.safeParse(JSON.parse(input.text));
        if (checked.success) {
          return checked.data;
        }
      } catch {
        // Reply was not JSON at all; handled by the shared fallback below.
      }
      console.warn('Supervisor reply was not a valid classification; defaulting to technical/medium');
      const fallback: z.infer<typeof classificationSchema> = {
        department: 'technical',
        urgency: 'medium',
      };
      return fallback;
    },
  })
  // Step 3: Route to specialist
  .switch('route', {
    on: (ctx) => ctx.results.parse.department,
    cases: {
      billing: [
        createAgentStep('billing', billingAgent, {
          promptTemplate: 'Customer request: {{input.message}}',
        }),
      ],
      technical: [
        createAgentStep('tech', techAgent, {
          promptTemplate: 'Technical issue: {{input.message}}',
        }),
      ],
      sales: [
        createAgentStep('sales', salesAgent, {
          promptTemplate: 'Sales inquiry: {{input.message}}',
        }),
      ],
      hr: [
        createAgentStep('hr', hrAgent, {
          promptTemplate: 'HR question: {{input.message}}',
        }),
      ],
    },
  })
  // Step 4: High-urgency notification (skipped unless urgency is 'high')
  .step('notify', {
    handler: async (input, ctx) => {
      console.log(`Alert: ${ctx.results.parse.urgency} urgency ticket routed to ${ctx.results.parse.department}`);
      return { ...input, notified: true };
    },
    when: (ctx) => ctx.results.parse.urgency === 'high',
  })
  // Final payload: the classification plus the specialist's reply text
  .output((results, input) => ({
    userId: input.userId,
    department: results.parse.department,
    urgency: results.parse.urgency,
    response: results.route.text,
  }))
  .build();
Data Processing Pipeline
Process, enrich, and aggregate data with parallel steps and iteration:
/**
 * Customer data pipeline: load a summary per customer ID, bucket customers
 * into spend-based segments, and build a text summary of the VIP segment when
 * it is non-empty.
 */
const dataPipeline = flow({
  id: 'data-pipeline',
  name: 'Customer Data Pipeline',
  inputSchema: z.object({
    customerIds: z.array(z.string()),
  }),
  outputSchema: z.any(),
})
  // Extract customer IDs
  .map((input) => input.customerIds)
  // Fetch all customer data concurrently
  .foreach('fetch', {
    handler: async (customerId) => {
      // The profile and order lookups do not depend on each other, so issue
      // them together instead of waiting for one before starting the other.
      const [profile, orders] = await Promise.all([
        db.getCustomer(customerId),
        db.getOrders(customerId),
      ]);
      return {
        id: customerId,
        name: profile.name,
        totalOrders: orders.length,
        totalSpent: orders.reduce((sum, o) => sum + o.amount, 0),
        // NOTE(review): treats the first order as the latest — assumes
        // db.getOrders returns newest-first; confirm.
        lastOrder: orders[0]?.date ?? null,
      };
    },
    concurrency: 20,
  })
  // Segment customers by total spend
  .step('segment', async (customers: any[]) => {
    const segments = {
      vip: customers.filter((c) => c.totalSpent > 50000),
      active: customers.filter((c) => c.totalSpent > 5000 && c.totalSpent <= 50000),
      dormant: customers.filter((c) => c.totalSpent <= 5000),
    };
    return { segments, total: customers.length };
  })
  // Summarize the VIP segment as plain text (no model call is made here)
  .step('vip-insights', {
    handler: async (input) => {
      const vipSummary = input.segments.vip
        .map((c: any) => `${c.name}: $${c.totalSpent}, ${c.totalOrders} orders`)
        .join('\n');
      return {
        ...input,
        vipCount: input.segments.vip.length,
        summary: vipSummary,
      };
    },
    // Skip the step entirely when there are no VIP customers
    when: (ctx) => ctx.results.segment.segments.vip.length > 0,
  })
  .output((results) => ({
    segments: results.segment.segments,
    total: results.segment.total,
    // The optional chain covers the case where 'vip-insights' was skipped
    vipInsights: results['vip-insights']?.summary ?? 'No VIP customers',
  }))
  .build();
Multi-Stage Approval Pipeline
A pipeline where each stage validates the previous one’s output:
/**
 * Shape the reviewer agent is instructed to reply with. `issues` defaults to
 * an empty list so a verdict that omits it is still accepted.
 */
const reviewSchema = z.object({
  approved: z.boolean(),
  score: z.number(),
  issues: z.array(z.string()).default([]),
});

/**
 * Content approval pipeline: draft an article, have a second agent review it,
 * validate the review verdict, then report the draft as approved or as
 * needing revision.
 */
const approvalPipeline = flow({
  id: 'content-approval',
  name: 'Content Approval Pipeline',
  inputSchema: z.object({
    topic: z.string(),
    audience: z.string(),
    tone: z.enum(['formal', 'casual', 'technical']),
  }),
  outputSchema: z.any(),
})
  // Stage 1: Draft
  .agent(
    'draft',
    new Agent({
      name: 'Content Writer',
      instructions: 'Write engaging content. Follow brand guidelines.',
      model: openai('gpt-4o'),
    }),
    {
      promptTemplate: 'Write a {{input.tone}} article about {{input.topic}} for {{input.audience}}.',
    },
  )
  // Stage 2: Review
  .agent(
    'review',
    new Agent({
      name: 'Content Reviewer',
      // Two instruction lines joined by a newline, so the prompt text does
      // not depend on how this file is indented.
      instructions: [
        'Review content for quality. Respond with JSON:',
        '{ "approved": boolean, "score": 1-10, "issues": string[] }',
      ].join('\n'),
      model: openai('gpt-4o'),
    }),
    {
      promptTemplate: 'Review this content:\n\n{{results.draft.text}}',
    },
  )
  // Stage 3: Parse and validate review
  .step('parse-review', async (input) => {
    try {
      // Model output is untrusted: valid JSON may still lack the fields the
      // approval branch reads, so check the shape before trusting it.
      const verdict = reviewSchema.safeParse(JSON.parse(input.text));
      if (verdict.success) {
        return verdict.data;
      }
    } catch {
      // Reply was not JSON at all; handled by the shared fallback below.
    }
    // An unusable review is treated as a rejection, never as an approval.
    return { approved: false, score: 0, issues: ['Failed to parse review'] };
  })
  // Stage 4: Route based on approval
  .branch('approval', {
    condition: (ctx) => ctx.results['parse-review'].approved === true,
    onTrue: async (input, ctx) => ({
      status: 'approved',
      content: ctx.results.draft.text,
      score: input.score,
    }),
    onFalse: async (input, ctx) => ({
      status: 'needs-revision',
      content: ctx.results.draft.text,
      issues: input.issues,
      score: input.score,
    }),
  })
  .output((results, input) => ({
    topic: input.topic,
    ...results.approval,
  }))
  .build();
Enrichment + Scoring Pipeline
Combine data from multiple sources and score the lead:
/**
 * Lead scoring pipeline: gather enrichment data from three stubbed sources,
 * add up a score from company size, seniority and CRM history, then pick a
 * follow-up action for the resulting tier.
 */
const leadScoring = flow({
  id: 'lead-scoring',
  name: 'Lead Scoring Pipeline',
  inputSchema: z.object({
    email: z.string().email(),
    company: z.string(),
    jobTitle: z.string(),
  }),
  outputSchema: z.any(),
})
  // Parallel enrichment from multiple sources
  .parallel('enrich', [
    // Simulate enrichment API
    createFunctionStep('clearbit', async (_input) => ({
      employees: 500,
      industry: 'SaaS',
      funding: '$50M',
    })),
    createFunctionStep('linkedin', async (_input) => ({
      connections: 1200,
      seniority: 'Director',
    })),
    createFunctionStep('crm-history', async (_input) => ({
      previousDeals: 2,
      lastContact: '2026-01-15',
      status: 'active',
    })),
  ])
  // Calculate score as the sum of four independent components
  .step('score', async (input) => {
    const enrichment = input.results;

    // Company size: 30 points above 200 employees, 15 above 50
    const sizePoints =
      enrichment.clearbit.employees > 200 ? 30 : enrichment.clearbit.employees > 50 ? 15 : 0;
    // Seniority: flat bonus for the listed senior titles
    const seniorityPoints = ['VP', 'Director', 'C-Level'].includes(enrichment.linkedin.seniority)
      ? 25
      : 0;
    // Previous relationship: past deals and an active CRM status count separately
    const dealPoints = enrichment['crm-history'].previousDeals > 0 ? 30 : 0;
    const statusPoints = enrichment['crm-history'].status === 'active' ? 15 : 0;

    const score = sizePoints + seniorityPoints + dealPoints + statusPoints;

    let tier: string;
    if (score >= 70) {
      tier = 'hot';
    } else if (score >= 40) {
      tier = 'warm';
    } else {
      tier = 'cold';
    }

    return { score, tier, enrichment };
  })
  // Route by tier
  .switch('action', {
    on: (ctx) => ctx.results.score.tier,
    cases: {
      hot: async (_input) => ({ action: 'schedule-demo', assignTo: 'senior-ae' }),
      warm: async (_input) => ({ action: 'nurture-sequence', assignTo: 'sdr' }),
      cold: async (_input) => ({ action: 'add-to-drip', assignTo: 'marketing' }),
    },
  })
  .output((results, input) => ({
    email: input.email,
    company: input.company,
    score: results.score.score,
    tier: results.score.tier,
    action: results.action,
  }))
  .build();
Migration from Legacy API
If you’re using the legacy `createWorkflow()` API, here’s how to migrate:
// BEFORE (legacy)
import { createWorkflow, createAgentStep } from '@runflow-ai/sdk';
// Legacy builder chain: one function step, one two-way condition, then the output mapping.
// `inputSchema`, `outputSchema`, `salesAgent` and `supportAgent` are not defined in this
// snippet — presumably declared elsewhere by the reader.
const wf = createWorkflow({ id: 'my-wf', inputSchema, outputSchema })
// Stub classifier: always reports the 'sales' category.
.function('classify', async (input) => ({ category: 'sales' }))
.condition(
'route',
// Predicate reads the earlier step's result through `stepResults.get(...)`.
(ctx) => ctx.stepResults.get('classify')?.category === 'sales',
// Presumably the first list runs when the predicate is true and the second when it is
// false — confirm against the legacy API reference.
[createAgentStep('sales', salesAgent)],
[createAgentStep('support', supportAgent)],
)
// NOTE(review): `stepResults` is read here with property access and a nested `.result`,
// unlike the `.get('classify')` call in the predicate above — confirm both forms are valid.
.output((stepResults) => ({
response: stepResults.route?.result?.text,
}))
.build();
Copy
// AFTER (V2)
import { flow, createAgentStep } from '@runflow-ai/sdk';
// V2 builder chain: a plain step followed by a keyed switch, then the output mapping.
// `inputSchema`, `outputSchema`, `salesAgent` and `supportAgent` are not defined in this
// snippet — presumably declared elsewhere by the reader.
const wf = flow({ id: 'my-wf', inputSchema, outputSchema })
// Stub classifier: always reports the 'sales' category.
.step('classify', async (input) => ({ category: 'sales' }))
.switch('route', {
// The value returned here selects which entry of `cases` runs.
on: (ctx) => ctx.results.classify.category,
cases: {
sales: [createAgentStep('sales', salesAgent)],
// NOTE(review): this only matches the exact category 'support'. A two-way condition that
// sends every non-'sales' category to support is not reproduced by this switch — confirm
// how unmatched values are handled, or use `.branch()` for a strict true/false split.
support: [createAgentStep('support', supportAgent)],
},
})
// Step results are read by name directly off `results`.
.output((results) => ({
response: results.route.text,
}))
.build();
- `createWorkflow()` becomes `flow()`
- `.function()` becomes `.step()`
- `.condition()` becomes `.branch()` or `.switch()`
- `ctx.stepResults.get('name')` becomes `ctx.results.name`
- `stepResults` in `.output()` becomes `results`
Next Steps
Workflows
Core workflow concepts
Connectors
Integrate external services