Real-Time Streaming

Learn how to add real-time streaming updates to your Motia workflows

What You'll Build

A pet management system with real-time streaming that provides live updates to clients:

  • Stream Configuration - Define stream schemas for type-safe updates
  • API with Streaming - APIs that initialize streams and return immediately
  • Background Job Streaming - Jobs that push real-time progress updates
  • Agentic Step Streaming - AI enrichment with live progress updates
  • Multi-Step Streaming - Multiple steps updating the same stream

Getting Started

Clone the example repository:

git clone https://github.com/MotiaDev/build-your-first-app.git
cd build-your-first-app
git checkout stream-ai-agents

Install dependencies:

npm install

Set up your OpenAI API key in .env:

OPENAI_API_KEY=your_api_key_here

Start the Workbench:

npm run dev

Your Workbench will be available at http://localhost:3000.


Project Structure

.env
package.json
requirements.txt
types.d.ts

Files like features.json and tutorial/tutorial.tsx are only for the interactive tutorial and are not part of Motia's project structure.

All code examples in this guide are available in the build-your-first-app repository.

You can follow this guide to learn how to build real-time streaming with Motia step by step, or you can clone the repository and dive into our Interactive Tutorial to learn by doing directly in the Workbench.


Understanding Real-Time Streaming

You've built APIs that return immediately, background jobs that process asynchronously, workflows that orchestrate complex logic, and agentic workflows that make intelligent decisions. But how do you give users real-time feedback while all this async processing happens in the background?

That's where streaming comes in. Motia provides streams as part of the context in any step handler - you can use them anywhere in your code. Streams use Server-Sent Events (SSE) to push live updates directly to clients as your workflow progresses.

In our pet shelter example:

  • The API initializes a stream and returns immediately with a stream ID
  • Background jobs push updates as they process (quarantine entry, health checks)
  • Agentic steps stream enrichment progress (bio generation, breed analysis)
  • Clients get live feedback throughout the entire workflow

The power is in the simplicity - streams is available in your handler's context, just like emit, logger, and state. Any step can update any stream, creating a unified real-time experience without complex orchestration.


Creating Your First Stream

Step 1: Define the Stream Configuration

First, define a stream configuration file. This makes the stream available in the context.streams object for all your step handlers.

View on GitHub:

steps/typescript/pet-creation.stream.ts
import { StreamConfig } from 'motia'
import { z } from 'zod'
 
export const config: StreamConfig = {
  /**
   * This will be available as context.streams.petCreation in the FlowContext
   */
  name: 'petCreation',
  
  /**
   * Schema defines the structure of stream updates
   */
  schema: z.object({ 
    message: z.string()
  }),
 
  /**
   * Use default storage for the stream
   */
  baseConfig: {
    storageType: 'default',
  },
}

How Stream Configuration Works

Stream configuration is simple:

  • name - Identifier for accessing the stream (e.g., context.streams.petCreation)
  • schema - Zod schema defining what data can be pushed to the stream
  • baseConfig - Storage settings (default uses in-memory storage)

Once you create this configuration file, the stream is automatically available as streams.petCreation in the context of any step handler. It's just like emit, logger, or state - part of the tools available in your handler.
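To make these semantics concrete, here is a minimal in-memory mock of the `set(groupId, itemId, data)` interface. This is a sketch for illustration only - the class, listener mechanism, and storage are invented; Motia manages real storage and SSE delivery for you:

```typescript
// Illustrative only: records are grouped by an id (Motia uses the traceId)
// and keyed by an item id such as 'message'.
type Listener<T> = (groupId: string, itemId: string, data: T) => void;

class MockStream<T> {
  private groups = new Map<string, Map<string, T>>();
  private listeners: Listener<T>[] = [];

  // Mirrors the shape of streams.petCreation.set(traceId, 'message', data)
  async set(groupId: string, itemId: string, data: T): Promise<T> {
    const group = this.groups.get(groupId) ?? new Map<string, T>();
    group.set(itemId, data);
    this.groups.set(groupId, group);
    // In Motia, connected SSE clients would receive this update immediately
    for (const listener of this.listeners) listener(groupId, itemId, data);
    return data;
  }

  get(groupId: string, itemId: string): T | undefined {
    return this.groups.get(groupId)?.get(itemId);
  }

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }
}

// Usage: one stream, updates grouped by traceId
const petCreation = new MockStream<{ message: string }>();
const received: string[] = [];
petCreation.subscribe((_groupId, _itemId, data) => received.push(data.message));
petCreation.set('trace-1', 'message', { message: 'Pet Max created' });
petCreation.set('trace-1', 'status', { message: 'Entered quarantine' });
// received now holds both messages, in order
```

Because every update is grouped by `traceId`, any step in the same flow can add to the same record group - which is exactly how the API, background job, and agentic step below cooperate on one stream.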


Step 2: Initialize Streams from APIs

Now let's update the pet creation API to initialize a stream and return it immediately to the client.

View on GitHub:

steps/typescript/create-pet.step.ts
// steps/typescript/create-pet.step.ts
import { ApiRouteConfig, Handlers } from 'motia';
import { z } from 'zod';
import { TSStore } from './ts-store';
 
const createPetSchema = z.object({
  name: z.string().min(1, 'Name is required').trim(),
  species: z.enum(['dog', 'cat', 'bird', 'other']),
  ageMonths: z.number().int().min(0, 'Age must be a positive number'),
  weightKg: z.number().positive().optional(),
  symptoms: z.array(z.string()).optional()
});
 
export const config: ApiRouteConfig = {
  type: 'api',
  name: 'TsCreatePet',
  path: '/ts/pets',
  method: 'POST',
  emits: ['ts.pet.created', 'ts.feeding.reminder.enqueued'],
  flows: ['TsPetManagement']
};
 
export const handler: Handlers['TsCreatePet'] = async (req, { emit, logger, streams, traceId }) => {
  try {
    const validatedData = createPetSchema.parse(req.body);
 
    const pet = TSStore.create({
      name: validatedData.name,
      species: validatedData.species,
      ageMonths: validatedData.ageMonths,
      weightKg: validatedData.weightKg,
      symptoms: validatedData.symptoms
    });
 
    if (logger) {
      logger.info('🐾 Pet created', { petId: pet.id, name: pet.name, species: pet.species, status: pet.status });
    }
 
    // Create and return the initial stream record; background jobs will update it using the same traceId
    const result = await streams.petCreation.set(traceId, 'message', { 
      message: `Pet ${pet.name} (ID: ${pet.id}) created successfully - Species: ${pet.species}, Age: ${pet.ageMonths} months, Status: ${pet.status}` 
    });
 
    if (emit) {
      await emit({
        topic: 'ts.pet.created',
        data: { petId: pet.id, event: 'pet.created', name: pet.name, species: validatedData.species, traceId }
      } as any);
 
      await emit({
        topic: 'ts.feeding.reminder.enqueued',
        data: { petId: pet.id, enqueuedAt: Date.now(), traceId }
      } as any);
    }
 
    return { 
      status: 201, 
      body: result 
    };
 
  } catch (error) {
    if (error instanceof z.ZodError) {
      return {
        status: 400,
        body: {
          message: 'Validation error',
          errors: error.errors
        }
      };
    }
 
    return {
      status: 500,
      body: { message: 'Internal server error' }
    };
  }
};

How API Stream Initialization Works

The key changes from a regular API:

  1. Access streams from context - streams is available in the FlowContext
  2. Create initial stream message - await streams.petCreation.set(traceId, 'message', data)
  3. Return the stream result - Contains stream ID and initial message
  4. Background jobs update the same stream - Using the same traceId

The API returns immediately with a stream ID. Clients can connect to this stream via SSE to receive real-time updates as background jobs process.


Step 3: Stream Updates from Background Jobs

Now let's update the feeding reminder job to push real-time updates to the stream as it processes.

View on GitHub:

steps/typescript/set-next-feeding-reminder.job.step.ts
// steps/typescript/set-next-feeding-reminder.job.step.ts
import { EventConfig, Handlers } from 'motia';
import { TSStore } from './ts-store';
 
export const config: EventConfig = {
  type: 'event',
  name: 'TsSetNextFeedingReminder',
  description: 'Background job that sets next feeding reminder and adds welcome notes',
  subscribes: ['ts.feeding.reminder.enqueued'],
  emits: ['ts.feeding.reminder.completed'],
  flows: ['TsPetManagement']
};
 
export const handler: Handlers['TsSetNextFeedingReminder'] = async (input, { emit, logger, streams, traceId }) => {
  const { petId, enqueuedAt } = input;
 
  if (logger) {
    logger.info('🔄 Setting next feeding reminder', { petId, enqueuedAt });
  }
 
  try {
    // Calculate next feeding time (24 hours from now)
    const nextFeedingAt = Date.now() + (24 * 60 * 60 * 1000);
    
    // Fill in non-critical details and change status to in_quarantine
    const updates = {
      notes: 'Welcome to our pet store! We\'ll take great care of this pet.',
      nextFeedingAt: nextFeedingAt,
      status: 'in_quarantine' as const
    };
 
    const updatedPet = TSStore.update(petId, updates);
    
    if (!updatedPet) {
      if (logger) {
        logger.error('❌ Failed to set feeding reminder - pet not found', { petId });
      }
      return;
    }
 
    if (logger) {
      logger.info('✅ Next feeding reminder set', { 
        petId, 
        notes: updatedPet.notes?.substring(0, 50) + '...',
        nextFeedingAt: new Date(nextFeedingAt).toISOString()
      });
    }
 
    // Stream status updates using the simple pattern
    if (streams?.petCreation && traceId) {
      await streams.petCreation.set(traceId, 'message', { 
        message: `Pet ${updatedPet.name} entered quarantine period` 
      });
 
      // Check symptoms and stream appropriate updates
      if (!updatedPet.symptoms || updatedPet.symptoms.length === 0) {
        await new Promise(resolve => setTimeout(resolve, 1000));
        await streams.petCreation.set(traceId, 'message', { 
          message: `Health check passed for ${updatedPet.name} - no symptoms found` 
        });
 
        await new Promise(resolve => setTimeout(resolve, 1000));
        await streams.petCreation.set(traceId, 'message', { 
          message: `${updatedPet.name} is healthy and ready for adoption! ✅` 
        });
      } else {
        await new Promise(resolve => setTimeout(resolve, 1000));
        await streams.petCreation.set(traceId, 'message', { 
          message: `Health check failed for ${updatedPet.name} - symptoms detected: ${updatedPet.symptoms.join(', ')}` 
        });
 
        await new Promise(resolve => setTimeout(resolve, 1000));
        await streams.petCreation.set(traceId, 'message', { 
          message: `${updatedPet.name} needs medical treatment ❌` 
        });
      }
    }
 
    if (emit) {
      await emit({
        topic: 'ts.feeding.reminder.completed',
        data: { 
          petId, 
          event: 'feeding.reminder.completed',
          completedAt: Date.now(),
          processingTimeMs: Date.now() - enqueuedAt
        }
      } as any);
    }
 
  } catch (error: any) {
    if (logger) {
      logger.error('❌ Feeding reminder job error', { petId, error: error.message });
    }
  }
};

How Background Job Streaming Works

Background jobs can push multiple updates to a stream:

  • Access the stream - streams.petCreation is available in context
  • Push updates - await streams.petCreation.set(traceId, 'message', data)
  • Use the same traceId - Links updates to the original API request
  • Send multiple updates - Each set() call sends immediately to connected clients

The background job processes asynchronously, pushing updates at each stage. Clients connected to the stream receive these updates in real-time via SSE.
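The repeated wait-then-`set` pattern in the job above can be factored into a small helper. This is just a sketch - `StreamLike` and `streamStages` are illustrative names, not Motia APIs:

```typescript
// Hypothetical minimal interface matching the set() calls used above
interface StreamLike {
  set(groupId: string, itemId: string, data: { message: string }): Promise<unknown>;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Push a sequence of status messages for one traceId, pausing between
// updates so connected clients see them arrive progressively
async function streamStages(
  stream: StreamLike,
  traceId: string,
  messages: string[],
  delayMs = 1000,
): Promise<void> {
  for (let i = 0; i < messages.length; i++) {
    if (i > 0) await sleep(delayMs);
    await stream.set(traceId, 'message', { message: messages[i] });
  }
}

// Usage with a fake stream that just records what was pushed
const pushed: string[] = [];
const fakeStream: StreamLike = {
  async set(_groupId, _itemId, data) { pushed.push(data.message); },
};
streamStages(fakeStream, 'trace-1', ['entered quarantine', 'health check passed'], 0);
// pushed eventually contains both messages, in order
```

In a real step you would pass `streams.petCreation` and the handler's `traceId` instead of the fake stream.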


Step 4: Agentic Step Streaming

Agentic steps can also stream progress updates as they generate content. This provides live feedback during potentially long-running AI operations.

View on GitHub:

steps/typescript/ai-profile-enrichment.step.ts
// steps/typescript/ai-profile-enrichment.step.ts
import { EventConfig, Handlers } from 'motia';
import { TSStore, PetProfile } from './ts-store';
 
export const config: EventConfig = {
  type: 'event',
  name: 'TsAiProfileEnrichment',
  description: 'Agentic step that enriches pet profiles using OpenAI',
  subscribes: ['ts.pet.created'],
  emits: [],
  flows: ['TsPetManagement']
};
 
export const handler: Handlers['TsAiProfileEnrichment'] = async (input, { logger, streams, traceId }) => {
  const { petId, name, species } = input;
 
  if (logger) {
    logger.info('🤖 AI Profile Enrichment started', { petId, name, species });
  }
 
  // Stream enrichment started event
  if (streams && traceId) {
    await streams.petCreation.set(traceId, 'enrichment_started', { 
      message: `AI enrichment started for ${name}`
    });
  }
 
  try {
    const apiKey = process.env.OPENAI_API_KEY;
    if (!apiKey) {
      throw new Error('OPENAI_API_KEY environment variable is not set');
    }
 
    const prompt = `Generate a pet profile for adoption purposes. Pet details:
- Name: ${name}
- Species: ${species}
 
Please provide a JSON response with these fields:
- bio: A warm, engaging 2-3 sentence description that would appeal to potential adopters
- breedGuess: Your best guess at the breed or breed mix (be specific but realistic)
- temperamentTags: An array of 3-5 personality traits (e.g., "friendly", "energetic", "calm")
- adopterHints: Practical advice for potential adopters (family type, living situation, care needs)
 
Keep it positive, realistic, and adoption-focused.`;
 
    const enrichmentFields = ['bio', 'breedGuess', 'temperamentTags', 'adopterHints'];
    const enrichedProfile: any = {};
 
    const response = await fetch('https://api.openai.com/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: 'gpt-3.5-turbo',
        messages: [
          {
            role: 'system',
            content: 'You are a pet adoption specialist who creates compelling, accurate pet profiles. Always respond with valid JSON only.'
          },
          {
            role: 'user',
            content: prompt
          }
        ],
        max_tokens: 500,
        temperature: 0.7,
      }),
    });
 
    if (!response.ok) {
      throw new Error(`OpenAI API error: ${response.status} ${response.statusText}`);
    }
 
    const data = await response.json();
    const aiResponse = data.choices[0]?.message?.content;
 
    if (!aiResponse) {
      throw new Error('No response from OpenAI API');
    }
 
    let profile: PetProfile;
    try {
      profile = JSON.parse(aiResponse);
    } catch (parseError) {
      profile = {
        bio: `${name} is a wonderful ${species} looking for a loving home. This pet has a unique personality and would make a great companion.`,
        breedGuess: species === 'dog' ? 'Mixed Breed' : species === 'cat' ? 'Domestic Shorthair' : 'Mixed Breed',
        temperamentTags: ['friendly', 'loving', 'loyal'],
        adopterHints: `${name} would do well in a caring home with patience and love.`
      };
      
      if (logger) {
        logger.warn('⚠️ AI response parsing failed, using fallback profile', { petId, parseError: parseError instanceof Error ? parseError.message : String(parseError) });
      }
    }
 
    const updatedPet = TSStore.updateProfile(petId, profile);
    
    if (!updatedPet) {
      throw new Error(`Pet not found: ${petId}`);
    }
 
    if (logger) {
      logger.info('✅ AI Profile Enrichment completed', { 
        petId, 
        profile: {
          bio: profile.bio.substring(0, 50) + '...',
          breedGuess: profile.breedGuess,
          temperamentTags: profile.temperamentTags,
          adopterHints: profile.adopterHints.substring(0, 50) + '...'
        }
      });
    }
 
    // Stream each field as it's processed
    for (const field of enrichmentFields) {
      await new Promise(resolve => setTimeout(resolve, 300));
      
      if (streams && traceId) {
        await streams.petCreation.set(traceId, `progress_${field}`, { 
          message: `Generated ${field} for ${name}`
        });
      }
    }
 
    // Stream enrichment completed event
    if (streams && traceId) {
      await streams.petCreation.set(traceId, 'completed', { 
        message: `AI enrichment completed for ${name}`
      });
    }
 
  } catch (error: any) {
    if (logger) {
      logger.error('❌ AI Profile Enrichment failed', { 
        petId, 
        error: error.message 
      });
    }
 
    const fallbackProfile: PetProfile = {
      bio: `${name} is a lovely ${species} with a unique personality, ready to find their forever home.`,
      breedGuess: species === 'dog' ? 'Mixed Breed' : species === 'cat' ? 'Domestic Shorthair' : 'Mixed Breed',
      temperamentTags: ['friendly', 'adaptable'],
      adopterHints: `${name} is looking for a patient and loving family.`
    };
 
    TSStore.updateProfile(petId, fallbackProfile);
 
    // Stream fallback profile completion
    if (streams && traceId) {
      await streams.petCreation.set(traceId, 'completed', { 
        message: `AI enrichment completed with fallback profile for ${name}`
      });
    }
  }
};

How Agentic Step Streaming Works

Agentic steps stream progress as they work:

  1. Stream start notification - Let users know AI processing has begun
  2. Progress updates - Stream each stage of generation (bio, breed, temperament, etc.)
  3. Stream completion - Notify when AI processing is done
  4. Error streaming - Stream errors gracefully with fallback messages

This transforms a potentially slow AI operation into an engaging real-time experience.
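One way to capture this start/completion/error pattern is a small wrapper that brackets any async task with stream updates. Purely a sketch - `withStreamedProgress` and `StreamLike` are invented names; a real step would pass `streams.petCreation` and the handler's `traceId`:

```typescript
// Hypothetical minimal interface matching the set() calls used above
interface StreamLike {
  set(groupId: string, itemId: string, data: { message: string }): Promise<unknown>;
}

// Run a task, streaming a start update first and a completion (or
// fallback) update afterward, so clients always hear back
async function withStreamedProgress<T>(
  stream: StreamLike,
  traceId: string,
  label: string,
  task: () => Promise<T>,
  fallback: T,
): Promise<T> {
  await stream.set(traceId, 'enrichment_started', { message: `${label} started` });
  try {
    const result = await task();
    await stream.set(traceId, 'completed', { message: `${label} completed` });
    return result;
  } catch {
    // Stream the failure path too, so clients are never left waiting
    await stream.set(traceId, 'completed', { message: `${label} completed with fallback` });
    return fallback;
  }
}

// Usage: a failing task still produces a completion update and a fallback value
const updates: string[] = [];
const stream: StreamLike = {
  async set(_g, _i, data) { updates.push(data.message); },
};
withStreamedProgress(stream, 'trace-1', 'AI enrichment', async () => {
  throw new Error('model unavailable');
}, 'fallback profile');
```

The key design choice, mirrored from the step above, is that errors are streamed as completions with fallback content rather than left silent.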


Testing Streaming in Action

The best way to test streams is through Workbench.

Test 1: Create a Pet with Streaming

Open Workbench and navigate to the Endpoints section, then test the Pet Creation endpoint:

Prefer using curl?

curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Max",
    "species": "dog",
    "ageMonths": 24,
    "symptoms": ["coughing"]
  }'

You'll get an immediate response with the stream result. The API returns right away while background jobs process asynchronously.

Test 2: Monitor Stream Updates in Workbench

After creating a pet, check the Tracing view in Workbench:

  1. Open the Tracing tab (Workbench switches to it automatically after a request)
  2. Click on the most recent trace
  3. Watch the timeline as steps execute
  4. See stream updates appear in real-time in the timeline

You'll observe:

  • Pet creation completes immediately
  • Feeding reminder job streams quarantine updates
  • AI enrichment streams progress updates
  • All updates visible in the trace timeline

Test 3: Create Pet with Symptoms

Test the conditional streaming logic by creating a pet with symptoms:

curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Luna",
    "species": "cat",
    "ageMonths": 18,
    "symptoms": ["sneezing", "watery eyes"]
  }'

Watch the logs to see different stream messages based on the symptoms detected.

Test 4: Create Pet Without Symptoms

Compare the streaming behavior with a healthy pet:

curl -X POST http://localhost:3000/ts/pets \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Buddy",
    "species": "dog",
    "ageMonths": 12
  }'

The stream will show health check passed messages instead of treatment needed messages.
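The two behaviors come from the symptom check in the feeding reminder job. Extracted as a pure function (illustrative naming, not part of the repository), the branching looks like this:

```typescript
// Mirror of the conditional streaming logic in the feeding reminder job
function healthCheckMessages(name: string, symptoms?: string[]): string[] {
  if (!symptoms || symptoms.length === 0) {
    return [
      `Health check passed for ${name} - no symptoms found`,
      `${name} is healthy and ready for adoption! ✅`,
    ];
  }
  return [
    `Health check failed for ${name} - symptoms detected: ${symptoms.join(', ')}`,
    `${name} needs medical treatment ❌`,
  ];
}

// Buddy has no symptoms, so the healthy branch is streamed
console.log(healthCheckMessages('Buddy'));
// Luna reported symptoms, so the treatment branch is streamed
console.log(healthCheckMessages('Luna', ['sneezing', 'watery eyes']));
```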

Observing Stream Updates

Watch the Workbench console logs to see the real-time stream updates as they're pushed:

🐾 Pet created { petId: '1', name: 'Max', species: 'dog', status: 'new' }
🔄 Setting next feeding reminder { petId: '1' }
🤖 AI Profile Enrichment started { petId: '1', name: 'Max' }
✅ Next feeding reminder set { petId: '1' }
✅ AI Profile Enrichment completed { petId: '1' }

Each emoji-prefixed log corresponds to a stream update being pushed to connected clients.


🎉 Congratulations! You've built a complete real-time streaming system with Motia. Your pet management system now provides live feedback to users while complex workflows execute in the background.


What's Next?

You've now mastered the complete Motia stack:

  • API Endpoints - Build RESTful APIs with validation
  • Background Jobs - Process async tasks efficiently
  • Workflows - Orchestrate complex business logic
  • Agentic Workflows - Make intelligent decisions with AI
  • Real-Time Streaming - Provide live updates using streams in any step handler

This is the complete progression from simple APIs to intelligent, real-time systems!

Key Takeaway: Streams are just another tool in your step handler's context - use them wherever you need real-time updates!

Here are some ideas to extend your streaming implementation:

  • Add stream analytics - Track how many clients are connected, message delivery rates
  • Implement stream persistence - Use Redis adapter for stream storage across restarts
  • Create stream multiplexing - Multiple streams per workflow for different update types
  • Build progress bars - Use structured progress data (0-100%) instead of just messages
  • Add stream authentication - Ensure only authorized clients can access streams

Explore more examples in the Motia Examples Repository.

Need help? See our Community Resources for questions, examples, and discussions.