Quick Start
Core Concepts
Steps
Each step receives the previous step’s output as input, plus a ctx object with access to all previous results:
Accessing Previous Step Results
Every step receives two arguments: input (the output from the previous step) and ctx (the full workflow context). Use ctx to access any previous step’s result by name.
ctx:
| Property | Type | Description |
|---|---|---|
| ctx.input | TInput | The original workflow input |
| ctx.results | Record<string, any> | All completed step results, keyed by step ID |
| ctx.results['step-id'] | any | Result of a specific step |
| ctx.workflowId | string | Workflow ID |
| ctx.executionId | string | Current execution ID |
| ctx.currentStep | string | Current step ID |
| ctx.metadata | object | Execution metadata (startTime, stepCount, totalSteps) |
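As a rough illustration of the table above, the following self-contained sketch models how a step might read its input and reach back into ctx.results. The WorkflowContext type and the runSequential helper are hypothetical stand-ins written for this example, not the library's API; only the property names come from the table.

```typescript
// Hypothetical model of the ctx object; property names follow the table above.
interface WorkflowContext<TInput> {
  input: TInput;                    // the original workflow input
  results: Record<string, unknown>; // completed step results, keyed by step ID
  currentStep: string;              // ID of the step being executed
}

type StepHandler<TInput> = (input: unknown, ctx: WorkflowContext<TInput>) => unknown;

// Illustrative runner (an assumption, not the real engine): each step gets the
// previous step's output as `input` and the shared context as `ctx`.
function runSequential<TInput>(
  workflowInput: TInput,
  steps: Array<{ id: string; handler: StepHandler<TInput> }>,
): WorkflowContext<TInput> {
  const ctx: WorkflowContext<TInput> = { input: workflowInput, results: {}, currentStep: "" };
  let previous: unknown = workflowInput;
  for (const step of steps) {
    ctx.currentStep = step.id;
    previous = step.handler(previous, ctx);
    ctx.results[step.id] = previous;
  }
  return ctx;
}

const finalCtx = runSequential<{ name: string }>({ name: "Ada" }, [
  { id: "greet", handler: (_input, ctx) => `Hello, ${ctx.input.name}` },
  { id: "shout", handler: (input) => String(input).toUpperCase() },
  // A later step can reach back to any earlier result by its step ID.
  { id: "summary", handler: (input, ctx) => `${String(ctx.results["greet"])} -> ${String(input)}` },
]);
```

In this model the third step combines the result of "greet" with its direct input from "shout", which is the access pattern the table describes.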
Schema Validation
Steps can declare an outputSchema for runtime validation. If the output doesn’t match, execution stops immediately with a ZodError:
Conditional Guards
Skip steps based on runtime conditions:
Step Types
.step() — Function Step
Transform data, call APIs, run business logic:
.agent() — Agent Step
Execute an AI agent. The output is always { text, metadata: { agent, model, stepId } }:
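The output shape described above can be written down as a type so that downstream steps are checked against it. This is a sketch: the field names come from the description, while the string types and the summarizeAgentOutput helper are assumptions made for illustration.

```typescript
// Shape taken from the description above; the string types are an assumption.
interface AgentStepResult {
  text: string;
  metadata: {
    agent: string;
    model: string;
    stepId: string;
  };
}

// Hypothetical downstream handler: reads the agent's text and step ID.
function summarizeAgentOutput(result: AgentStepResult): string {
  return `[${result.metadata.stepId}] ${result.text.trim()}`;
}

// Sample values are made up for the example.
const sampleAgentResult: AgentStepResult = {
  text: "  The order has shipped.  ",
  metadata: { agent: "support-agent", model: "example-model", stepId: "reply" },
};

const agentSummary = summarizeAgentOutput(sampleAgentResult);
```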
.connector() — Connector Step (simple)
Call an external service with template interpolation:
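To make the idea of template interpolation concrete, here is a minimal sketch of how placeholders could be resolved against workflow data. The {{path}} placeholder syntax and the interpolate function are assumptions chosen for this example; the connector's actual template syntax may differ.

```typescript
// Assumed placeholder syntax: {{dot.separated.path}}. The real connector syntax
// may differ; this only illustrates the idea of template interpolation.
function interpolate(template: string, scope: Record<string, unknown>): string {
  return template.replace(/\{\{\s*([\w.-]+)\s*\}\}/g, (_match: string, path: string) => {
    // Walk the scope one path segment at a time.
    let value: unknown = scope;
    for (const key of path.split(".")) {
      if (typeof value !== "object" || value === null) return "";
      value = (value as Record<string, unknown>)[key];
    }
    // Unresolved paths render as an empty string in this sketch.
    return value === undefined || value === null ? "" : String(value);
  });
}

const message = interpolate("Ticket {{input.id}} assigned to {{results.route.team}}", {
  input: { id: 42 },
  results: { route: { team: "billing" } },
});
```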
Connectors inside .step() (dynamic)
For dynamic connector calls with logic, use the connector() function inside a .step():
Routing
.branch() — Binary Routing (if/else)
Route to one of two paths based on a condition:
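The routing logic can be sketched as a plain function. The option names onTrue and onFalse come from the method reference in this document; the condition option and the runBranch helper are hypothetical names used only to illustrate the behavior.

```typescript
// Hypothetical stand-in for binary routing. onTrue and onFalse are named in the
// method reference; `condition` is an assumed name.
interface BranchOptions<TIn, TOut> {
  condition: (input: TIn) => boolean;
  onTrue: (input: TIn) => TOut;
  onFalse: (input: TIn) => TOut;
}

// Exactly one of the two paths runs, and its return value is the step output.
function runBranch<TIn, TOut>(input: TIn, opts: BranchOptions<TIn, TOut>): TOut {
  return opts.condition(input) ? opts.onTrue(input) : opts.onFalse(input);
}

interface Lead {
  score: number;
}

const routeLead = (lead: Lead): string =>
  runBranch<Lead, string>(lead, {
    condition: (l) => l.score >= 70,
    onTrue: () => "sales",
    onFalse: () => "nurture",
  });
```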
.switch() — Multi-way Routing
Route to one of N paths based on a value:
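A multi-way route can be modeled the same way. In this sketch, on is the selector name mentioned under Common Mistakes; the cases and default option names and the runSwitch helper are assumptions for illustration.

```typescript
// Hypothetical stand-in for multi-way routing. `on` is the selector name used in
// this document; `cases` and `default` are assumed names.
interface SwitchOptions<TIn, TOut> {
  on: (input: TIn) => string;
  cases: Record<string, (input: TIn) => TOut>;
  default: (input: TIn) => TOut;
}

function runSwitch<TIn, TOut>(input: TIn, opts: SwitchOptions<TIn, TOut>): TOut {
  const key = opts.on(input);
  // Only own properties count as cases, so keys like "toString" fall through.
  const matched = Object.prototype.hasOwnProperty.call(opts.cases, key)
    ? opts.cases[key]
    : undefined;
  return (matched ?? opts.default)(input);
}

interface Ticket {
  category: string;
}

const pickQueue = (ticket: Ticket): string =>
  runSwitch<Ticket, string>(ticket, {
    on: (t) => t.category,
    cases: {
      billing: () => "billing-queue",
      technical: () => "tech-queue",
    },
    default: () => "general-queue",
  });
```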
Parallel & Iteration
.parallel() — Concurrent Execution
Run multiple steps at the same time:
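The following sketch shows one way concurrent execution can be modeled with Promise.all. It assumes every parallel step receives the same input and that results are collected under each step's ID, matching the { results } shape in the method reference. ParallelStep and runParallel are hypothetical names, not the library's implementation.

```typescript
// Hypothetical stand-in for concurrent execution: every step receives the same
// input, and results are collected under each step's ID.
interface ParallelStep<TIn> {
  id: string;
  handler: (input: TIn) => Promise<unknown>;
}

async function runParallel<TIn>(
  input: TIn,
  steps: ParallelStep<TIn>[],
): Promise<{ results: Record<string, unknown> }> {
  // map() invokes every handler up front; Promise.all then waits for all of them.
  const settled = await Promise.all(
    steps.map(async (step) => [step.id, await step.handler(input)] as const),
  );
  const results: Record<string, unknown> = {};
  for (const [id, value] of settled) results[id] = value;
  return { results };
}

const parallelDemo = runParallel<string>("hello world", [
  { id: "length", handler: async (text) => text.length },
  { id: "words", handler: async (text) => text.split(" ").length },
]);
```

Note that Promise.all rejects as soon as any handler rejects, so this sketch fails fast; whether the library does the same is not stated here.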
.foreach() — Array Iteration
Process each item in an array, with optional concurrency:
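Iteration with a concurrency cap can be sketched with a small worker pool. The forEachWithConcurrency function and its parameters are hypothetical, and the default of one item at a time is an assumption; the sketch only illustrates the technique of bounding how many items are in flight.

```typescript
// Hypothetical stand-in for array iteration with a concurrency cap. Results keep
// the order of the input array regardless of completion order.
async function forEachWithConcurrency<TItem, TOut>(
  items: TItem[],
  handler: (item: TItem, index: number) => Promise<TOut>,
  concurrency = 1,
): Promise<TOut[]> {
  const results = new Array<TOut>(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index until none remain.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await handler(items[index] as TItem, index);
    }
  }

  // Never start more workers than there are items, and always at least one.
  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

const doubled = forEachWithConcurrency([1, 2, 3, 4, 5], async (n) => n * 2, 2);
```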
.map() — Data Transformation
Transform output between steps when shapes don’t match:
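As an illustration of a shape mismatch, suppose an upstream step returns a wrapper object while the next step expects a plain array. The SearchResult type and the toActiveEmails transform below are hypothetical; the point is that the transform is a pure function of the previous output.

```typescript
// Hypothetical shapes: an upstream step returns a wrapper object, while the
// downstream step expects a plain array of email addresses.
interface SearchResult {
  total: number;
  users: Array<{ email: string; active: boolean }>;
}

// A map-style transform depends only on the previous output: no step ID and
// no ctx, as noted under Common Mistakes.
const toActiveEmails = (result: SearchResult): string[] =>
  result.users.filter((user) => user.active).map((user) => user.email);

const emails = toActiveEmails({
  total: 3,
  users: [
    { email: "a@example.com", active: true },
    { email: "b@example.com", active: false },
    { email: "c@example.com", active: true },
  ],
});
```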
Output Transform
Define how to build the final workflow output from all step results:
Retry Configuration
Add retry logic to any step:
Real-time Events
Workflows emit events during execution for monitoring:
Graph Serialization
Get the workflow structure as a serializable DAG for visualization:
Full Example: Multi-Agent Customer Service
Full Example: Lead Qualification Pipeline
Common Mistakes
.parallel() — Must receive an array of steps
.switch() — Use on, not key or evaluate
.foreach() — Input must be an array
The previous step must return an array. Use .map() to extract it if needed:
.map() — No ID, no ctx, just a transform function
connector() — Function call, not a client
Method Reference
| Method | Description | Output |
|---|---|---|
| .step(id, handler) | Function step | Handler return type |
| .step(id, opts) | Function step with options | Handler return type |
| .agent(id, agent, opts?) | AI agent step | AgentStepResult |
| .connector(id, ...) | External service call | Connector response |
| .branch(id, opts) | Binary routing (if/else) | onTrue \| onFalse return |
| .switch(id, opts) | Multi-way routing | Matched case return |
| .parallel(id, steps) | Concurrent execution | { results: Record } |
| .foreach(id, opts) | Array iteration | TOut[] |
| .map(transform) | Data transformation | Transform return |
| .output(transform) | Final output builder | — |
| .build() | Create Workflow instance | Workflow |
Observability
Workflows share the same observability controls as Agents.
Trace Hierarchy
Every workflow execution automatically generates a hierarchical trace tree:
Controlling Verbosity
Sanitizing Traces
Use onTrace to remove sensitive data or cancel specific traces:
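A sanitizing hook might look roughly like the sketch below. The TraceRecord shape, the list of sensitive keys, and the convention of returning null to cancel a trace are all assumptions made for illustration; the actual onTrace signature may differ.

```typescript
// Assumed trace shape and hook contract: return a (possibly modified) trace to
// keep it, or null to cancel it. The real onTrace signature may differ.
interface TraceRecord {
  name: string;
  data: Record<string, unknown>;
}

// Example key list; adjust to whatever counts as sensitive in your data.
const SENSITIVE_KEYS = new Set(["password", "apiKey", "email"]);

function sanitizeTrace(trace: TraceRecord): TraceRecord | null {
  // Cancel traces that should never be recorded (hypothetical naming rule).
  if (trace.name.startsWith("internal.")) return null;

  // Copy the payload, replacing sensitive values instead of mutating the input.
  const data: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(trace.data)) {
    data[key] = SENSITIVE_KEYS.has(key) ? "[REDACTED]" : value;
  }
  return { ...trace, data };
}
```

Returning a copy keeps the original trace object untouched, which avoids surprising other consumers that may hold a reference to it.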
Next Steps
Observability: Tracing, interceptors, and metrics
Complex Workflows: Advanced patterns and real-world examples
Connectors: Integrate external services