
API Reference

Complete API reference for the Motia framework

Motia v1.0 migration: Upgrading from 0.17? Follow the 0.17 to 1.0 migration guide and handler migration guide.

This reference covers the types, methods, and configuration options available when building with Motia.

If you're new to Motia, start with the Steps guide to understand the basics.

Package exports

For a concise list of npm entrypoints and exports, see motia Package Exports.

Step Configuration

Every Step needs a config. The unified StepConfig type works for all step types: the triggers array determines what activates the step.

StepConfig

type StepConfig = {
  name: string
  description?: string
  triggers: readonly TriggerConfig[]
  enqueues?: readonly Enqueue[]
  virtualEnqueues?: readonly Enqueue[]
  virtualSubscribes?: readonly string[]
  flows?: readonly string[]
  includeFiles?: readonly string[]
}

Required fields:

  • name - Unique identifier for this Step
  • triggers - Array of triggers that activate this step (HTTP, queue, cron, state, stream)

Optional fields:

  • description - Human-readable description
  • enqueues - Topics this Step can enqueue
  • virtualEnqueues - Topics shown in the iii development console but not actually enqueued (gray connections)
  • virtualSubscribes - Topics shown in the iii development console for flow visualization
  • flows - Flow names for iii development console grouping
  • includeFiles - Files to bundle with this Step (supports glob patterns, relative to Step file)

TriggerConfig

Triggers define how a step gets activated. A step can have multiple triggers.

type TriggerConfig = QueueTrigger | HttpTrigger | CronTrigger | StateTrigger | StreamTrigger

HttpTrigger

Use this for HTTP endpoints.

type HttpTrigger = {
  type: 'http'
  path: string
  method: HttpRouteMethod
  bodySchema?: StepSchemaInput
  responseSchema?: Record<number, StepSchemaInput>
  queryParams?: readonly QueryParam[]
  middleware?: readonly HttpMiddleware[]
  condition?: TriggerCondition
}

QueueTrigger

Use this for background jobs and event-driven tasks.

type QueueTrigger = {
  type: 'queue'
  topic: string
  input?: StepSchemaInput
  condition?: TriggerCondition
  config?: Partial<QueueConfig>
}

CronTrigger

Use this for scheduled tasks.

type CronTrigger = {
  type: 'cron'
  expression: string
  condition?: TriggerCondition
}

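Use crontab.guru to build cron expressions. Note that crontab.guru produces five-field expressions, while the examples in this reference use seven fields (for example, '0 0 9 * * * *' for a job described as running at 9 AM), so a generated expression needs adapting.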

StateTrigger

Use this to trigger steps based on state changes.

type StateTrigger = {
  type: 'state'
  condition?: TriggerCondition
}

StreamTrigger

Use this to trigger steps from stream events.

type StreamTrigger = {
  type: 'stream'
  streamName: string
  groupId?: string
  itemId?: string
  condition?: TriggerCondition
}
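
Because every trigger carries a literal type field, the TriggerConfig union can be narrowed with an ordinary switch. The sketch below illustrates that idea with simplified local copies of the trigger types (schemas, middleware, and conditions are omitted). It does not import from motia, and describeTrigger is a hypothetical helper, not part of the framework.

```typescript
// Simplified local copies of the trigger shapes documented above.
// Schema, middleware, and condition fields are omitted for brevity.
type HttpTrigger = { type: 'http'; path: string; method: string }
type QueueTrigger = { type: 'queue'; topic: string }
type CronTrigger = { type: 'cron'; expression: string }
type StateTrigger = { type: 'state' }
type StreamTrigger = { type: 'stream'; streamName: string; groupId?: string; itemId?: string }

type TriggerConfig = HttpTrigger | QueueTrigger | CronTrigger | StateTrigger | StreamTrigger

// Hypothetical helper: returns a one-line summary of a trigger.
// The switch on `type` narrows the union, so each branch sees only its own fields.
function describeTrigger(trigger: TriggerConfig): string {
  switch (trigger.type) {
    case 'http':
      return `HTTP ${trigger.method} ${trigger.path}`
    case 'queue':
      return `queue topic ${trigger.topic}`
    case 'cron':
      return `cron ${trigger.expression}`
    case 'state':
      return 'state change'
    case 'stream':
      return `stream ${trigger.streamName}`
  }
}

const triggers: TriggerConfig[] = [
  { type: 'http', method: 'POST', path: '/users/sync' },
  { type: 'queue', topic: 'user.updated' },
  { type: 'cron', expression: '0 0 */6 * * * *' },
]

const summaries = triggers.map(describeTrigger)
```

The same narrowing applies to any code that inspects a step's triggers array, such as tooling that lists the routes or topics a project exposes.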

Trigger Helper Functions

Use these helpers for concise trigger definitions:

import { http, queue, cron, state, stream } from 'motia'
 
http(method: HttpRouteMethod, path: string, options?: HttpOptions, condition?: TriggerCondition): HttpTrigger
queue(topic: string, options?: { input?: StepSchemaInput; config?: Partial<QueueConfig> }, condition?: TriggerCondition): QueueTrigger
cron(expression: string, condition?: TriggerCondition): CronTrigger
state(condition?: TriggerCondition): StateTrigger
stream(streamName: string, condition?: TriggerCondition): StreamTrigger

Enqueue Type

type Enqueue = string | { topic: string; label?: string; conditional?: boolean }
type EnqueueData<T> = { topic: string; data: T; messageGroupId?: string }

Config Examples

import { StepConfig, http } from 'motia'
import { z } from 'zod'
// authMiddleware is defined in the Middleware section; import it from your own module
 
export const config = {
  name: 'CreateUser',
  description: 'Creates a new user',
  triggers: [
    http('POST', '/users', {
      bodySchema: z.object({ name: z.string() }),
      responseSchema: {
        201: z.object({ id: z.string(), name: z.string() })
      },
      middleware: [authMiddleware],
      queryParams: [{ name: 'invite', description: 'Invite code' }],
    }),
  ],
  enqueues: ['user.created'],
  virtualEnqueues: ['notification.sent'],
  virtualSubscribes: ['user.invited'],
  flows: ['user-management'],
  includeFiles: ['../../assets/template.html'],
} as const satisfies StepConfig

Queue Step Config Example

import { StepConfig, queue } from 'motia'
import { z } from 'zod'
 
export const config = {
  name: 'ProcessOrder',
  description: 'Processes new orders',
  triggers: [
    queue('order.created', {
      input: z.object({ orderId: z.string(), amount: z.number() }),
      config: { type: 'fifo', maxRetries: 3, visibilityTimeout: 90 },
    }),
  ],
  enqueues: ['order.processed'],
  virtualEnqueues: ['payment.initiated'],
  virtualSubscribes: ['order.cancelled'],
  flows: ['orders'],
  includeFiles: ['./templates/*.html'],
} as const satisfies StepConfig

Queue config options:

  • type - 'fifo' or 'standard' (default: 'standard')
  • maxRetries - Max retry attempts (default: 3)
  • visibilityTimeout - Seconds before message becomes visible again (default: 900)
  • delaySeconds - Delay before message becomes visible, 0-900 (default: 0)
  • concurrency - Max parallel message processing per topic
  • backoffType - Retry backoff strategy (e.g., 'exponential')
  • backoffDelayMs - Base delay in ms for retry backoff
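
To make the defaults above concrete, here is a minimal sketch of how a Partial<QueueConfig> could be resolved against them. The QueueConfig shape is an assumption reconstructed from the option list, and resolveQueueConfig is a hypothetical helper; how Motia applies defaults internally is not documented here.

```typescript
// Assumed local shape for QueueConfig, based only on the options listed above.
type QueueConfig = {
  type: 'fifo' | 'standard'
  maxRetries: number
  visibilityTimeout: number // seconds
  delaySeconds: number // documented range: 0-900
  concurrency?: number
  backoffType?: string
  backoffDelayMs?: number
}

// Defaults as documented in the option list.
const QUEUE_DEFAULTS: QueueConfig = {
  type: 'standard',
  maxRetries: 3,
  visibilityTimeout: 900,
  delaySeconds: 0,
}

// Hypothetical helper: merge a partial config over the documented defaults
// and reject a delaySeconds value outside the documented 0-900 range.
function resolveQueueConfig(partial: Partial<QueueConfig> = {}): QueueConfig {
  const resolved: QueueConfig = { ...QUEUE_DEFAULTS, ...partial }
  if (resolved.delaySeconds < 0 || resolved.delaySeconds > 900) {
    throw new RangeError(`delaySeconds must be between 0 and 900, got ${resolved.delaySeconds}`)
  }
  return resolved
}

// The config from the ProcessOrder example: only three fields are overridden.
const resolvedQueueConfig = resolveQueueConfig({ type: 'fifo', maxRetries: 3, visibilityTimeout: 90 })
```

Fields that are not overridden (here delaySeconds) keep their documented defaults, while concurrency and the backoff options stay undefined because the list gives no default for them.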

Cron Step Config Example

import { StepConfig, cron } from 'motia'
 
export const config = {
  name: 'DailyReport',
  description: 'Generates daily reports at 9 AM',
  triggers: [
    cron('0 0 9 * * * *'),
  ],
  enqueues: ['report.generated'],
  virtualEnqueues: ['email.sent'],
  virtualSubscribes: ['report.requested'],
  flows: ['reporting'],
  includeFiles: ['./templates/report.html'],
} as const satisfies StepConfig
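
The cron examples in this reference use seven-field expressions. The sketch below splits such an expression into named fields so a schedule is easier to review. The field order (second, minute, hour, day of month, month, day of week, year) is an assumption inferred from '0 0 9 * * * *' being described as 9 AM; this page does not spell it out, and splitCron is a hypothetical helper.

```typescript
// Assumed field order for the seven-field expressions used in this reference.
const FIELD_NAMES = ['second', 'minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek', 'year'] as const
type FieldName = (typeof FIELD_NAMES)[number]

// Hypothetical helper: split an expression on whitespace and label each field.
// It only checks the field count; it does not validate the field values.
function splitCron(expression: string): Record<FieldName, string> {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== FIELD_NAMES.length) {
    throw new Error(`expected ${FIELD_NAMES.length} fields, got ${parts.length}`)
  }
  const fields = {} as Record<FieldName, string>
  FIELD_NAMES.forEach((name, index) => {
    fields[name] = parts[index]
  })
  return fields
}

const dailyReport = splitCron('0 0 9 * * * *')
const everySixHours = splitCron('0 0 */6 * * * *')
```

A five-field expression such as '0 9 * * *' is rejected by the length check, which is a cheap way to catch a pasted crontab-style expression before it reaches a step config.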

Multi-Trigger Step Config Example

A single step can respond to multiple trigger types:

import { StepConfig, http, queue, cron } from 'motia'
 
export const config = {
  name: 'UserSync',
  description: 'Syncs user data from multiple sources',
  triggers: [
    http('POST', '/users/sync'),
    queue('user.updated'),
    cron('0 0 */6 * * * *'),
  ],
  enqueues: ['user.synced'],
  flows: ['user-management'],
} as const satisfies StepConfig

Handlers

Handlers are the functions that execute your business logic. Use the Handlers type with your config for full type safety.

Handlers Type

type Handlers<TConfig extends StepConfig> = (
  input: InferHandlerInput<TConfig>,
  ctx: FlowContext<InferHandlerInput<TConfig>>,
) => Promise<HttpResponse | void>

The handler signature is unified: the input type is inferred from the trigger that activated the step. Use ctx.match() or ctx.is to differentiate between trigger types in multi-trigger steps. Note that enqueue, logger, stateManager, and streams are now standalone imports from motia rather than context properties.


HTTP Step Handler

Receives a request, returns a response.

import { StepConfig, Handlers, http, enqueue } from 'motia'
 
export const config = {
  name: 'CreateUser',
  triggers: [http('POST', '/users')],
  enqueues: ['user.created'],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async ({ request }, { traceId }) => {
  const { name, email } = request.body
  const userId = crypto.randomUUID()
 
  await enqueue({
    topic: 'user.created',
    data: { userId, email }
  })
 
  return {
    status: 201,
    body: { id: userId, name, email },
    headers: { 'X-Request-ID': traceId }
  }
}

Queue Step Handler

Receives queue data, processes it. No return value.

import { StepConfig, Handlers, queue, enqueue, logger, stateManager } from 'motia'
import { z } from 'zod'
 
export const config = {
  name: 'ProcessOrder',
  triggers: [queue('order.created', { input: z.object({ orderId: z.string(), amount: z.number() }) })],
  enqueues: ['order.processed'],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async (input, ctx) => {
  const data = ctx.getData()
  const { orderId, amount } = data
 
  logger.info('Processing order', { orderId, amount })
 
  await stateManager.set('orders', orderId, {
    id: orderId,
    amount,
    status: 'processed'
  })
 
  await enqueue({
    topic: 'order.processed',
    data: { orderId }
  })
}

Cron Step Handler

Runs on a schedule. Uses standalone imports for logging and state management.

import { StepConfig, Handlers, cron, logger, stateManager } from 'motia'
 
export const config = {
  name: 'DailyCleanup',
  triggers: [cron('0 0 0 * * * *')],
  enqueues: [],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async (input) => {
  logger.info('Running daily cleanup')
 
  const oldOrders = await stateManager.list('orders')
  const cutoff = Date.now() - (30 * 24 * 60 * 60 * 1000)
 
  for (const order of oldOrders) {
    if (order.createdAt < cutoff) {
      await stateManager.delete('orders', order.id)
    }
  }
}

Multi-Trigger Handler with match()

For steps with multiple triggers, use ctx.match() to handle each trigger type:

import { StepConfig, Handlers, http, queue, cron, stateManager } from 'motia'
// syncUser is an application-specific helper, not part of motia
 
export const config = {
  name: 'UserSync',
  triggers: [
    http('POST', '/users/sync'),
    queue('user.updated'),
    cron('0 0 */6 * * * *'),
  ],
  enqueues: ['user.synced'],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async (input, ctx) => {
  return ctx.match({
    http: async ({ request }) => {
      const { userId } = request.body
      await syncUser(userId)
      return { status: 200, body: { synced: true } }
    },
    queue: async (data) => {
      const payload = ctx.getData()
      await syncUser(payload.userId)
    },
    cron: async () => {
      const allUsers = await stateManager.list('users')
      for (const user of allUsers) {
        await syncUser(user.id)
      }
    },
  })
}

You can also use ctx.is for simpler checks:

if (ctx.is.http(input)) {
  return { status: 200, body: { ok: true } }
}
if (ctx.is.queue(input)) {
  const data = ctx.getData()
}
if (ctx.is.cron(input)) {
  // scheduled execution
}

Handler Context (FlowContext)

Every handler receives an optional context object (ctx in TypeScript/JavaScript, context in Python). The enqueue, logger, stateManager, and streams are now standalone imports from motia rather than context properties.

interface FlowContext<TInput> {
  traceId: string
  trigger: TriggerInfo
  is: {
    queue: (input) => boolean
    http: (input) => boolean
    cron: (input) => boolean
    state: (input) => boolean
    stream: (input) => boolean
  }
  getData: () => ExtractDataPayload<TInput>
  match: <TResult>(handlers: MatchHandlers) => Promise<TResult | void>
}

enqueue

Trigger other Steps by publishing messages to topics. Import directly from motia:

import { enqueue } from 'motia'
 
await enqueue({
  topic: 'order.created',
  data: { orderId: '123', total: 99.99 }
})
 
await enqueue({
  topic: 'order.processing',
  data: { orderId: '123', items: ['item1', 'item2'] },
  messageGroupId: 'user-456'
})

FIFO queues: When enqueuing to a topic that has a FIFO queue subscriber, you must include messageGroupId. Messages with the same messageGroupId are processed sequentially. Different groups are processed in parallel.

The data must match the input schema of Steps subscribing to that topic.
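
The FIFO ordering rule (same messageGroupId sequential, different groups in parallel) can be illustrated with a small simulation. This is a sketch of the semantics only, not how Motia's queue is implemented; groupMessages and processFifo are hypothetical helpers, and the EnqueueData type is copied from this reference.

```typescript
type EnqueueData<T> = { topic: string; data: T; messageGroupId?: string }

// Partition messages by messageGroupId, preserving arrival order within each group.
// A missing messageGroupId is rejected, mirroring the FIFO requirement stated above.
function groupMessages<T>(messages: EnqueueData<T>[]): Map<string, EnqueueData<T>[]> {
  const groups = new Map<string, EnqueueData<T>[]>()
  for (const message of messages) {
    if (message.messageGroupId === undefined) {
      throw new Error(`messageGroupId is required for FIFO topic ${message.topic}`)
    }
    const bucket = groups.get(message.messageGroupId) ?? []
    bucket.push(message)
    groups.set(message.messageGroupId, bucket)
  }
  return groups
}

// Process each group sequentially while different groups run concurrently.
async function processFifo<T>(
  messages: EnqueueData<T>[],
  handle: (message: EnqueueData<T>) => Promise<void>,
): Promise<void> {
  const groups = Array.from(groupMessages(messages).values())
  await Promise.all(
    groups.map(async (group) => {
      for (const message of group) {
        // Awaiting here is what keeps a single group strictly in order.
        await handle(message)
      }
    }),
  )
}
```

With this model, a slow message for one user delays only that user's later messages; messages in other groups continue to make progress.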


logger

Structured logging with automatic trace ID correlation. Import directly from motia:

import { logger } from 'motia'
 
logger.info('User created', { userId: '123', email: 'user@example.com' })
logger.warn('Rate limit approaching', { current: 95, limit: 100 })
logger.error('Payment failed', { error: err.message, orderId: '456' })
logger.debug('Cache miss', { key: 'user:123' })

All logs are automatically tagged with:

  • Timestamp
  • Step name
  • Trace ID
  • Any metadata you pass

Learn more about Observability


stateManager

Persistent key-value storage shared across Steps. Import directly from motia:

import { stateManager } from 'motia'

interface InternalStateManager {
  get<T>(groupId: string, key: string): Promise<T | null>
  set<T>(groupId: string, key: string, value: T): Promise<StreamSetResult<T> | null>
  update<T>(groupId: string, key: string, ops: UpdateOp[]): Promise<StreamSetResult<T> | null>
  delete<T>(groupId: string, key: string): Promise<T | null>
  list<T>(groupId: string): Promise<T[]>
  clear(groupId: string): Promise<void>
}
 
type StreamSetResult<T> = { new_value: T; old_value: T | null }
 
type UpdateOp =
  | { type: 'set', path: string, value: any }
  | { type: 'merge', path?: string, value: any }
  | { type: 'increment', path: string, by: number }
  | { type: 'decrement', path: string, by: number }
  | { type: 'remove', path: string }

import { stateManager } from 'motia'
 
await stateManager.set('users', 'user-123', { name: 'Alice', email: 'alice@example.com' })
 
const user = await stateManager.get<User>('users', 'user-123')
 
const allUsers = await stateManager.list<User>('users')
 
await stateManager.update('users', 'user-123', [
  { type: 'set', path: 'name', value: 'Bob' },
  { type: 'increment', path: 'loginCount', by: 1 },
])
 
await stateManager.delete('users', 'user-123')
 
await stateManager.clear('users')

Methods:

  • stateManager.get(groupId, key) - Returns the value or null
  • stateManager.set(groupId, key, value) - Stores the value and returns { new_value, old_value }
  • stateManager.update(groupId, key, ops) - Applies atomic update operations and returns { new_value, old_value }
  • stateManager.delete(groupId, key) - Removes and returns the value (or null)
  • stateManager.list(groupId) - Returns array of all values in the group
  • stateManager.clear(groupId) - Removes all items in the group
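
To show what a list of update operations does to a stored value, the sketch below applies UpdateOp entries in order to a plain object and returns the { new_value, old_value } pair. It is an in-memory illustration, not Motia's implementation: applyOps is a hypothetical helper, path is assumed to be a single top-level key, a merge op is applied at the root regardless of path, and a missing numeric field is treated as 0.

```typescript
// Local copy of the UpdateOp shape documented above (value typed more narrowly than `any`).
type UpdateOp =
  | { type: 'set'; path: string; value: unknown }
  | { type: 'merge'; path?: string; value: Record<string, unknown> }
  | { type: 'increment'; path: string; by: number }
  | { type: 'decrement'; path: string; by: number }
  | { type: 'remove'; path: string }

type Doc = Record<string, unknown>

// Apply ops in order to a shallow copy of `doc`; the original object is not mutated.
function applyOps(doc: Doc, ops: UpdateOp[]): { new_value: Doc; old_value: Doc } {
  const next: Doc = { ...doc }
  for (const op of ops) {
    switch (op.type) {
      case 'set':
        next[op.path] = op.value
        break
      case 'merge':
        Object.assign(next, op.value)
        break
      case 'increment':
        next[op.path] = Number(next[op.path] ?? 0) + op.by
        break
      case 'decrement':
        next[op.path] = Number(next[op.path] ?? 0) - op.by
        break
      case 'remove':
        delete next[op.path]
        break
    }
  }
  return { new_value: next, old_value: doc }
}

const updateResult = applyOps({ name: 'Alice', loginCount: 2 }, [
  { type: 'set', path: 'name', value: 'Bob' },
  { type: 'increment', path: 'loginCount', by: 1 },
])
```

Expressing a change as operations rather than a read-modify-write cycle is what allows the store to apply it atomically, as the update method description states.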

Learn more about State


Streams

Real-time data channels for pushing updates to connected clients. Stream instances are imported directly from their .stream.ts files:

import { chatMessagesStream } from './chat-messages.stream'

interface MotiaStream<TData> {
  get(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
  set(groupId: string, id: string, data: TData): Promise<StreamSetResult<BaseStreamItem<TData>>>
  delete(groupId: string, id: string): Promise<BaseStreamItem<TData> | null>
  getGroup(groupId: string): Promise<BaseStreamItem<TData>[]>
  update(groupId: string, id: string, data: UpdateOp[]): Promise<StreamSetResult<BaseStreamItem<TData>>>
  send<T>(channel: StateStreamEventChannel, event: StateStreamEvent<T>): Promise<void>
}

// Import the stream instance from its stream file
import { chatMessagesStream } from './chat-messages.stream'
 
await chatMessagesStream.set('room-123', 'msg-456', {
  text: 'Hello!',
  author: 'Alice',
  timestamp: new Date().toISOString()
})
 
const message = await chatMessagesStream.get('room-123', 'msg-456')
 
const messages = await chatMessagesStream.getGroup('room-123')
 
await chatMessagesStream.delete('room-123', 'msg-456')
 
await chatMessagesStream.update('room-123', 'msg-456', [
  { type: 'set', path: 'text', value: 'Updated message' },
])
 
await chatMessagesStream.send(
  { groupId: 'room-123' },
  { type: 'user.typing', data: { userId: 'alice' } }
)

Methods:

  • stream.set(groupId, id, data) - Create or update an item (returns { new_value, old_value })
  • stream.get(groupId, id) - Retrieve an item or null
  • stream.getGroup(groupId) - Get all items in a group
  • stream.delete(groupId, id) - Remove an item
  • stream.update(groupId, id, ops) - Apply atomic update operations
  • stream.send(channel, event) - Send an ephemeral event (e.g., typing indicators, reactions)
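
The group and id addressing used by set, get, getGroup, and delete can be modelled with nested maps. The class below is a hypothetical synchronous stand-in that may be handy in unit tests; it is not Motia's stream implementation, it omits update and send, and the real methods return promises and wrap items in BaseStreamItem.

```typescript
type SetResult<T> = { new_value: T; old_value: T | null }

// Hypothetical in-memory store keyed by groupId, then by item id.
class InMemoryGroupStore<T> {
  private groups = new Map<string, Map<string, T>>()

  // Create or replace an item and report the previous value, if any.
  set(groupId: string, id: string, data: T): SetResult<T> {
    const group = this.groups.get(groupId) ?? new Map<string, T>()
    const old = group.get(id) ?? null
    group.set(id, data)
    this.groups.set(groupId, group)
    return { new_value: data, old_value: old }
  }

  get(groupId: string, id: string): T | null {
    return this.groups.get(groupId)?.get(id) ?? null
  }

  // All items in a group, in insertion order.
  getGroup(groupId: string): T[] {
    const group = this.groups.get(groupId)
    return group ? Array.from(group.values()) : []
  }

  // Remove an item and return it, or null when it did not exist.
  delete(groupId: string, id: string): T | null {
    const old = this.get(groupId, id)
    this.groups.get(groupId)?.delete(id)
    return old
  }
}

const chatStore = new InMemoryGroupStore<{ text: string; author: string }>()
const firstSet = chatStore.set('room-123', 'msg-456', { text: 'Hello!', author: 'Alice' })
const secondSet = chatStore.set('room-123', 'msg-456', { text: 'Updated message', author: 'Alice' })
```

The first set reports old_value as null because nothing was stored under that id yet; the second reports the item it replaced.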

Learn more about Streams


traceId

Unique ID for tracking requests across Steps.

import { logger } from 'motia'
 
export const handler: Handlers<typeof config> = async ({ request }, { traceId }) => {
  logger.info('Processing request', { traceId })
  return { status: 200, body: { traceId } }
}

The trace ID is automatically generated for each request and passed through all Steps in the workflow. Use it to correlate logs, state, and events.


trigger

Information about what triggered the current handler execution.

interface TriggerInfo {
  type: 'http' | 'queue' | 'cron' | 'state' | 'stream'
}

getData

Extract the data payload from the input. Useful in queue and stream handlers.

const data = ctx.getData()

match

Route execution based on trigger type. See Multi-Trigger Handler with match() above.

type MatchHandlers<TInput, TResult> = {
  queue?: (input) => Promise<void>
  http?: (request) => Promise<TResult>
  cron?: () => Promise<void>
  state?: (input) => Promise<TResult>
  stream?: (input) => Promise<TResult>
  default?: (input) => Promise<TResult | void>
}
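
As an illustration of the routing idea behind match, here is a minimal dispatcher that picks a handler by trigger type and falls back to default. It is a sketch under simplifying assumptions (every handler receives the same input type), not Motia's implementation; matchTrigger and SimpleMatchHandlers are hypothetical names.

```typescript
type TriggerType = 'http' | 'queue' | 'cron' | 'state' | 'stream'

// Simplified handler map: one optional handler per trigger type plus a fallback.
type SimpleMatchHandlers<TInput, TResult> = Partial<
  Record<TriggerType, (input: TInput) => Promise<TResult | void>>
> & {
  default?: (input: TInput) => Promise<TResult | void>
}

// Hypothetical dispatcher: picks the handler for the active trigger type,
// falls back to `default`, and resolves to undefined when neither exists.
async function matchTrigger<TInput, TResult>(
  triggerType: TriggerType,
  input: TInput,
  handlers: SimpleMatchHandlers<TInput, TResult>,
): Promise<TResult | void> {
  const handler = handlers[triggerType] ?? handlers.default
  return handler ? handler(input) : undefined
}

// No `queue` handler is registered, so the call is routed to `default`.
const matched = matchTrigger<{ userId: string }, string>('queue', { userId: 'u1' }, {
  http: async (input) => `http:${input.userId}`,
  default: async (input) => `fallback:${input.userId}`,
})
```

Keeping a default branch means a trigger added to the config later does not silently do nothing.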

Middleware

Intercepts API requests before and after the handler.

import { ApiMiddleware } from 'motia'
 
export const authMiddleware: ApiMiddleware = async ({ request }, ctx, next) => {
  const token = request.headers.authorization
 
  if (!token) {
    return { status: 401, body: { error: 'Unauthorized' } }
  }
 
  return await next()
}

Parameters:

  • { request, response } - The MotiaHttpArgs object containing request and response
  • ctx - Context object
  • next - Function to call the next middleware/handler

Returns: Response object

Learn more about Middleware


Handler Input

TypeScript and JavaScript handlers receive MotiaHttpArgs (containing request and response). Python handlers use ApiRequest for standard responses and only use MotiaHttpArgs for streaming/SSE handlers.

interface MotiaHttpArgs<TBody = unknown> {
  request: MotiaHttpRequest<TBody>
  response: MotiaHttpResponse
}
 
interface MotiaHttpRequest<TBody = unknown> {
  pathParams: Record<string, string>
  queryParams: Record<string, string | string[]>
  body: TBody
  headers: Record<string, string | string[]>
  method: string
  requestBody: ChannelReader
}
 
type MotiaHttpResponse = {
  status: (statusCode: number) => void
  headers: (headers: Record<string, string>) => void
  stream: NodeJS.WritableStream
  close: () => void
}

export const handler: Handlers<typeof config> = async ({ request }, ctx) => {
  const userId = request.pathParams.id
  const page = request.queryParams.page
  const limit = request.queryParams.limit
  const { name, email } = request.body
  const auth = request.headers.authorization
 
  return { status: 200, body: { userId, name } }
}

request fields:

  • pathParams / path_params (Python) - Object with path parameters (e.g., :id from /users/:id)
  • queryParams / query_params (Python) - Object with query string params (values can be string or array)
  • body - Parsed request body (validated against bodySchema if defined)
  • headers - Object with request headers (values can be string or array)
  • method - HTTP method string
  • requestBody / request_body (Python) - Raw request body stream (for SSE / streaming)
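
Since query parameter values can be a string or an array of strings, handlers usually normalize them before use. The helpers below are a hypothetical sketch of that normalization for values such as page and limit; they are not part of motia.

```typescript
type QueryParams = Record<string, string | string[]>

// Hypothetical helper: return the first value for a query parameter, or a fallback.
function firstParam(params: QueryParams, name: string, fallback = ''): string {
  const value = params[name]
  if (value === undefined) return fallback
  return Array.isArray(value) ? (value[0] ?? fallback) : value
}

// Hypothetical helper: parse a positive integer parameter such as `page` or `limit`,
// returning the fallback when the value is missing or not a positive integer.
function intParam(params: QueryParams, name: string, fallback: number): number {
  const parsed = Number.parseInt(firstParam(params, name), 10)
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback
}

// Example: ?page=2&page=3&limit=abc
const exampleQuery: QueryParams = { page: ['2', '3'], limit: 'abc' }
const page = intParam(exampleQuery, 'page', 1)
const limit = intParam(exampleQuery, 'limit', 20)
```

In a handler, the same helpers would be called with request.queryParams instead of exampleQuery.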

response fields (for SSE / streaming):

  • status(code) - Set HTTP status code
  • headers(headers) - Set response headers
  • stream / writer.stream (Python) - Writable stream for sending data
  • close() - Close the response stream

Response Object (ApiResponse)

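Non-streaming HTTP handlers return an object with these fields; status and body are required, and headers is optional. Streaming (SSE) handlers write to the response stream instead of returning a response object.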

type ApiResponse<TStatus extends number = number, TBody = any> = {
  status: TStatus
  headers?: Record<string, string>
  body: TBody
}

return {
  status: 200,
  body: { id: '123', name: 'Alice' },
  headers: {
    'Cache-Control': 'max-age=3600',
    'X-Custom-Header': 'value'
  }
}

Fields:

  • status - HTTP status code (200, 201, 400, 404, 500, etc.)
  • body - Response data (will be JSON-encoded automatically)
  • headers - Optional custom headers
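
A small builder can keep response literals consistent across handlers. The sketch below uses a local copy of the ApiResponse type (with TBody defaulting to unknown rather than any); respond and notFound are hypothetical helpers, not motia exports.

```typescript
// Local copy of the ApiResponse shape documented above.
type ApiResponse<TStatus extends number = number, TBody = unknown> = {
  status: TStatus
  headers?: Record<string, string>
  body: TBody
}

// Hypothetical helper: build a response, omitting `headers` when none are given.
function respond<TStatus extends number, TBody>(
  status: TStatus,
  body: TBody,
  headers?: Record<string, string>,
): ApiResponse<TStatus, TBody> {
  return headers ? { status, body, headers } : { status, body }
}

// Hypothetical helper for a common error shape.
const notFound = (resource: string) => respond(404, { error: `${resource} not found` })

const okResponse = respond(200, { id: '123', name: 'Alice' }, { 'Cache-Control': 'max-age=3600' })
const missingUser = notFound('User')
```

Because the status is a generic parameter, the literal status code is preserved in the return type, which helps when a responseSchema maps specific codes to body shapes.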

Server-Sent Events (SSE)

HTTP handlers can stream responses using Server-Sent Events. Instead of returning a response object, use the response object from MotiaHttpArgs to set headers and write to the stream.

src/sse-example.step.ts
import { type Handlers, http, type StepConfig, logger } from 'motia'
 
export const config = {
  name: 'SSE Example',
  description: 'Streams data back to the client as SSE',
  flows: ['sse-example'],
  triggers: [http('POST', '/sse')],
  enqueues: [],
} as const satisfies StepConfig
 
export const handler: Handlers<typeof config> = async ({ request, response }) => {
  logger.info('SSE request received')
 
  response.status(200)
  response.headers({
    'content-type': 'text/event-stream',
    'cache-control': 'no-cache',
    connection: 'keep-alive',
  })
 
  const chunks: string[] = []
  for await (const chunk of request.requestBody.stream) {
    chunks.push(Buffer.from(chunk).toString('utf-8'))
  }
  const body = chunks.join('').trim()
 
  const items = ['alpha', 'bravo', 'charlie']
  for (const item of items) {
    response.stream.write(`event: item\ndata: ${JSON.stringify({ item })}\n\n`)
    await new Promise((resolve) => setTimeout(resolve, 500))
  }
 
  response.stream.write(`event: done\ndata: ${JSON.stringify({ total: items.length })}\n\n`)
  response.close()
}

Key differences from standard HTTP handlers:

  • Destructure both request and response from the first argument
  • Use response.status() and response.headers() to set the status and headers
  • Write SSE-formatted data to response.stream (TS/JS) or response.writer.stream (Python)
  • Call response.close() when done streaming
  • Do not return a response object — the response is streamed directly
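
The frames written in the SSE example follow a fixed shape: an event line, a data line, and a blank line. A small formatter keeps that shape in one place. formatSseEvent is a hypothetical helper, not part of motia, and it assumes the payload is JSON-serializable.

```typescript
// Format one Server-Sent Events frame: an `event:` line, a `data:` line holding
// JSON, and a blank line that terminates the frame. JSON.stringify never emits
// raw newlines, so the payload always fits on a single data line.
function formatSseEvent(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`
}

const itemFrame = formatSseEvent('item', { item: 'alpha' })
const doneFrame = formatSseEvent('done', { total: 3 })
```

In a streaming handler, each frame would be passed to response.stream.write(...) in place of the inline template strings.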

Stream Configuration

Define real-time data streams for your app.

interface StreamConfig {
  name: string
  schema: StepSchemaInput
  baseConfig: { storageType: 'default' }
  onJoin?: (subscription, context, authContext?) => StreamJoinResult
  onLeave?: (subscription, context, authContext?) => void
}

src/chat-messages.stream.ts
import { StreamConfig } from 'motia'
import { z } from 'zod'
 
export const config: StreamConfig = {
  name: 'chatMessages',
  schema: z.object({
    text: z.string(),
    author: z.string(),
    timestamp: z.string()
  }),
  baseConfig: {
    storageType: 'default'
  }
}

Fields:

  • name - Unique stream name (the stream instance is imported from the stream file)
  • schema - Zod schema (TS/JS) or JSON Schema (Python) for data validation
  • baseConfig.storageType - Always 'default' (custom storage coming soon)
  • onJoin - Optional callback when a client subscribes
  • onLeave - Optional callback when a client unsubscribes

File naming:

  • TypeScript/JavaScript: *.stream.ts or *.stream.js
  • Python: *_stream.py

CLI Commands

For CLI usage, see the CLI Reference.


Common Patterns

Enqueue Types

You can enqueue topics as strings or objects with labels.

enqueues: ['user.created', 'email.sent']
 
enqueues: [
  { topic: 'order.approved', label: 'Auto-approved' },
  { topic: 'order.rejected', label: 'Requires review', conditional: true }
]

The label and conditional fields are for iii development console visualization only. They don't affect execution.
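
Because an enqueues entry can be either a string or an object, code that inspects a config (for example, to list every topic a step may publish) needs to handle both forms. The sketch below uses a local copy of the Enqueue type; topicOf is a hypothetical helper.

```typescript
// Local copy of the Enqueue type documented in this reference.
type Enqueue = string | { topic: string; label?: string; conditional?: boolean }

// Hypothetical helper: reduce either form to its topic name.
function topicOf(entry: Enqueue): string {
  return typeof entry === 'string' ? entry : entry.topic
}

const enqueues: readonly Enqueue[] = [
  'user.created',
  { topic: 'order.approved', label: 'Auto-approved' },
  { topic: 'order.rejected', label: 'Requires review', conditional: true },
]

const topics = enqueues.map(topicOf)
```

The label and conditional fields are dropped here on purpose, since they only affect visualization.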


Query Parameters

Document query params for the iii development console.

queryParams: [
  { name: 'page', description: 'Page number for pagination' },
  { name: 'limit', description: 'Number of items per page' },
  { name: 'sort', description: 'Sort field (e.g., createdAt, name)' }
]

This metadata is used by the iii development console.


Include Files

Bundle files with your Step (useful for templates, assets, binaries).

includeFiles: [
  './templates/email.html',
  './assets/*.png',
  '../../lib/stockfish'
]

Files are copied into the deployment bundle and accessible at runtime.

