Featured in Vercel OSS 2025

A New Paradigm in Backend Engineering

You’re one Step, a single core primitive, away from replacing 10+ frameworks.

Build queues, streams, stateful backends, workflows, AI agents, observable apps, scalable apps, APIs, and background jobs.

One STEP to build any backend.

A Step is the core primitive in Motia. Instead of juggling separate frameworks for APIs, background jobs, queues, or workflows, you define everything in one place: how it runs, when it runs, where it runs, and what it does.

Composable
Reusable
Observable
Multi-language
Scalable

Trigger - How Execution Begins

Every Step has a type that defines how it triggers. Change the type, and the same pattern works for different use cases. A Step file contains two parts:

Config → defines when and how the Step runs, and gives it a unique name
Handler → the function that executes your business logic
Motia automatically discovers any file ending in .step.ts, .step.js, or _step.py. The filename tells Motia to load it, and the name in the config uniquely identifies the Step inside your system.
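To make the naming rule concrete, here is a minimal sketch of a filename check. It only mirrors the three suffixes listed above; isStepFile and STEP_FILE_PATTERN are hypothetical names for illustration, not part of Motia's API, and Motia's real discovery logic may differ.

```typescript
// Hypothetical helper: mirrors the documented suffixes, not Motia's actual loader.
const STEP_FILE_PATTERN = /(\.step\.(ts|js)|_step\.py)$/

function isStepFile(filename: string): boolean {
  return STEP_FILE_PATTERN.test(filename)
}

const candidates = ['api.step.ts', 'event.step.js', 'cron_step.py', 'utils.ts', 'cron.stpe.js']

// Keeps only the names that follow the convention; 'utils.ts' and the
// misspelled 'cron.stpe.js' are filtered out.
const discovered = candidates.filter(isStepFile)
```

A misspelled suffix such as .stpe.js fails the check, so under this rule the file would be skipped rather than loaded.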
api.step.ts
import { ApiRouteConfig, Handlers } from 'motia'

export const config: ApiRouteConfig = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST', 
  emits: ['message.sent'],
  flows: ['messaging']
}

export const handler: Handlers['SendMessage'] = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api.step.js
exports.config = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

exports.handler = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api_step.py
config = {
    "name": "SendMessage",
    "type": "api",
    "path": "/messages",
    "method": "POST",
    "emits": ["message.sent"],
    "flows": ["messaging"]
}

async def handler(req, context):
    text = req.get("body", {}).get("text")
    user_id = req.get("body", {}).get("userId")
    message = {"text": text, "userId": user_id, "status": "sent"}
    await context.state.set("messages", user_id, message)
    await context.streams.messages.set(user_id, message)
    context.logger.info("Message sent", {"userId": user_id})
    await context.emit({"topic": "message.sent", "data": message})
    return {"status": 201, "body": message}
event.step.ts
import { EventConfig, Handlers } from 'motia'

export const config: EventConfig = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

export const handler: Handlers['ProcessMessage'] = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
event.step.js
exports.config = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

exports.handler = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
event_step.py
config = {
    "name": "ProcessMessage",
    "type": "event",
    "subscribes": ["message.sent"],
    "emits": ["message.processed"],
    "flows": ["messaging"]
}

async def handler(input_data, context):
    text = input_data.get("text")
    user_id = input_data.get("userId")
    status = input_data.get("status")
    processed_message = {"text": text, "userId": user_id, "status": "processed"}
    await context.state.set("processed", user_id, processed_message)
    await context.streams.processed.set(user_id, processed_message)
    context.logger.info("Message processed", {"userId": user_id})
    await context.emit({ "topic": "message.processed", "data": processed_message })
cron.step.ts
import { CronConfig, Handlers } from 'motia'

export const config: CronConfig = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

export const handler: Handlers['DailySummary'] = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
cron.step.js
exports.config = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

exports.handler = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
cron_step.py
config = {
    "name": "DailySummary",
    "type": "cron",
    "cron": "0 9 * * *",
    "emits": ["summary.generated"],
    "flows": ["messaging"]
}

async def handler(context):
    messages = await context.state.get_group("messages")
    summary = {"total": len(messages), "status": "completed"}
    await context.state.set("summaries", "daily", summary)
    await context.streams.summary.set("latest", summary)
    context.logger.info("Daily summary generated", {"total": summary["total"]})
    await context.emit({  "topic": "summary.generated", "data": summary })

Handler - How It Performs Logic

This is where your business logic lives. The handler function receives input data and a context object with everything you need: logger for tracking, emit for triggering other Steps, state for storing data, and streams for real-time updates.
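Because a handler is a plain function of its input and context, you can exercise it without a running server. The sketch below is an illustration, not Motia's test tooling: StepContext, ChatMessage, fakeContext, and calls are hypothetical names defined here, and only the parts of the context used by the SendMessage example are modeled.

```typescript
// Hypothetical, simplified shapes. Motia's real types come from the 'motia' package.
type ChatMessage = { text: string; userId: string; status: string }

interface StepContext {
  emit(event: { topic: string; data: unknown }): Promise<void>
  logger: { info(message: string, meta?: Record<string, unknown>): void }
  state: { set(group: string, key: string, value: unknown): Promise<void> }
  streams: { messages: { set(key: string, value: ChatMessage): Promise<void> } }
}

// Same logic as the SendMessage handler, written against the simplified context.
async function sendMessage(req: { body: { text: string; userId: string } }, ctx: StepContext) {
  const { text, userId } = req.body
  const message: ChatMessage = { text, userId, status: 'sent' }
  await ctx.state.set('messages', userId, message)
  await ctx.streams.messages.set(userId, message)
  ctx.logger.info('Message sent', { userId })
  await ctx.emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}

// In-memory fakes that record each call in order.
const calls: string[] = []
const fakeContext: StepContext = {
  emit: async (event) => { calls.push(`emit:${event.topic}`) },
  logger: { info: (message) => { calls.push(`log:${message}`) } },
  state: { set: async (group, key) => { calls.push(`state:${group}/${key}`) } },
  streams: { messages: { set: async (key) => { calls.push(`stream:${key}`) } } },
}

// Resolves to the HTTP-style response once every context call has completed.
const pendingResponse = sendMessage({ body: { text: 'hi', userId: 'u1' } }, fakeContext)
```

Passing the context in as an argument is what makes this possible: the handler never reaches for globals, so swapping real services for fakes needs no extra wiring.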
event.step.js
const { z } = require('zod') // schema library that provides z.object

exports.config = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  // Schema describing the payload expected on 'message.sent'
  input: z.object({ text: z.string(), userId: z.string(), status: z.string() }),
  emits: ['message.processed'],
  flows: ['messaging']
}

exports.handler = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}

Emit - How It Outputs Data

Send data to other Steps using await emit({ topic: 'event.name', data: {...} }). Any Step that subscribes to that topic receives your data and runs automatically. This is how Steps talk to each other.
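The topic-based routing described here can be pictured with a tiny in-memory publish/subscribe bus. This is a conceptual sketch only: TopicBus and its methods are hypothetical and say nothing about how Motia delivers events internally.

```typescript
type TopicHandler = (data: unknown) => Promise<void>

// Hypothetical in-memory bus: routes each emitted event to the handlers
// registered for its topic.
class TopicBus {
  private subscribers = new Map<string, TopicHandler[]>()

  subscribe(topic: string, handler: TopicHandler): void {
    const handlers = this.subscribers.get(topic) ?? []
    handlers.push(handler)
    this.subscribers.set(topic, handlers)
  }

  // Runs every subscriber for the topic and returns how many were invoked.
  async emit(event: { topic: string; data: unknown }): Promise<number> {
    const handlers = this.subscribers.get(event.topic) ?? []
    for (const handler of handlers) {
      await handler(event.data)
    }
    return handlers.length
  }
}

const bus = new TopicBus()
const processed: unknown[] = []

// Plays the role of a Step configured with subscribes: ['message.sent'].
bus.subscribe('message.sent', async (data) => { processed.push(data) })

// Plays the role of a Step that emits 'message.sent'.
const delivered = bus.emit({ topic: 'message.sent', data: { text: 'hi', userId: 'u1' } })
```

The emitting side never names its receivers. It only names a topic, which is why new subscribers can be added without touching the Step that emits.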

State - How It Stores Data

Store and retrieve data across Steps using await state.set(group, key, value) and await state.get(group, key), as in state.set('messages', userId, message) from the SendMessage example. Perfect for tracking workflow progress, caching results, or sharing data between Steps without setting up a database.
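The grouped key-value pattern used in the examples (a group such as 'messages', then a key such as the user ID) can be sketched with nested maps. InMemoryState is a hypothetical stand-in written for this page, not Motia's state implementation; returning null for a missing key is an assumption of the sketch.

```typescript
// Hypothetical in-memory store: one inner map per group.
class InMemoryState {
  private groups = new Map<string, Map<string, unknown>>()

  async set<T>(group: string, key: string, value: T): Promise<T> {
    const entries = this.groups.get(group) ?? new Map<string, unknown>()
    entries.set(key, value)
    this.groups.set(group, entries)
    return value
  }

  // Assumption of this sketch: a missing key yields null.
  async get<T>(group: string, key: string): Promise<T | null> {
    const entries = this.groups.get(group)
    return entries && entries.has(key) ? (entries.get(key) as T) : null
  }

  // Returns every value stored in the group, as the DailySummary example expects.
  async getGroup<T>(group: string): Promise<T[]> {
    const entries = this.groups.get(group)
    return entries ? (Array.from(entries.values()) as T[]) : []
  }
}

// Mirrors the DailySummary logic: count the entries stored under 'messages'.
async function dailyTotal(state: InMemoryState): Promise<number> {
  await state.set('messages', 'u1', { text: 'hi' })
  await state.set('messages', 'u2', { text: 'hello' })
  await state.set('messages', 'u1', { text: 'hi again' }) // same key, so this overwrites
  const messages = await state.getGroup<{ text: string }>('messages')
  return messages.length
}
```

Because the user ID is the key, a second message from the same user replaces the first, so the summary counts users with a stored message rather than total messages sent.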

Logger - How It Tracks Execution

Every handler gets a logger object. Call logger.info(), logger.warn(), or logger.error() with your message and data. All logs automatically include your Step name, trace ID, and timestamp - making it easy to debug workflows in the Workbench.
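As a rough picture of what "automatically include" means, the sketch below builds a logger that stamps every entry with a Step name, trace ID, and timestamp. createLogger, LogEntry, and the sink callback are hypothetical names for illustration; the fields and format Motia actually records may differ.

```typescript
type LogLevel = 'info' | 'warn' | 'error'

interface LogEntry {
  level: LogLevel
  step: string
  traceId: string
  timestamp: string
  message: string
  data: Record<string, unknown>
}

// Hypothetical factory: binds the Step name and trace ID once so that
// handler code only has to pass a message and optional data.
function createLogger(step: string, traceId: string, sink: (entry: LogEntry) => void) {
  const write = (level: LogLevel) => (message: string, data: Record<string, unknown> = {}) => {
    sink({ level, step, traceId, timestamp: new Date().toISOString(), message, data })
  }
  return { info: write('info'), warn: write('warn'), error: write('error') }
}

// Collect entries in memory instead of printing them.
const entries: LogEntry[] = []
const logger = createLogger('SendMessage', 'trace-123', (entry) => { entries.push(entry) })

logger.info('Message sent', { userId: 'u1' })
logger.error('Delivery failed')
```

Every entry carries the same step and traceId, which is what makes it possible to group the logs of one workflow run across several Steps.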

Streams - How It Broadcasts Live Data

Push updates directly to connected clients with await streams.mystream.set(key, value). When you update stream data from any Step, all subscribed frontend clients receive the changes instantly - no extra setup needed.
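The behavior described here (write once, every subscriber is notified) can be modeled in a few lines. InMemoryStream is a hypothetical stand-in that notifies in-process listeners; it does not show how Motia transports updates to real frontend clients.

```typescript
type StreamListener<T> = (key: string, value: T) => void

// Hypothetical in-memory stream: stores the latest value per key and
// notifies every current subscriber on each write.
class InMemoryStream<T> {
  private items = new Map<string, T>()
  private listeners = new Set<StreamListener<T>>()

  // Registers a listener and returns a function that removes it again.
  subscribe(listener: StreamListener<T>): () => void {
    this.listeners.add(listener)
    return () => { this.listeners.delete(listener) }
  }

  async set(key: string, value: T): Promise<T> {
    this.items.set(key, value)
    this.listeners.forEach((listener) => listener(key, value))
    return value
  }

  get(key: string): T | undefined {
    return this.items.get(key)
  }
}

const messages = new InMemoryStream<{ text: string; status: string }>()
const seen: string[] = []

// Plays the role of a connected client.
const unsubscribe = messages.subscribe((key, value) => { seen.push(`${key}:${value.status}`) })

// Two writes arrive while subscribed; the third happens after unsubscribing.
const updates = messages
  .set('u1', { text: 'hi', status: 'sent' })
  .then(() => messages.set('u1', { text: 'hi', status: 'processed' }))
  .then(() => { unsubscribe() })
  .then(() => messages.set('u1', { text: 'hi', status: 'archived' }))
```

The writer calls set the same way whether zero or many clients are listening, which matches the "no extra setup" claim from the Step author's point of view.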
api.step.ts
import { ApiRouteConfig, Handlers } from 'motia'

export const config: ApiRouteConfig = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

export const handler: Handlers['SendMessage'] = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api.step.js
exports.config = {
  name: 'SendMessage',
  type: 'api',
  path: '/messages',
  method: 'POST',
  emits: ['message.sent'],
  flows: ['messaging']
}

exports.handler = async (req, { emit, logger, state, streams }) => {
  const { text, userId } = req.body
  const message = { text, userId, status: 'sent' }
  await state.set('messages', userId, message)
  await streams.messages.set(userId, message)
  logger.info('Message sent', { userId })
  await emit({ topic: 'message.sent', data: message })
  return { status: 201, body: message }
}
api_step.py
config = {
    "name": "SendMessage",
    "type": "api",
    "path": "/messages",
    "method": "POST",
    "emits": ["message.sent"],
    "flows": ["messaging"]
}

async def handler(req, context):
    text = req.get("body", {}).get("text")
    user_id = req.get("body", {}).get("userId")
    message = {"text": text, "userId": user_id, "status": "sent"}
    await context.state.set("messages", user_id, message)
    await context.streams.messages.set(user_id, message)
    context.logger.info("Message sent", {"userId": user_id})
    await context.emit({"topic": "message.sent", "data": message})
    return {"status": 201, "body": message}
event.step.ts
import { EventConfig, Handlers } from 'motia'

export const config: EventConfig = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

export const handler: Handlers['ProcessMessage'] = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
event.step.js
exports.config = {
  name: 'ProcessMessage',
  type: 'event',
  subscribes: ['message.sent'],
  emits: ['message.processed'],
  flows: ['messaging']
}

exports.handler = async (input, { emit, logger, state, streams }) => {
  const { text, userId, status } = input
  const processedMessage = { text, userId, status: 'processed' }
  await state.set('processed', userId, processedMessage)
  await streams.processed.set(userId, processedMessage)
  logger.info('Message processed', { userId })
  await emit({ topic: 'message.processed', data: processedMessage })
}
event_step.py
config = {
    "name": "ProcessMessage",
    "type": "event",
    "subscribes": ["message.sent"],
    "emits": ["message.processed"],
    "flows": ["messaging"]
}

async def handler(input_data, context):
    text = input_data.get("text")
    user_id = input_data.get("userId")
    status = input_data.get("status")
    processed_message = {"text": text, "userId": user_id, "status": "processed"}
    await context.state.set("processed", user_id, processed_message)
    await context.streams.processed.set(user_id, processed_message)
    context.logger.info("Message processed", {"userId": user_id})
    await context.emit({ "topic": "message.processed", "data": processed_message })
cron.step.ts
import { CronConfig, Handlers } from 'motia'

export const config: CronConfig = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

export const handler: Handlers['DailySummary'] = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
cron.step.js
exports.config = {
  name: 'DailySummary',
  type: 'cron',
  cron: '0 9 * * *',
  emits: ['summary.generated'],
  flows: ['messaging']
}

exports.handler = async ({ emit, state, logger, streams }) => {
  const messages = await state.getGroup('messages')
  const summary = { total: messages.length, status: 'completed' }
  await state.set('summaries', 'daily', summary)
  await streams.summary.set('latest', summary)
  logger.info('Daily summary generated', { total: summary.total })
  await emit({ topic: 'summary.generated', data: summary })
}
cron_step.py
config = {
    "name": "DailySummary",
    "type": "cron",
    "cron": "0 9 * * *",
    "emits": ["summary.generated"],
    "flows": ["messaging"]
}

async def handler(context):
    messages = await context.state.get_group("messages")
    summary = {"total": len(messages), "status": "completed"}
    await context.state.set("summaries", "daily", summary)
    await context.streams.summary.set("latest", summary)
    context.logger.info("Daily summary generated", {"total": summary["total"]})
    await context.emit({ "topic": "summary.generated","data": summary })
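
The cron value in the configs above, 0 9 * * *, has the five fields of a standard cron expression: minute, hour, day of month, month, and day of week. Read that way, it fires every day at 09:00. The helper below is a hypothetical illustration that only labels the fields; it is not Motia's scheduler and does not compute run times.

```typescript
// Hypothetical helper for illustration: it labels the five standard cron fields.
type CronParts = {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

function describeCron(expression: string): CronParts {
  const parts = expression.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 fields, got ${parts.length}`)
  }
  return {
    minute: parts[0],
    hour: parts[1],
    dayOfMonth: parts[2],
    month: parts[3],
    dayOfWeek: parts[4],
  }
}

// minute "0" and hour "9"; "*" means "any" for the remaining three fields.
const daily = describeCron('0 9 * * *')
```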

Infinite composability with minimal complexity.
Master these simple core primitives — and you can build any backend system imaginable.
View Manifesto
npx motia@latest create
DEVEX

Built for devs: because backend shouldn’t mean burnout

Multi-language orchestration
Connect services in different languages within one workflow.
Workbench
A visual control panel to monitor and manage your backend.
Code is the config
Define infrastructure and workflows in code, avoiding fragile configs.
Type-safe APIs
Typed endpoints with validation ensure safer integrations and fewer errors.
Streaming
Real-time objects and events are accessible without extra setup.
Atomic State & Execution
Data is always safe, thanks to all-or-nothing, atomic operations.
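
As a rough illustration of what "typed endpoints with validation" buys you, the sketch below validates a request body before it reaches business logic. It is hand-rolled TypeScript and does not show Motia's own schema mechanism; validateSendMessage and respond are hypothetical names, and the body shape is borrowed from the SendMessage example.

```typescript
// Hand-rolled validation sketch; these names are hypothetical, not Motia APIs.
type SendMessageBody = { text: string; userId: string }

type ValidationResult =
  | { ok: true; value: SendMessageBody }
  | { ok: false; errors: string[] }

function validateSendMessage(input: unknown): ValidationResult {
  if (typeof input !== 'object' || input === null) {
    return { ok: false, errors: ['body must be an object'] }
  }
  const body = input as Record<string, unknown>
  const text = body['text']
  const userId = body['userId']
  const errors: string[] = []
  if (typeof text !== 'string' || text.length === 0) {
    errors.push('text must be a non-empty string')
  }
  if (typeof userId !== 'string' || userId.length === 0) {
    errors.push('userId must be a non-empty string')
  }
  if (typeof text === 'string' && typeof userId === 'string' && errors.length === 0) {
    return { ok: true, value: { text, userId } }
  }
  return { ok: false, errors }
}

// Mirrors the example handler's response shape: 201 on success, 400 on invalid input.
function respond(input: unknown): { status: number; body: unknown } {
  const result = validateSendMessage(input)
  if (!result.ok) {
    return { status: 400, body: { errors: result.errors } }
  }
  return { status: 201, body: { ...result.value, status: 'sent' } }
}

const accepted = respond({ text: 'hi', userId: 'u1' })
const rejected = respond({ text: 5 })
```

Once validation succeeds, the handler works with a value the compiler knows is a SendMessageBody, so malformed input is rejected at the edge instead of surfacing as a runtime error deeper in the flow.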
Versatility

Build complex patterns quickly: your backend,
delivered with a single core primitive.

Durable Execution
Workflows maintain state through failures, ensuring reliability.
Human in the Loop
Include manual approvals in automated workflows.
Parallelization
Run tasks simultaneously and merge results seamlessly.
Orchestrator with scheduling
Schedule jobs reliably at specific times or intervals.
Saga
Manage business processes with transaction rollback and recovery.
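
The saga pattern named above can be sketched generically: each step carries a compensating action, and when a step fails, the steps that already succeeded are undone in reverse order. This is an illustration of the pattern only, not Motia's implementation; SagaStep, runSaga, and the order steps are hypothetical.

```typescript
// Generic saga sketch: hypothetical names, not Motia's implementation.
type SagaStep = {
  label: string
  run: () => Promise<void>
  compensate: () => Promise<void>
}

async function runSaga(steps: SagaStep[]): Promise<{ ok: boolean; log: string[] }> {
  const log: string[] = []
  const completed: SagaStep[] = []
  for (const step of steps) {
    try {
      await step.run()
      completed.push(step)
      log.push(`done:${step.label}`)
    } catch {
      log.push(`failed:${step.label}`)
      // Undo the steps that already succeeded, newest first.
      for (const done of completed.reverse()) {
        await done.compensate()
        log.push(`undone:${done.label}`)
      }
      return { ok: false, log }
    }
  }
  return { ok: true, log }
}

const noop = async (): Promise<void> => {}

const orderSaga: SagaStep[] = [
  { label: 'reserve-stock', run: noop, compensate: noop },
  { label: 'charge-card', run: noop, compensate: noop },
  {
    label: 'ship',
    run: async () => {
      throw new Error('carrier unavailable')
    },
    compensate: noop,
  },
]

// The third step fails, so the first two are compensated in reverse order.
const sagaResult = runSaga(orderSaga)
```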
AI NATIVE DEVELOPMENT

Ship faster with AI

Build, test, and scale AI-driven apps with Motia’s native support.
Custom Cursor Rules
Claude Agents & Claude.md
Built-in AI Patterns
Ready-to-Use Configurations
VELOCITY

From concept to production in under a minute.

Under 60 Seconds to Production
Instant Dev Server
Zero Configuration
Immediate Productivity
REST APIs by Default
Start your project with a single command
npx motia@latest create
Observability

Visibility into every step of your backend.

Build from APIs to fully featured Backends

Your backend evolves step by step — start simple, scale seamlessly.
Define
API Trigger
Runs when an HTTP request hits the path.
Automate
Background Jobs
Background jobs let you handle time-consuming tasks without blocking your API responses.
Compose
Compose Multi-step Workflows
Automated workflows that manage complex business logic.
Think
Agentic Workflow
Build intelligent agentic workflows that make decisions and automate tasks.
Learn
Stream Data Seamlessly
Scale to real-time streaming agents, all powered by the same primitive
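
The progression above rests on one idea: an API Step emits an event and returns, and a separate Step picks the event up later. The in-memory sketch below shows that hand-off under stated assumptions. TinyBus, drain, and sendMessageApi are hypothetical names, and a real deployment would use Motia's own queueing rather than an array.

```typescript
// In-memory sketch of the emit/subscribe hand-off.
// `TinyBus` and `drain` are hypothetical names, not Motia APIs.
type BusEvent = { topic: string; data: unknown }
type Subscriber = (data: unknown) => void

class TinyBus {
  private queue: BusEvent[] = []
  private subscribers = new Map<string, Subscriber[]>()

  subscribe(topic: string, handler: Subscriber): void {
    const existing = this.subscribers.get(topic) ?? []
    this.subscribers.set(topic, [...existing, handler])
  }

  // Emitting only enqueues, so the caller is never blocked by subscribers.
  emit(event: BusEvent): void {
    this.queue.push(event)
  }

  // Deliver queued events; stands in for a background worker loop.
  drain(): number {
    let delivered = 0
    let next = this.queue.shift()
    while (next !== undefined) {
      const current = next
      for (const handler of this.subscribers.get(current.topic) ?? []) {
        handler(current.data)
        delivered += 1
      }
      next = this.queue.shift()
    }
    return delivered
  }
}

const bus = new TinyBus()
const timeline: string[] = []

// The "background job": runs only when the worker drains the queue.
bus.subscribe('message.sent', () => {
  timeline.push('background job ran')
})

// The "API trigger": emits and responds without waiting for the job.
function sendMessageApi(text: string): { status: number; body: { text: string } } {
  bus.emit({ topic: 'message.sent', data: { text } })
  timeline.push('api responded')
  return { status: 201, body: { text } }
}

const apiResponse = sendMessageApi('hello')
const deliveredCount = bus.drain()
```

Because the API function only enqueues, its response time does not depend on how long the subscriber takes, which is the property the Background Jobs item describes.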
SCALABILITY

Scale seamlessly — self-hosted or in the cloud.

Use Motia’s fully managed cloud to deploy workflows instantly, or self-host with complete control.
Deploy your first App
Self-host
Deploy Motia on your infrastructure for control over data and compliance.
Set Up Self-Hosting
Motia Cloud
Select a managed option for easy scaling, updates, and less overhead.
Explore Motia Cloud
BYOC (Bring Your Own Cloud)
Run Motia on your own cloud with full data sovereignty and control.
Coming Soon
RELIABILITY

Fault-tolerant by design.

Automatic Retries
Failed tasks are retried to ensure stability and prevent data loss.
Queue Infra Abstracted
Built-in queuing eliminates the need to manage external brokers like RabbitMQ.
Event-driven Core
A resilient design where events trigger steps reliably.
Shared State Handling
Ensure consistent state in distributed workflows automatically.
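
For readers unfamiliar with automatic retries, the sketch below shows the general technique: re-run a failing task a bounded number of times, waiting longer between attempts. It is a generic illustration, not Motia's retry policy; withRetries, its defaults, and the flaky task are all hypothetical.

```typescript
// Generic retry sketch with exponential backoff.
// Names and defaults are hypothetical, not Motia's retry policy.
async function withRetries<T>(
  task: (attempt: number) => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 10,
): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
    try {
      return await task(attempt)
    } catch (error) {
      lastError = error
      if (attempt < maxAttempts) {
        // The delay doubles after each failure: base, 2x base, 4x base, ...
        const delay = baseDelayMs * 2 ** (attempt - 1)
        await new Promise<void>((resolve) => setTimeout(resolve, delay))
      }
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError
}

const attemptsSeen: number[] = []

// A task that fails twice and succeeds on the third attempt.
const flakyResult = withRetries(async (attempt) => {
  attemptsSeen.push(attempt)
  if (attempt < 3) {
    throw new Error(`attempt ${attempt} failed`)
  }
  return 'delivered'
})
```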

Trusted by the community. Built in the open.

Motia is shaped and scaled by a global community of developers who build in public, share examples, and keep the framework moving forward together.
0.7.3-beta.136
LICENSED
GITHUB #1
repository of the day
VERCEL INC. // 2025
OPEN SOURCE SOFTWARE PROGRAM

Built for fast-moving devs and modern teams

Anthony Max
Creator @ hmpl-lang.dev

I recently tried @motiadev for the backend – it works pretty well! The user interface looks quite futuristic, as if I'm not making an API, but something technologically advanced like in old anime :)

Stijn Vlerick
Freelance Web Developer / SysAdmin / DevOps @ Vlerico

Great, all-in-one backend platform framework

Jakobala
Backend Developer

Motia is great. I have tested it out. It's a real paradigm shift in how we think about backend.

Craig Merry
Computer Technology Analyst III @ Northern California Power Agency

I can easily set up a backend with consistent flows with multiple AI systems. Very useful for quickly getting it together.

Joel Washington
Founder / CEO @Zero & @Shift

After reading your comprehensive architecture and comparing it with the alternatives: Motia is almost eerily perfect for your vision. The alignment is so strong it's like they read your architecture docs and built exactly what you need... This isn't coincidence - it's architectural destiny.

Rituraj Tripathy
Software engineer @Pinelabs

I love the event-driven approach to building stuff that Motia has taken - went through the code and it looks pretty solid. The unique thing here is how fast one can get an MVP running.

Gaurav Dhiman
Software Engineering Lead @Meta

I am already a big fan of Motia. Motia especially shines in AI workflows that require multi-agent orchestration, as it supports event-driven workflows, which allow you to decouple the agents but still have them communicate by passing payloads through events. It's much easier to maintain workflows that are event-driven.


Still Curious? Here’s What Devs Ask Most.

What is Motia?

Motia is an open-source, unified backend framework that eliminates runtime fragmentation by bringing APIs, background jobs, queueing, streaming, state, workflows, AI agents, observability, scaling, and deployment into one unified system using a single core primitive, the “Step.”

What is Motia Cloud?

Motia Cloud is the managed hosting platform for Motia applications, providing instant deployment with automatic scaling, global CDN, monitoring, and enterprise-grade infrastructure. Deploy with one CLI command or one-click web deployment directly from your local Workbench - no DevOps configuration required.

Where can I self-host Motia?

You can self-host Motia anywhere using Docker with the official motiadev/motia image. Run npx motia@latest docker setup to configure your project, then npx motia@latest docker run to deploy locally, or use the Docker image on any cloud provider such as AWS, Google Cloud, or Azure, or on your own servers. A complete setup guide is available in the docs.

How do I deploy on the Cloud?

Motia Cloud provides instant deployment with two options: run the CLI command motia cloud deploy --api-key your-api-key --version-name 1.0.0, or go to motia.cloud for one-click web deployment directly from your local Workbench. Your app deploys in seconds with automatic scaling and monitoring. Check out the docs for more.

What languages does Motia support?

Motia is a language-agnostic framework. You can write steps in Python, JavaScript, and TypeScript. Ruby is in beta, with more languages coming soon. However, if you understand the framework well, you can write steps in any language.

What is your pricing model?

Motia is a free-to-use open source framework. If you're looking for Motia Cloud, stay tuned. We're working on it, and soon you'll see pricing available on our hosting page.

One framework for your entire backend.

APIs, background jobs, workflows, AI agents, streaming, and observability, all unified in a single system. Write Steps in any language. Deploy with one command.