Stream Triggers

Stream triggers enable real-time, event-driven flow execution by listening to message streams. They are well suited to high-throughput data processing, real-time monitoring, event-driven architectures, and system integration. Stream triggers are only available in Advanced Mode and require streams to be configured in the QuivaWorks hosting dashboard.

Overview

Streams are persistent message channels that allow high-throughput, low-latency communication between systems. Stream triggers listen to these streams and execute flows when messages arrive.
Key Features:
  • Real-time message processing
  • High throughput (thousands of messages per second)
  • Low latency (milliseconds)
  • Message ordering guarantees
  • Built-in retry and error handling
  • Message filtering by subject
  • Scalable architecture

How It Works

  • Create a stream in the QuivaWorks hosting dashboard.
  • Configure gateway mappings to write messages to the stream.
  • Add a Stream trigger to your flow.
  • Messages written to the stream automatically trigger your flow.
  • The flow receives the message data and processes it in real time.

Stream Types

Stream Message Trigger

Triggers on every message written to the selected stream. No filtering is applied, so every message triggers the flow. Best for processing every event.
Configuration:
  • Select stream from dropdown
  • Messages arrive in order
  • Flow executes for each message
Use when: You need to process all messages on a stream.

Message Subject Trigger

Triggers only when the message subject matches a specified pattern. Messages are filtered before the flow is triggered, so only matching messages execute the flow, reducing unnecessary flow executions.
Configuration:
  • Select stream from dropdown
  • Specify subject pattern to match
  • Supports wildcards and patterns
Use when: You want to filter messages by subject before processing.

Configuration

Stream Selection

Choose from the available streams configured in your hosting dashboard. Streams must be created before adding stream triggers; the stream name identifies the message channel.
Stream naming convention:
  • Use descriptive names (e.g., “orders”, “user-events”, “analytics”)
  • Avoid special characters
  • Use lowercase and hyphens

Subject Filtering (Message Subject Trigger)

Filter messages by subject pattern.
Exact match:
order.created
Wildcard patterns:
order.*
user.*.updated
*.critical
Multiple subjects: configure multiple triggers for different subjects on the same stream. (A sketch of how wildcard matching might work is shown below.)
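The exact wildcard semantics are defined by the platform; as an illustration only, a minimal matcher that treats * as matching exactly one dot-delimited token (an assumption, not documented behavior) could look like this:
// Illustrative subject matcher: assumes "*" matches exactly one
// dot-delimited token. The platform's actual pattern semantics may differ.
function subjectMatches(pattern, subject) {
  const patternTokens = pattern.split('.');
  const subjectTokens = subject.split('.');
  if (patternTokens.length !== subjectTokens.length) return false;
  return patternTokens.every(
    (token, i) => token === '*' || token === subjectTokens[i]
  );
}

// subjectMatches('order.*', 'order.created')          // true
// subjectMatches('user.*.updated', 'user.42.updated') // true
// subjectMatches('*.critical', 'system.critical')     // true
// subjectMatches('order.*', 'order.item.added')       // false (extra token)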

Processing Mode

Sequential Processing: messages are processed in order, one at a time. Guarantees message ordering; lower throughput.
Parallel Processing: multiple messages are processed simultaneously. Higher throughput; no ordering guarantees.
Choose based on whether message order matters for your use case.

Accessing Message Data

Message Structure

All message data is available under $.trigger.message:
// Message content
const data = $.trigger.message.data;

// Message metadata
const subject = $.trigger.message.subject;
const timestamp = $.trigger.message.timestamp;
const messageId = $.trigger.message.id;
const stream = $.trigger.message.stream;

// Example values
// data: {"orderId": "ORD-123", "status": "created"}
// subject: "order.created"
// timestamp: "2025-10-14T10:30:00.123Z"
// messageId: "msg_abc123"
// stream: "orders"

Message Data

Message data is typically JSON:
const messageData = $.trigger.message.data;

// Access properties
const orderId = messageData.orderId;
const customerId = messageData.customerId;
const amount = messageData.amount;

// For nested data
const address = messageData.customer.address;
const city = address.city;
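Because message payloads come from external producers, it is worth validating fields before use. A minimal guard, assuming the same illustrative fields as above:
const messageData = $.trigger.message.data;

// Reject payloads that do not match the expected shape.
if (!messageData || typeof messageData.orderId !== 'string') {
  throw new Error(`Unexpected message payload: ${JSON.stringify(messageData)}`);
}

// Optional chaining avoids a crash when nested data is absent.
const city = messageData.customer?.address?.city ?? 'unknown';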

Message Metadata

Access message metadata for tracking and debugging:
// Message ID (unique identifier)
const messageId = $.trigger.message.id;

// Timestamp (when message was written)
const timestamp = $.trigger.message.timestamp;

// Stream name
const streamName = $.trigger.message.stream;

// Subject (message topic)
const subject = $.trigger.message.subject;

// Sequence number (message order in stream)
const sequence = $.trigger.message.sequence;

Use Cases

Real-Time Order Processing

Scenario: Process orders as they are created
Stream: orders
Message Subjects:
  • order.created
  • order.updated
  • order.cancelled
Flow:
  • Trigger: Message Subject (order.created)
  • Agent: Validate order details
  • HTTP Request: Check inventory
  • Condition: In stock?
    • Yes: Process payment
    • No: Notify customer
  • HTTP Request: Update order management system
Benefits: Instant order processing, real-time inventory checks, immediate customer notifications.
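If the validation happens in a code step rather than an agent, it might look like the following sketch; the field names (orderId, customerId, amount) are illustrative assumptions, not a fixed order schema:
// Reject malformed orders before the inventory check runs.
const order = $.trigger.message.data;

if (!order?.orderId || !order?.customerId || !(order.amount > 0)) {
  throw new Error(`Rejected malformed order: ${JSON.stringify(order)}`);
}

console.log('Validated order', { orderId: order.orderId, amount: order.amount });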

System Event Monitoring

Scenario: Monitor system events for alerts
Stream: system-events
Message Subjects:
  • system.error
  • system.warning
  • system.critical
Flow:
  • Trigger: Message Subject (system.critical)
  • Agent: Analyze error details
  • Condition: Requires immediate attention?
    • Yes: Page on-call engineer
    • No: Log for review
  • HTTP Request: Create incident ticket
Benefits: Real-time alerting, automatic escalation, comprehensive logging.

User Activity Tracking

Scenario: Track and respond to user activities
Stream: user-events
Message Subjects:
  • user.signup
  • user.login
  • user.purchase
  • user.churn-risk
Flow:
  • Trigger: Message Subject (user.signup)
  • Agent: Analyze user profile
  • HTTP Request: Send welcome email
  • HTTP Request: Create onboarding tasks
  • HTTP Request: Notify account manager
Benefits: Immediate engagement, personalized onboarding, activity tracking.

IoT Data Processing

Scenario: Process sensor data in real time
Stream: sensor-data
Message Subjects:
  • sensor.temperature
  • sensor.humidity
  • sensor.pressure
Flow:
  • Trigger: Stream Message (sensor-data)
  • Agent: Analyze sensor readings
  • Condition: Threshold exceeded?
    • Yes: Send alert
    • No: Log data
  • HTTP Request: Update dashboard
Benefits: Real-time monitoring, instant alerts, data aggregation.

Financial Transaction Processing

Scenario: Process financial transactions
Stream: transactions
Message Subjects:
  • transaction.initiated
  • transaction.completed
  • transaction.failed
  • transaction.fraudulent
Flow:
  • Trigger: Message Subject (transaction.initiated)
  • Agent: Fraud detection analysis
  • Condition: Suspicious activity?
    • Yes: Flag for review
    • No: Process transaction
  • HTTP Request: Update account balance
  • HTTP Request: Send receipt
Benefits: Real-time fraud detection, instant processing, audit trail.

Content Moderation

Scenario: Moderate user-generated content
Stream: content-submissions
Message Subjects:
  • content.submitted
  • content.flagged
  • content.reported
Flow:
  • Trigger: Message Subject (content.submitted)
  • Agent: Content moderation analysis
  • Condition: Contains violations?
    • Yes: Remove and notify user
    • No: Publish content
  • HTTP Request: Update content database
Benefits: Real-time moderation, automatic filtering, user safety.

Gateway Configuration

Streams require gateway mappings to write messages. Configure in QuivaWorks hosting dashboard.

Creating Gateway Mapping

Navigate to the hosting dashboard, go to the Gateway section, and click “Add Mapping”. Then configure:
  • Endpoint Path: URL path that accepts messages (e.g., /api/events)
  • Target Stream: stream to write messages to
  • Subject Template: subject pattern for messages
  • Authentication: API key or other auth method

Writing Messages

Once the gateway is configured, write messages via HTTP:
curl -X POST https://gateway.quiva.ai/api/events \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -d '{
    "subject": "order.created",
    "data": {
      "orderId": "ORD-123",
      "customerId": "CUST-456",
      "amount": 99.99
    }
  }'
From an application:
// API_KEY is read from the environment here as an example; use whatever
// secret management your application provides.
const API_KEY = process.env.QUIVA_API_KEY;

async function publishEvent(subject, data) {
  const response = await fetch('https://gateway.quiva.ai/api/events', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${API_KEY}`
    },
    body: JSON.stringify({ subject, data })
  });
  if (!response.ok) {
    throw new Error(`Publish failed: ${response.status}`);
  }
}

// Usage
await publishEvent('order.created', {
  orderId: 'ORD-123',
  customerId: 'CUST-456',
  amount: 99.99
});

Best Practices

Message Design

  • Keep messages small: aim for under 1 KB per message; large messages slow processing.
  • Use clear subjects: the subject should indicate the message type; use hierarchical naming (e.g., order.created, order.updated).
  • Include timestamps: always include the event timestamp in the message data.
  • Add message IDs: include a unique identifier for tracking and deduplication.
  • Structure data consistently: use a consistent JSON schema across message types.
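Putting these guidelines together, a published message might look like the following, reusing the publishEvent helper shown under Writing Messages; the envelope fields are illustrative, not a required schema:
// Small payload, hierarchical subject, event timestamp, unique ID.
await publishEvent('order.payment.completed', {
  eventId: 'evt_9f2c1a',                  // unique ID for deduplication
  occurredAt: '2025-10-14T10:30:00.123Z', // event timestamp
  orderId: 'ORD-123',
  amount: 99.99
});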

Subject Naming

Use hierarchical structure:
resource.action
user.created
user.updated
user.deleted

order.created
order.shipped
order.delivered
Use dots for hierarchy:
system.error.database
system.error.api
system.warning.performance
Be specific:
Good: order.payment.completed
Bad: order.done

Error Handling

  • Implement retry logic: messages that fail processing are retried automatically.
  • Handle poison messages: messages that consistently fail should be moved to a dead-letter queue.
  • Log failures: track failed messages for debugging.
  • Set timeout limits: prevent long-running flows from blocking the stream.
// processMessage and moveToDeadLetter stand in for your own logic;
// error.retryable is an illustrative flag, not a built-in property.
try {
  await processMessage($.trigger.message.data);
} catch (error) {
  console.error('Message processing failed:', {
    messageId: $.trigger.message.id,
    subject: $.trigger.message.subject,
    error: error.message
  });

  // Decide whether to retry
  if (error.retryable) {
    throw error; // Rethrowing lets the built-in retry logic run
  } else {
    // Move to dead-letter queue
    await moveToDeadLetter($.trigger.message);
  }
}

Performance Optimization

  • Batch when possible: group related operations together.
  • Use parallel processing: for independent messages, enable parallel processing.
  • Minimize external calls: reduce API calls to external services.
  • Cache frequently used data: cache reference data to reduce lookups (see the sketch below).
  • Monitor throughput: track messages processed per second.
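For example, caching reference data avoids one lookup per message. A minimal in-memory sketch follows; fetchCustomer stands in for your actual lookup, and whether state persists between flow executions depends on the runtime, so treat this as a pattern rather than a guarantee:
// Cache reference data in memory and refresh it after a TTL.
const cache = new Map();
const TTL_MS = 60_000; // one minute; tune for your data

async function getCustomer(customerId) {
  const entry = cache.get(customerId);
  if (entry && Date.now() - entry.fetchedAt < TTL_MS) {
    return entry.value; // cache hit: no external call
  }
  const value = await fetchCustomer(customerId); // your external lookup
  cache.set(customerId, { value, fetchedAt: Date.now() });
  return value;
}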

Monitoring and Observability

Track message metrics:
  • Messages received per second
  • Processing time per message
  • Error rate
  • Retry rate
  • Queue depth
Set up alerts:
  • High error rate
  • Processing delays
  • Queue backup
  • Failed messages
Log message processing:
const startTime = Date.now();

console.log('Processing message', {
  messageId: $.trigger.message.id,
  subject: $.trigger.message.subject,
  timestamp: $.trigger.message.timestamp,
  processingStarted: new Date().toISOString()
});

// ... process message ...

console.log('Message processed', {
  messageId: $.trigger.message.id,
  duration: Date.now() - startTime, // milliseconds elapsed
  success: true
});

Message Ordering

Sequential Processing

Messages are processed in the order they were written to the stream; the next message waits until the current one completes. This guarantees ordering at the cost of throughput. Use when:
  • Order matters (e.g., account balance updates)
  • State depends on sequence (e.g., status transitions)
  • Dependencies between messages

Parallel Processing

Multiple messages are processed simultaneously: higher throughput, but no ordering guarantees (see the idempotency sketch after this list). Use when:
  • Messages are independent
  • Order doesn’t matter
  • High throughput required
  • Idempotent operations
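Because parallel processing (and automatic retries) can deliver the same message more than once, handlers should be idempotent. A common pattern keys side effects on the unique message ID; dedupStore and processMessage below stand in for your own storage and logic:
// Skip work already done for this message ID.
const messageId = $.trigger.message.id;

if (await dedupStore.has(messageId)) {
  console.log('Duplicate delivery, skipping', { messageId });
} else {
  await processMessage($.trigger.message.data); // side effects happen here
  await dedupStore.add(messageId);              // record completion
}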

Ordering Guarantees

Within a stream: messages are ordered by write time.
Across streams: no ordering guarantees.
Subject filtering: order is maintained within the filtered subject.

Advanced Features

Dead Letter Queue

Messages that fail repeatedly are moved to a dead-letter queue. This prevents poison messages from blocking the stream and allows manual inspection and reprocessing (a reprocessing sketch follows the lists below). Configuration:
  • Set max retry attempts (default: 3)
  • Configure dead letter stream
  • Set retention period
Access dead letter messages:
  • View in hosting dashboard
  • Reprocess manually
  • Analyze for patterns
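One way to reprocess is a separate flow whose stream trigger listens on the dead-letter stream and republishes repaired messages through the gateway, reusing the publishEvent helper from Writing Messages. The dead-letter stream name and repair step here are illustrative:
// Flow triggered by the dead-letter stream (e.g., "orders-dlq").
const failed = $.trigger.message;

console.log('Inspecting dead-letter message', {
  messageId: failed.id,
  subject: failed.subject
});

// After fixing the underlying issue, republish the original payload.
await publishEvent(failed.subject, failed.data);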

Message Replay

Replay historical messages from stream. Useful for:
  • Reprocessing after bug fixes
  • Backfilling data
  • Testing with production data
  • Disaster recovery
Replay from timestamp:
Replay from: 2025-10-14T00:00:00Z
Replay to: 2025-10-14T23:59:59Z

Stream Analytics

Monitor stream health and performance:
  • Message rate (per second/minute/hour)
  • Processing latency (p50, p95, p99)
  • Error rate
  • Consumer lag
  • Queue depth
Dashboard metrics:
  • Real-time message rate chart
  • Latency distribution
  • Error rate over time
  • Consumer performance

Multi-Consumer Patterns

Multiple flows can consume from same stream. Each consumer processes independently. Useful for:
  • Different processing logic per consumer
  • Separation of concerns
  • Parallel processing pipelines
Example:
Stream: user-events
Consumer 1: Analytics processing
Consumer 2: Email notifications
Consumer 3: Database updates

Troubleshooting

Messages Not Triggering Flow

  • Check the stream exists: verify the stream was created in the hosting dashboard.
  • Check the gateway mapping: ensure the gateway is configured to write to the stream.
  • Check the flow is active: verify the flow is published and the trigger is enabled.
  • Check the subject filter: ensure the message subject matches the filter pattern.
  • Review stream logs: check whether messages are arriving at the stream.

Processing Delays

Common causes:
  • High message volume: too many messages for the current processing capacity.
  • Slow flow execution: flow steps taking too long to complete.
  • External API delays: third-party services responding slowly.
  • Resource limits: hitting compute or memory limits.
Solutions:
  • Enable parallel processing
  • Optimize flow steps
  • Increase resource allocation
  • Add more consumers

Message Loss

  • Check retention settings: messages may have expired.
  • Check consumer acknowledgment: unacknowledged messages may be lost.
  • Check error logs: processing errors may cause message loss.
  • Review the dead-letter queue: failed messages are moved to the DLQ.

High Error Rates

  • Validate message format: ensure messages match the expected schema.
  • Check external services: verify APIs and services are accessible.
  • Review error logs: identify common error patterns.
  • Test with sample messages: validate the flow with known-good messages.

Stream Limits by Plan

Free Tier:
  • Not available (requires Pro or higher)
Starter Plan:
  • Not available (requires Pro or higher)
Pro Plan:
  • 5 streams
  • 10,000 messages per hour
  • 7-day message retention
  • Sequential processing only
Team Plan:
  • 25 streams
  • 100,000 messages per hour
  • 30-day message retention
  • Parallel processing enabled
  • Dead letter queue
Enterprise Plan:
  • Unlimited streams
  • Custom message limits
  • Custom retention
  • Advanced features
  • Dedicated infrastructure
  • Priority support

Performance Considerations

Throughput

Sequential processing:
  • Depends on flow execution time
  • Typically 10-100 messages per second
  • Guaranteed ordering
Parallel processing:
  • Much higher throughput
  • Typically 100-1,000 messages per second
  • No ordering guarantees

Latency

End-to-end latency:
  • Message write to stream: Less than 10ms
  • Trigger activation: Less than 50ms
  • Flow execution: Depends on flow complexity
Total latency: Typically under 1 second for simple flows.

Scalability

Vertical scaling: increase resources per consumer.
Horizontal scaling: add more consumers (parallel processing).
Stream partitioning: split a stream by subject for parallel processing.

Migration Guide

From Webhooks to Streams

Why migrate:
  • Higher throughput
  • Lower latency
  • Better ordering guarantees
  • Built-in retry logic
Migration steps:
  • Create stream in hosting dashboard
  • Configure gateway mapping
  • Update webhook sender to use gateway
  • Add stream trigger to flow
  • Test with sample messages
  • Gradually migrate traffic

From Polling to Streams

Why migrate:
  • Real-time processing (no polling delay)
  • More efficient (event-driven)
  • Lower costs (no wasted polls)
  • Better resource usage
Migration steps:
  • Identify polling endpoints
  • Create streams for events
  • Configure gateway mappings
  • Replace schedule triggers with stream triggers
  • Remove polling logic (see the before/after sketch below)
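A before/after sketch, where the polling endpoint, the lastChecked cursor, and the handleOrder helper are all illustrative:
// Before: a schedule trigger polls on every run, even when nothing changed.
// lastChecked is whatever cursor the polling flow persisted.
const response = await fetch(`https://api.example.com/orders?since=${lastChecked}`);
for (const order of await response.json()) {
  await handleOrder(order);
}

// After: a stream trigger runs the flow only when a message arrives.
await handleOrder($.trigger.message.data);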

Next Steps