
API Reference

Complete API reference for the Motia framework

Everything you need to know about Motia's APIs. This reference covers all the types, methods, and configurations available when building with Motia.

If you're new to Motia, start with the Steps guide to understand the basics.

Step Configuration

Every Step needs a config. The unified StepConfig type works for all step types: the triggers array determines what activates the step.

StepConfig

type StepConfig = {
  name: string
  description?: string
  triggers: readonly TriggerConfig[]
  enqueues?: readonly Enqueue[]
  virtualEnqueues?: readonly Enqueue[]
  virtualSubscribes?: readonly string[]
  flows?: readonly string[]
  includeFiles?: readonly string[]
}

Required fields:

  • name - Unique identifier for this Step
  • triggers - Array of triggers that activate this step (API, queue, cron, state, stream)

Optional fields:

  • description - Human-readable description
  • enqueues - Topics this Step can enqueue
  • virtualEnqueues - Topics shown in the iii development console but not actually enqueued (gray connections)
  • virtualSubscribes - Topics shown in the iii development console for flow visualization
  • flows - Flow names for iii development console grouping
  • includeFiles - Files to bundle with this Step (supports glob patterns, relative to Step file)

TriggerConfig

Triggers define how a step gets activated. A step can have multiple triggers.

type TriggerConfig = QueueTrigger | ApiTrigger | CronTrigger | StateTrigger | StreamTrigger

ApiTrigger

Use this for HTTP endpoints.

type ApiTrigger = {
  type: 'api'
  path: string
  method: ApiRouteMethod
  bodySchema?: StepSchemaInput
  responseSchema?: Record<number, StepSchemaInput>
  queryParams?: readonly QueryParam[]
  middleware?: readonly ApiMiddleware[]
  condition?: TriggerCondition
}

QueueTrigger

Use this for background jobs and event-driven tasks.

type QueueTrigger = {
  type: 'queue'
  topic: string
  input?: StepSchemaInput
  condition?: TriggerCondition
  infrastructure?: Partial<InfrastructureConfig>
}

CronTrigger

Use this for scheduled tasks.

type CronTrigger = {
  type: 'cron'
  expression: string
  condition?: TriggerCondition
}

Use crontab.guru to build cron expressions.
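A few common schedules, as a sketch. It assumes the standard five-field crontab syntax (minute, hour, day of month, month, day of week); whether Motia accepts extended forms such as a seconds field is not stated here. The `schedules` object is illustrative only.

```typescript
// Assumes standard five-field syntax: minute hour day-of-month month day-of-week.
const schedules = {
  everyFifteenMinutes: '*/15 * * * *',
  weekdaysAtNine: '0 9 * * 1-5',
  firstOfMonthAtMidnight: '0 0 1 * *',
} as const

// Quick sanity check: each expression has exactly five space-separated fields.
const allFiveFields = Object.values(schedules).every(
  (expr) => expr.split(' ').length === 5,
)
console.log(allFiveFields)
```

Any of these strings can be used as the expression of a CronTrigger.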

StateTrigger

Use this to trigger steps based on state changes.

type StateTrigger = {
  type: 'state'
  condition?: TriggerCondition
}

StreamTrigger

Use this to trigger steps from stream events.

type StreamTrigger = {
  type: 'stream'
  streamName: string
  groupId?: string
  itemId?: string
  condition?: TriggerCondition
}
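State and stream triggers can also be written as plain object literals. The sketch below uses local copies of the two type shapes shown above (with `condition` left out) so that it compiles on its own. The stream name and group ID are made-up values, and the reference above does not spell out how `groupId` narrows delivery.

```typescript
// Local copies of the trigger shapes shown above, with `condition` omitted for brevity.
type StateTrigger = { type: 'state' }
type StreamTrigger = {
  type: 'stream'
  streamName: string
  groupId?: string
  itemId?: string
}

// Hypothetical values: 'chatMessages' and 'room-123' are examples, not required names.
const triggers = [
  { type: 'state' },
  { type: 'stream', streamName: 'chatMessages', groupId: 'room-123' },
] as const satisfies readonly (StateTrigger | StreamTrigger)[]

// The `type` field is a discriminant, so narrowing on it exposes streamName safely.
const streamNames: string[] = []
for (const trigger of triggers) {
  if (trigger.type === 'stream') streamNames.push(trigger.streamName)
}
console.log(streamNames)
```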

Trigger Helper Functions

Use these helpers for concise trigger definitions:

import { api, queue, cron, state, stream } from 'motia'
 
api(method: ApiRouteMethod, path: string, options?: ApiOptions, condition?: TriggerCondition): ApiTrigger
queue(topic: string, options?: QueueOptions, condition?: TriggerCondition): QueueTrigger
cron(expression: string, condition?: TriggerCondition): CronTrigger
state(condition?: TriggerCondition): StateTrigger
stream(streamName: string, condition?: TriggerCondition): StreamTrigger

Enqueue Type

type Enqueue = string | { topic: string; label?: string; conditional?: boolean }
type EnqueueData<T> = { topic: string; data: T; messageGroupId?: string }

Config Examples

import { StepConfig, api } from 'motia'
import { z } from 'zod'
// authMiddleware is assumed to be defined elsewhere (see the Middleware section)
 
export const config = {
  name: 'CreateUser',
  description: 'Creates a new user',
  triggers: [
    api('POST', '/users', {
      bodySchema: z.object({ name: z.string() }),
      responseSchema: {
        201: z.object({ id: z.string(), name: z.string() })
      },
      middleware: [authMiddleware],
      queryParams: [{ name: 'invite', description: 'Invite code' }],
    }),
  ],
  enqueues: ['user.created'],
  virtualEnqueues: ['notification.sent'],
  virtualSubscribes: ['user.invited'],
  flows: ['user-management'],
  includeFiles: ['../../assets/template.html'],
} as const satisfies StepConfig

Queue Step Config Example

import { StepConfig, queue } from 'motia'
import { z } from 'zod'
 
export const config = {
  name: 'ProcessOrder',
  description: 'Processes new orders',
  triggers: [
    queue('order.created', {
      input: z.object({ orderId: z.string(), amount: z.number() }),
      infrastructure: {
        handler: { ram: 2048, timeout: 60 },
        queue: { type: 'fifo', maxRetries: 3, visibilityTimeout: 90 }
      },
    }),
  ],
  enqueues: ['order.processed'],
  virtualEnqueues: ['payment.initiated'],
  virtualSubscribes: ['order.cancelled'],
  flows: ['orders'],
  includeFiles: ['./templates/*.html'],
} as const satisfies StepConfig

Infrastructure config (Motia Cloud only):

  • handler.ram - Memory in MB (128-10240, required)
  • handler.cpu - Number of vCPUs (optional; auto-calculated from RAM if omitted, and must be proportional to RAM when set)
  • handler.timeout - Timeout in seconds (1-900, required)
  • queue.type - 'fifo' or 'standard' (required)
  • queue.maxRetries - Max retry attempts (0+, required)
  • queue.visibilityTimeout - Timeout in seconds (required, must be > handler.timeout to prevent premature redelivery)
  • queue.delaySeconds - Optional delay before message becomes visible (0-900)
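The limits in this list can be checked locally before deploying. The following is a minimal sketch: `validateInfrastructure` and the local types are hypothetical (not part of Motia) and encode only the ranges stated above. The CPU proportionality rule is left out because its exact ratio is not given.

```typescript
// Local types mirroring the documented fields; illustrative, not Motia's own definitions.
type HandlerConfig = { ram: number; cpu?: number; timeout: number }
type QueueConfig = {
  type: 'fifo' | 'standard'
  maxRetries: number
  visibilityTimeout: number
  delaySeconds?: number
}
type Infrastructure = { handler: HandlerConfig; queue: QueueConfig }

// Hypothetical helper: returns a list of violations (empty when the config is valid).
function validateInfrastructure(cfg: Infrastructure): string[] {
  const errors: string[] = []
  const { handler, queue } = cfg

  if (handler.ram < 128 || handler.ram > 10240) {
    errors.push('handler.ram must be between 128 and 10240 MB')
  }
  if (handler.timeout < 1 || handler.timeout > 900) {
    errors.push('handler.timeout must be between 1 and 900 seconds')
  }
  if (queue.maxRetries < 0) {
    errors.push('queue.maxRetries must be 0 or greater')
  }
  // A message must stay hidden longer than the handler can run,
  // otherwise it may be redelivered while still being processed.
  if (queue.visibilityTimeout <= handler.timeout) {
    errors.push('queue.visibilityTimeout must be greater than handler.timeout')
  }
  if (
    queue.delaySeconds !== undefined &&
    (queue.delaySeconds < 0 || queue.delaySeconds > 900)
  ) {
    errors.push('queue.delaySeconds must be between 0 and 900 seconds')
  }
  return errors
}

const validResult = validateInfrastructure({
  handler: { ram: 2048, timeout: 60 },
  queue: { type: 'fifo', maxRetries: 3, visibilityTimeout: 90 },
})
const invalidResult = validateInfrastructure({
  handler: { ram: 2048, timeout: 60 },
  queue: { type: 'standard', maxRetries: 3, visibilityTimeout: 30 },
})
console.log(validResult, invalidResult)
```

The first call uses the values from the queue example above and reports no violations. The second fails only the visibilityTimeout rule.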

Cron Step Config Example

import { StepConfig, cron } from 'motia'
 
export const config = {
  name: 'DailyReport',
  description: 'Generates daily reports at 9 AM',
  triggers: [
    cron('0 9 * * *'),
  ],
  enqueues: ['report.generated'],
  virtualEnqueues: ['email.sent'],
  virtualSubscribes: ['report.requested'],
  flows: ['reporting'],
  includeFiles: ['./templates/report.html'],
} as const satisfies StepConfig

Multi-Trigger Step Config Example

A single step can respond to multiple trigger types:

import { StepConfig, api, queue, cron } from 'motia'
 
export const config = {
  name: 'UserSync',
  description: 'Syncs user data from multiple sources',
  triggers: [
    api('POST', '/users/sync'),
    queue('user.updated'),
    cron('0 */6 * * *'),
  ],
  enqueues: ['user.synced'],
  flows: ['user-management'],
} as const satisfies StepConfig

NoopConfig

Use this for visual-only nodes in the iii development console (no code execution).

import { StepConfig } from 'motia'
 
export const config = {
  name: 'ManualApproval',
  description: 'Manager approval gate',
  triggers: [],
  virtualEnqueues: ['approved', 'rejected'],
  virtualSubscribes: ['approval.requested'],
  flows: ['approvals'],
} as const satisfies StepConfig

No handler needed - Noop Steps don't execute code. They exist for iii development console visualization only.


Handlers

Handlers are the functions that execute your business logic. Use the Handlers type with your config for full type safety.

Handlers Type

type Handlers<TConfig extends StepConfig> = (
  input: InferHandlerInput<TConfig>,
  ctx: FlowContext<InferEnqueues<TConfig>, InferHandlerInput<TConfig>>,
) => Promise<ApiResponse | void>

The handler signature is unified: the input type is inferred from the trigger that activated the step. Use ctx.match() or ctx.is to differentiate between trigger types in multi-trigger steps.


API Step Handler

Receives a request, returns a response.

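import { StepConfig, Handlers, api } from 'motia'
import { z } from 'zod'
 
export const config = {
  name: 'CreateUser',
  triggers: [
    api('POST', '/users', {
      // bodySchema gives req.body a type, so name and email can be destructured safely
      bodySchema: z.object({ name: z.string(), email: z.string() }),
    }),
  ],
  enqueues: ['user.created'],
} as const satisfies StepConfig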
 
export const handler: Handlers<typeof config> = async (req, ctx) => {
  const { name, email } = req.body
  const userId = crypto.randomUUID()
 
  await ctx.enqueue({
    topic: 'user.created',
    data: { userId, email }
  })
 
  return {
    status: 201,
    body: { id: userId, name, email },
    headers: { 'X-Request-ID': ctx.traceId }
  }
}

Queue Step Handler

Receives queue data, processes it. No return value.

import { StepConfig, Handlers, queue } from 'motia'
import { z } from 'zod'
 
export const config = {
  name: 'ProcessOrder',
  triggers: [queue('order.created', { input: z.object({ orderId: z.string(), amount: z.number() }) })],
  enqueues: ['order.processed'],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const data = ctx.getData()
  const { orderId, amount } = data
 
  ctx.logger.info('Processing order', { orderId, amount })
 
  await ctx.state.set('orders', orderId, {
    id: orderId,
    amount,
    status: 'processed'
  })
 
  await ctx.enqueue({
    topic: 'order.processed',
    data: { orderId }
  })
}

Cron Step Handler

Runs on a schedule. The input argument carries no payload, so the handler works from the context alone.

import { StepConfig, Handlers, cron } from 'motia'
 
export const config = {
  name: 'DailyCleanup',
  triggers: [cron('0 0 * * *')],
  enqueues: [],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async (input, ctx) => {
  ctx.logger.info('Running daily cleanup')
 
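  // Type the stored records so that id and createdAt are available below
  const oldOrders = await ctx.state.list<{ id: string; createdAt: number }>('orders')
  const cutoff = Date.now() - (30 * 24 * 60 * 60 * 1000) // 30 days ago, in milliseconds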
 
  for (const order of oldOrders) {
    if (order.createdAt < cutoff) {
      await ctx.state.delete('orders', order.id)
    }
  }
}

Multi-Trigger Handler with match()

For steps with multiple triggers, use ctx.match() to handle each trigger type:

import { StepConfig, Handlers, api, queue, cron } from 'motia'
// syncUser(userId, ctx) is an application helper assumed to be defined elsewhere
 
export const config = {
  name: 'UserSync',
  triggers: [
    api('POST', '/users/sync'),
    queue('user.updated'),
    cron('0 */6 * * *'),
  ],
  enqueues: ['user.synced'],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async (input, ctx) => {
  return ctx.match({
    api: async (request) => {
      const { userId } = request.body
      await syncUser(userId, ctx)
      return { status: 200, body: { synced: true } }
    },
    queue: async (data) => {
      const payload = ctx.getData()
      await syncUser(payload.userId, ctx)
    },
    cron: async () => {
      const allUsers = await ctx.state.list('users')
      for (const user of allUsers) {
        await syncUser(user.id, ctx)
      }
    },
  })
}

You can also use ctx.is for simpler checks:

if (ctx.is.api(input)) {
  return { status: 200, body: { ok: true } }
}
if (ctx.is.queue(input)) {
  const data = ctx.getData()
}
if (ctx.is.cron(input)) {
  // scheduled execution
}

Handler Context (FlowContext)

Every handler gets a context object (ctx in TypeScript/JavaScript, context in Python) with these tools.

interface FlowContext<TEnqueueData, TInput> {
  enqueue: Enqueuer<TEnqueueData>
  traceId: string
  state: InternalStateManager
  logger: Logger
  streams: Streams
  trigger: TriggerInfo
  is: {
    queue: (input) => boolean
    api: (input) => boolean
    cron: (input) => boolean
    state: (input) => boolean
    stream: (input) => boolean
  }
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers) => Promise<TResult | void>
}

enqueue

Trigger other Steps by publishing messages to topics.

await ctx.enqueue({
  topic: 'order.created',
  data: { orderId: '123', total: 99.99 }
})
 
await ctx.enqueue({
  topic: 'order.processing',
  data: { orderId: '123', items: [...] },
  messageGroupId: 'user-456'
})

FIFO queues: When enqueuing to a topic that has a FIFO queue subscriber, you must include messageGroupId. Messages with the same messageGroupId are processed sequentially. Different groups are processed in parallel.

The data must match the input schema of Steps subscribing to that topic.
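The FIFO rule above can be pictured with a small in-memory model. This is an illustration only, not how Motia's queue is implemented: `groupByMessageGroup` is a hypothetical helper that buckets messages by messageGroupId, keeps arrival order inside each bucket, and rejects messages that lack a group ID.

```typescript
type EnqueueData<T> = { topic: string; data: T; messageGroupId?: string }

// Illustration only: each bucket models one sequentially processed group.
// Separate buckets could be drained in parallel.
function groupByMessageGroup<T>(messages: EnqueueData<T>[]): Map<string, T[]> {
  const groups = new Map<string, T[]>()
  for (const msg of messages) {
    if (msg.messageGroupId === undefined) {
      throw new Error(`messageGroupId is required for FIFO topic "${msg.topic}"`)
    }
    const bucket = groups.get(msg.messageGroupId) ?? []
    bucket.push(msg.data)
    groups.set(msg.messageGroupId, bucket)
  }
  return groups
}

const grouped = groupByMessageGroup([
  { topic: 'order.processing', data: 'a1', messageGroupId: 'user-1' },
  { topic: 'order.processing', data: 'b1', messageGroupId: 'user-2' },
  { topic: 'order.processing', data: 'a2', messageGroupId: 'user-1' },
])
console.log(grouped.get('user-1'), grouped.get('user-2'))
```

Here the two messages for user-1 stay in order relative to each other, while the user-2 message is independent of them.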


logger

Structured logging with automatic trace ID correlation.

logger.info('User created', { userId: '123', email: 'user@example.com' })
logger.warn('Rate limit approaching', { current: 95, limit: 100 })
logger.error('Payment failed', { error: err.message, orderId: '456' })
logger.debug('Cache miss', { key: 'user:123' })

All logs are automatically tagged with:

  • Timestamp
  • Step name
  • Trace ID
  • Any metadata you pass

Learn more about Observability


state

Persistent key-value storage shared across Steps.

interface InternalStateManager {
  get<T>(groupId: string, key: string): Promise<T | null>
  set<T>(groupId: string, key: string, value: T): Promise<StreamSetResult<T> | null>
  update<T>(groupId: string, key: string, ops: UpdateOp[]): Promise<StreamSetResult<T> | null>
  delete<T>(groupId: string, key: string): Promise<T | null>
  list<T>(groupId: string): Promise<T[]>
  clear(groupId: string): Promise<void>
}
 
type StreamSetResult<T> = { new_value: T; old_value: T | null }
 
type UpdateOp =
  | { type: 'set', path: string, value: any }
  | { type: 'increment', path: string, by: number }
  | { type: 'decrement', path: string, by: number }

await state.set('users', 'user-123', { name: 'Alice', email: 'alice@example.com' })
 
const user = await state.get<User>('users', 'user-123')
 
const allUsers = await state.list<User>('users')
 
await state.update('users', 'user-123', [
  { type: 'set', path: 'name', value: 'Bob' },
  { type: 'increment', path: 'loginCount', by: 1 },
])
 
await state.delete('users', 'user-123')
 
await state.clear('users')

Methods:

  • get(groupId, key) - Returns the value or null
  • set(groupId, key, value) - Stores the value and returns { new_value, old_value }
  • update(groupId, key, ops) - Applies atomic update operations and returns { new_value, old_value }
  • delete(groupId, key) - Removes and returns the value (or null)
  • list(groupId) - Returns array of all values in the group
  • clear(groupId) - Removes all items in the group
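To make the update operations concrete, here is a minimal in-memory sketch of how a list of ops could be applied to a stored object. `applyOps` is hypothetical and is not Motia's implementation. It assumes that `path` is a dot-separated key path and that a missing numeric field counts as 0. The real engine may differ on both points.

```typescript
type UpdateOp =
  | { type: 'set'; path: string; value: unknown }
  | { type: 'increment'; path: string; by: number }
  | { type: 'decrement'; path: string; by: number }

type Json = Record<string, unknown>

// Hypothetical helper: applies ops to a copy and returns both versions,
// mirroring the { new_value, old_value } result shape described above.
function applyOps(current: Json, ops: UpdateOp[]): { new_value: Json; old_value: Json } {
  const old_value = JSON.parse(JSON.stringify(current)) as Json
  const new_value = JSON.parse(JSON.stringify(current)) as Json

  for (const op of ops) {
    // Assumption: dot-separated paths, e.g. 'profile.name'.
    const keys = op.path.split('.')
    const last = keys.pop() as string

    // Walk to the parent object, creating intermediate objects as needed.
    let target: Json = new_value
    for (const key of keys) {
      if (typeof target[key] !== 'object' || target[key] === null) target[key] = {}
      target = target[key] as Json
    }

    if (op.type === 'set') {
      target[last] = op.value
    } else {
      // Assumption: a missing or non-numeric field starts from 0.
      const base = typeof target[last] === 'number' ? (target[last] as number) : 0
      target[last] = op.type === 'increment' ? base + op.by : base - op.by
    }
  }
  return { new_value, old_value }
}

const updateResult = applyOps({ name: 'Alice', loginCount: 2 }, [
  { type: 'set', path: 'name', value: 'Bob' },
  { type: 'increment', path: 'loginCount', by: 1 },
])
console.log(updateResult)
```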

Learn more about State


streams

Real-time data channels for pushing updates to connected clients.

interface MotiaStream<TData> {
  get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
  set(groupId: string, id: string, data: TData): Promise<StreamSetResult<BaseStreamItem<TData>>>
  delete(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
  getGroup(groupId: string): Promise<BaseStreamItem<TData>[]>
  update(groupId: string, id: string, data: UpdateOp[]): Promise<StreamSetResult<BaseStreamItem<TData>>>
  send<T>(channel: StateStreamEventChannel, event: StateStreamEvent<T>): Promise<void>
}

await streams.chatMessages.set('room-123', 'msg-456', {
  text: 'Hello!',
  author: 'Alice',
  timestamp: new Date().toISOString()
})
 
const message = await streams.chatMessages.get('room-123', 'msg-456')
 
const messages = await streams.chatMessages.getGroup('room-123')
 
await streams.chatMessages.delete('room-123', 'msg-456')
 
await streams.chatMessages.update('room-123', 'msg-456', [
  { type: 'set', path: 'text', value: 'Updated message' },
])
 
await streams.chatMessages.send(
  { groupId: 'room-123' },
  { type: 'user.typing', data: { userId: 'alice' } }
)

Methods:

  • set(groupId, id, data) - Create or update an item (returns { new_value, old_value })
  • get(groupId, id) - Retrieve an item or null
  • getGroup(groupId) - Get all items in a group
  • delete(groupId, id) - Remove an item
  • update(groupId, id, ops) - Apply atomic update operations
  • send(channel, event) - Send an ephemeral event (e.g., typing indicators, reactions)

Learn more about Streams


traceId

Unique ID for tracking requests across Steps.

export const handler: Handlers<typeof config> = async (req, { traceId, logger }) => {
  logger.info('Processing request', { traceId })
  return { status: 200, body: { traceId } }
}

The trace ID is automatically generated for each request and passed through all Steps in the workflow. Use it to correlate logs, state, and events.


trigger

Information about what triggered the current handler execution.

interface TriggerInfo {
  type: 'api' | 'queue' | 'cron' | 'state' | 'stream'
}

getData

Extract the data payload from the input. Useful in queue and stream handlers.

const data = ctx.getData()

match

Route execution based on trigger type. See Multi-Trigger Handler with match() above.

type MatchHandlers<TInput, TEnqueueData, TResult> = {
  queue?: (input) => Promise<void>
  api?: (request) => Promise<TResult>
  cron?: () => Promise<void>
  state?: (input) => Promise<TResult>
  stream?: (input) => Promise<TResult>
  default?: (input) => Promise<TResult | void>
}
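The routing idea behind match can be sketched in a few lines. This is not Motia's implementation: `pickHandler` and the simplified local types are hypothetical, and falling back to `default` when no specific handler is registered is an assumption based on the shape of MatchHandlers.

```typescript
type TriggerType = 'api' | 'queue' | 'cron' | 'state' | 'stream'
type Handler = (input?: unknown) => Promise<unknown>
type MatchHandlers = Partial<Record<TriggerType | 'default', Handler>>

// Hypothetical selector: prefer the handler for the active trigger type,
// otherwise fall back to `default` (assumed behavior).
function pickHandler(triggerType: TriggerType, handlers: MatchHandlers): Handler | undefined {
  return handlers[triggerType] ?? handlers.default
}

const handlers: MatchHandlers = {
  api: async () => ({ status: 200, body: { ok: true } }),
  default: async () => undefined,
}

const forApi = pickHandler('api', handlers)
const forCron = pickHandler('cron', handlers)
console.log(forApi === handlers.api, forCron === handlers.default)
```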

Middleware

Intercepts API requests before and after the handler.

import { ApiMiddleware } from 'motia'
 
export const authMiddleware: ApiMiddleware = async (req, ctx, next) => {
  const token = req.headers.authorization
 
  if (!token) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
 
  return await next()
}

Parameters:

  • req - Request object
  • ctx - Context object
  • next - Function to call the next middleware/handler

Returns: Response object
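How next() chains middleware into the handler can be sketched as follows. The `compose` function and the simplified `Req` and `Res` types are hypothetical and only model the (req, ctx, next) contract described above. Motia's internal wiring may differ.

```typescript
type Req = { headers: Record<string, string | string[]> }
type Res = { status: number; body: unknown }
type Next = () => Promise<Res>
type Middleware = (req: Req, ctx: unknown, next: Next) => Promise<Res>
type RouteHandler = (req: Req, ctx: unknown) => Promise<Res>

// Hypothetical composer: runs middlewares in order, then the handler.
// A middleware that returns without calling next() short-circuits the chain.
function compose(middlewares: Middleware[], handler: RouteHandler): RouteHandler {
  return (req, ctx) => {
    const run = (index: number): Promise<Res> => {
      const middleware = middlewares[index]
      return middleware
        ? middleware(req, ctx, () => run(index + 1))
        : handler(req, ctx)
    }
    return run(0)
  }
}

// Runs before the handler: rejects requests without an authorization header.
const requireAuth: Middleware = async (req, _ctx, next) => {
  if (!req.headers.authorization) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
  return next()
}

// Runs after the handler: wraps the body once next() has resolved.
const wrapBody: Middleware = async (_req, _ctx, next) => {
  const res = await next()
  return { ...res, body: { data: res.body } }
}

const route = compose([requireAuth, wrapBody], async () => ({
  status: 200,
  body: { ok: true },
}))
```

Calling `route({ headers: {} }, {})` resolves to the 401 response without reaching the handler. With an authorization header present, the handler runs and wrapBody wraps its body on the way back out.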

Learn more about Middleware


Request Object (ApiRequest)

API handlers receive a request object with these fields.

interface ApiRequest<TBody = unknown> {
  pathParams: Record<string, string>
  queryParams: Record<string, string | string[]>
  body: TBody
  headers: Record<string, string | string[]>
}

export const handler: Handlers<typeof config> = async (req, ctx) => {
  const userId = req.pathParams.id
  const page = req.queryParams.page
  const limit = req.queryParams.limit
  const { name, email } = req.body
  const auth = req.headers.authorization
 
  return { status: 200, body: { userId, name } }
}

Fields:

  • pathParams - Object with path parameters (e.g., :id from /users/:id)
  • queryParams - Object with query string params (values can be string or array)
  • body - Parsed request body (validated against bodySchema if defined)
  • headers - Object with request headers (values can be string or array)
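Because query values can be either a string or an array, it often helps to normalize them before use. The helpers below are a sketch: `firstParam` and `intParam` are hypothetical names, not part of Motia.

```typescript
type QueryParams = Record<string, string | string[]>

// Hypothetical helper: returns the first value for a key, or undefined when absent.
function firstParam(params: QueryParams, key: string): string | undefined {
  const value = params[key]
  return Array.isArray(value) ? value[0] : value
}

// Hypothetical helper: parses a positive integer, falling back when missing or invalid.
function intParam(params: QueryParams, key: string, fallback: number): number {
  const raw = firstParam(params, key)
  const parsed = raw === undefined ? Number.NaN : Number.parseInt(raw, 10)
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
}

const query: QueryParams = { page: '2', tag: ['a', 'b'], limit: 'abc' }
const page = intParam(query, 'page', 1)
const limit = intParam(query, 'limit', 20)
const tag = firstParam(query, 'tag')
console.log(page, limit, tag)
```

Inside a handler, the same helpers could be applied to req.queryParams.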

Response Object (ApiResponse)

API handlers must return an object with these fields.

type ApiResponse<TStatus extends number, TBody> = {
  status: TStatus
  headers?: Record<string, string>
  body: TBody
}

return {
  status: 200,
  body: { id: '123', name: 'Alice' },
  headers: {
    'Cache-Control': 'max-age=3600',
    'X-Custom-Header': 'value'
  }
}

Fields:

  • status - HTTP status code (200, 201, 400, 404, 500, etc.)
  • body - Response data (will be JSON-encoded automatically)
  • headers - Optional custom headers
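Small helper functions can keep response shapes consistent across handlers. The sketch below redeclares the ApiResponse type locally so it compiles on its own. `ok` and `notFound` are hypothetical helpers, not part of Motia.

```typescript
type ApiResponse<TStatus extends number, TBody> = {
  status: TStatus
  headers?: Record<string, string>
  body: TBody
}

// Hypothetical helper: a 200 response with optional custom headers.
function ok<TBody>(body: TBody, headers?: Record<string, string>): ApiResponse<200, TBody> {
  return headers ? { status: 200, body, headers } : { status: 200, body }
}

// Hypothetical helper: a 404 response with a uniform error body.
function notFound(resource: string): ApiResponse<404, { error: string }> {
  return { status: 404, body: { error: `${resource} not found` } }
}

const found = ok({ id: '123', name: 'Alice' }, { 'Cache-Control': 'max-age=3600' })
const missing = notFound('User')
console.log(found, missing)
```

Typing the status as a literal (200, 404) lets the compiler check each return value against the matching entry of a responseSchema-style map.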

Stream Configuration

Define real-time data streams for your app.

interface StreamConfig {
  name: string
  schema: StepSchemaInput
  baseConfig: { storageType: 'default' }
  onJoin?: (subscription, context, authContext?) => StreamJoinResult
  onLeave?: (subscription, context, authContext?) => void
}

// src/chat-messages.stream.ts
import { StreamConfig } from 'motia'
import { z } from 'zod'
 
export const config: StreamConfig = {
  name: 'chatMessages',
  schema: z.object({
    text: z.string(),
    author: z.string(),
    timestamp: z.string()
  }),
  baseConfig: {
    storageType: 'default'
  }
}

Fields:

  • name - Unique stream name (used in ctx.streams.<name>)
  • schema - Zod schema (TS/JS) or JSON Schema (Python) for data validation
  • baseConfig.storageType - Always 'default' (custom storage coming soon)
  • onJoin - Optional callback when a client subscribes
  • onLeave - Optional callback when a client unsubscribes

File naming:

  • TypeScript/JavaScript: *.stream.ts or *.stream.js
  • Python: *_stream.py

CLI Commands

For CLI usage, see the CLI Reference.


Common Patterns

Enqueue Types

You can declare enqueue topics as plain strings or as objects with labels.

enqueues: ['user.created', 'email.sent']
 
enqueues: [
  { topic: 'order.approved', label: 'Auto-approved' },
  { topic: 'order.rejected', label: 'Requires review', conditional: true }
]

The label and conditional fields are for iii development console visualization only. They don't affect execution.


Query Parameters

Document query params for the iii development console.

queryParams: [
  { name: 'page', description: 'Page number for pagination' },
  { name: 'limit', description: 'Number of items per page' },
  { name: 'sort', description: 'Sort field (e.g., createdAt, name)' }
]

This metadata is used by the iii development console.


Include Files

Bundle files with your Step (useful for templates, assets, binaries).

includeFiles: [
  './templates/email.html',
  './assets/*.png',
  '../../lib/stockfish'
]

Files are copied into the deployment bundle and accessible at runtime.

