Stream Functions

Stream functions provide persistent, ordered message storage with event sourcing capabilities. Use streams to build audit logs, maintain event-driven state, and implement replay-based systems with subject-based message routing.
What are Streams? Streams are persistent logs that listen to subject patterns and store messages in order. Messages are identified by subjects and can be retrieved by sequence number. Streams support aggregation through folding, where messages are reduced to build current state.

Function List


stream-create

Create a new stream that listens to specified subject patterns and stores messages persistently.

Parameters

name
string
required
Name of the stream. Must be unique and contain only alphanumeric characters, dashes, and underscores.
Naming conventions:
  • Use descriptive names: order-events, user-activity
  • Indicate purpose: audit-log, state-changes
subject
array
required
Array of subject patterns the stream listens to. Supports wildcards (* for a single token, > for multiple tokens).
Examples:
  • ["orders.>"] - All order-related subjects
  • ["users.*.created", "users.*.updated"] - Specific user events
  • ["events.production.>"] - All production events
description
string
Optional description of the stream’s purpose.
storage
string
default:"File"
Storage backend type.
Options:
  • File - Persistent file storage (recommended)
  • Memory - In-memory storage (faster, not persistent)
retention
string
default:"Limits"
Message retention policy.
Options:
  • Limits - Retain until configured limits reached (default)
  • Interest - Retain while consumers are interested
  • WorkQueue - Remove messages after acknowledgment
max_msgs
integer
default:"-1"
Maximum number of messages to store. -1 for unlimited.
max_bytes
integer
default:"-1"
Maximum total size in bytes. -1 for unlimited.
max_age
integer
Maximum age of messages in nanoseconds. Older messages are automatically deleted.
max_msgs_per_subject
integer
Maximum messages per subject. Useful for keeping only recent events per entity.
discard
string
default:"Old"
Policy when stream reaches limits.
Options:
  • Old - Discard oldest messages (default)
  • New - Reject new messages
num_replicas
integer
default:"1"
Number of stream replicas for high availability (1-5).
duplicate_window
integer
Window in nanoseconds for detecting duplicate messages. Default is 2 minutes (120000000000 ns).

Response

{
  "status_code": 200,
  "body": {
    "message": "Stream created successfully"
  }
}

Example Usage

{
  "name": "order-events",
  "subject": ["orders.>"],
  "description": "All order-related events",
  "max_msgs": 1000000,
  "max_age": 2592000000000000
}
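
The `max_age` and `duplicate_window` values are expressed in nanoseconds, which is easy to get wrong by hand. The following is a minimal sketch, not part of the documented API: `days_to_ns` and `build_stream_config` are hypothetical helper names that apply the naming rule from the `name` parameter and convert a duration in days to nanoseconds.

```python
import re

NS_PER_SECOND = 1_000_000_000
# Naming rule from the `name` parameter: alphanumerics, dashes, underscores.
NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]+")


def days_to_ns(days: float) -> int:
    """Convert a duration in days to integer nanoseconds."""
    return int(days * 24 * 60 * 60 * NS_PER_SECOND)


def build_stream_config(name: str, subjects: list[str], max_age_days: float) -> dict:
    """Hypothetical helper: assemble a stream-create payload."""
    if not NAME_PATTERN.fullmatch(name):
        raise ValueError(f"invalid stream name: {name!r}")
    return {
        "name": name,
        "subject": subjects,
        "max_age": days_to_ns(max_age_days),
    }


config = build_stream_config("order-events", ["orders.>"], max_age_days=30)
print(config["max_age"])  # 2592000000000000 (30 days in nanoseconds)
```

Thirty days works out to 2592000000000000 ns, the same `max_age` value used in the example payload.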

stream-publish

Publish a message to a subject. If a stream is configured to listen to this subject pattern, it will store the message.

Parameters

subject
string
required
Subject to publish the message to. This determines which streams receive the message.
Subject patterns:
  • orders.123.created - Specific order event
  • users.456.updated - User update event
  • events.production.error - Production error event
value
object | string
required
Message payload. Can be a JSON object or string.
Best practices:
  • Include event type: {"event": "order_created"}
  • Include timestamp: {"timestamp": "2025-10-16T10:30:00Z"}
  • Include relevant IDs: {"order_id": "123", "user_id": "456"}

Response

{
  "status_code": 200,
  "body": {
    "message": "Message published successfully"
  }
}

Example Usage

{
  "subject": "orders.ORD-001.created",
  "value": {
    "event": "order_created",
    "order_id": "ORD-001",
    "customer_id": "CUST-123",
    "amount": 150.00,
    "items": 3,
    "timestamp": "2025-10-16T10:30:00Z"
  }
}

Common Patterns

Track all changes to an entity
Subject: "orders.{order_id}.{event}"
Examples:
  - orders.ORD-001.created
  - orders.ORD-001.paid
  - orders.ORD-001.shipped
  - orders.ORD-001.delivered

Track user actions
Subject: "users.{user_id}.activity"
Value: {
  "event": "action_name",
  "details": {...}
}
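
The subject patterns above can be assembled programmatically. This is a small sketch under stated assumptions: `entity_subject` and `publish_request` are hypothetical helper names, and the check that tokens may not contain dots, wildcard characters, or spaces is an assumption about what makes a safe concrete subject, not a documented rule.

```python
from datetime import datetime, timezone


def entity_subject(entity: str, entity_id: str, event: str) -> str:
    """Build a hierarchical subject such as orders.ORD-001.created."""
    for token in (entity, entity_id, event):
        # Assumption: dots separate tokens, and * and > are reserved for patterns.
        if not token or any(ch in token for ch in ".*> "):
            raise ValueError(f"invalid subject token: {token!r}")
    return f"{entity}.{entity_id}.{event}"


def publish_request(subject: str, event: str, **fields) -> dict:
    """Assemble a stream-publish payload with an event type and a UTC timestamp."""
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "subject": subject,
        "value": {"event": event, "timestamp": timestamp, **fields},
    }


subject = entity_subject("orders", "ORD-001", "created")
request = publish_request(subject, "order_created", order_id="ORD-001", amount=150.0)
print(request["subject"])  # orders.ORD-001.created
```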

stream-get

Retrieve messages from a stream by subject. Can retrieve all messages for a subject or a specific sequence range.

Parameters

stream
string
required
Name of the stream to retrieve messages from.
subject
string
required
Subject to retrieve messages for. Must be an exact subject; wildcards are not supported.
from_sequence
integer
Start from this sequence number (inclusive). If provided, enables sequence-based retrieval.
to_sequence
integer
End at this sequence number (inclusive). Requires from_sequence.
limit
integer
Maximum number of messages to return. Requires from_sequence.
If from_sequence, to_sequence, or limit is provided, the function uses sequence-based retrieval. Otherwise, it retrieves all messages for the subject.

Response

{
  "status_code": 200,
  "body": {
    "body": {
      "results_total": 3,
      "results": [
        {
          "created": 1697458200000,
          "subject": "orders.ORD-001.created",
          "value": "{\"event\":\"order_created\",\"amount\":150}"
        }
      ]
    },
    "metadata": {
      "stream": "order-events",
      "subject": "orders.ORD-001.created"
    }
  }
}
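
In the response above, each `value` is returned as a JSON-encoded string rather than an object. A minimal sketch of decoding it on the client side follows; `decode_results` is a hypothetical helper, and the sample response only mirrors the shape shown here.

```python
import json


def decode_results(response: dict) -> list[dict]:
    """Return each result with its `value` parsed from JSON when possible."""
    decoded = []
    for item in response["body"]["body"]["results"]:
        value = item["value"]
        if isinstance(value, str):
            try:
                # Note: a plain string payload that is itself valid JSON
                # (for example "123") is parsed as well.
                value = json.loads(value)
            except json.JSONDecodeError:
                pass  # not JSON: keep the plain string payload as-is
        decoded.append({**item, "value": value})
    return decoded


sample_response = {
    "status_code": 200,
    "body": {
        "body": {
            "results_total": 1,
            "results": [
                {
                    "created": 1697458200000,
                    "subject": "orders.ORD-001.created",
                    "value": "{\"event\":\"order_created\",\"amount\":150}",
                }
            ],
        },
        "metadata": {"stream": "order-events", "subject": "orders.ORD-001.created"},
    },
}

for result in decode_results(sample_response):
    print(result["subject"], result["value"]["event"])
```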

Example Usage

{
  "stream": "order-events",
  "subject": "orders.ORD-001.created"
}
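
The parameter rules for stream-get (an exact subject, and `to_sequence` and `limit` only alongside `from_sequence`) can be checked before sending a request. The sketch below uses a hypothetical helper, `stream_get_request`; how the service itself reports invalid combinations is not specified here.

```python
from typing import Optional


def stream_get_request(
    stream: str,
    subject: str,
    from_sequence: Optional[int] = None,
    to_sequence: Optional[int] = None,
    limit: Optional[int] = None,
) -> dict:
    """Assemble a stream-get payload, enforcing the documented parameter rules."""
    tokens = subject.split(".")
    if "*" in tokens or ">" in tokens:
        raise ValueError("stream-get needs an exact subject, not a pattern")
    if from_sequence is None and (to_sequence is not None or limit is not None):
        raise ValueError("to_sequence and limit require from_sequence")
    request = {"stream": stream, "subject": subject}
    optional = {
        "from_sequence": from_sequence,
        "to_sequence": to_sequence,
        "limit": limit,
    }
    # Only include the sequence parameters that were actually set.
    request.update({key: val for key, val in optional.items() if val is not None})
    return request


print(stream_get_request("order-events", "orders.ORD-001.created", from_sequence=10, limit=5))
```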

Search for messages within a stream using a subject pattern and return matching results.

Parameters

stream
string
required
Name of the stream to search.
search
string
Subject pattern to search for. Supports wildcards.
Examples:
  • orders.* - All direct order subjects
  • orders.> - All order subjects (including nested)
  • users.123.* - All events for user 123
limit
integer
required
Maximum number of results to return (1-1000).

Response

{
  "status_code": 200,
  "body": {
    "body": {
      "results_total": 45,
      "results": [
        {
          "created": 1697458200000,
          "subject": "orders.ORD-001.created",
          "value": "{\"event\":\"order_created\"}"
        },
        {
          "created": 1697458260000,
          "subject": "orders.ORD-001.paid",
          "value": "{\"event\":\"order_paid\"}"
        }
      ]
    },
    "metadata": {
      "stream": "order-events",
      "subject": "orders.>"
    }
  }
}

Example Usage

{
  "stream": "order-events",
  "search": "orders.ORD-001.>",
  "limit": 100
}
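
To make the difference between `*` and `>` concrete, here is a sketch of a pattern matcher. It assumes subjects are dot-separated tokens, that `*` matches exactly one token, and that `>` appears only as the last token and matches one or more remaining tokens. These semantics are an assumption based on the wildcard descriptions above, not a statement of how the service matches internally.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a dot-separated subject matches a wildcard pattern."""
    pattern_tokens = pattern.split(".")
    subject_tokens = subject.split(".")
    for index, token in enumerate(pattern_tokens):
        if token == ">":
            # Assumed: `>` must be last and needs at least one token to match.
            return index == len(pattern_tokens) - 1 and len(subject_tokens) > index
        if index >= len(subject_tokens):
            return False
        if token != "*" and token != subject_tokens[index]:
            return False
    return len(pattern_tokens) == len(subject_tokens)


subjects = ["orders.ORD-001", "orders.ORD-001.created", "users.123.updated"]
print([s for s in subjects if subject_matches("orders.*", s)])
print([s for s in subjects if subject_matches("orders.>", s)])
```

With these assumptions, `orders.*` selects only `orders.ORD-001`, while `orders.>` also selects the nested `orders.ORD-001.created`.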

stream-aggregate

Fold messages for a subject into current state using event sourcing. Messages are processed in order and reduced using lodash merge, with special control messages for state manipulation.
Event Sourcing with Folding: Aggregate rebuilds current state by replaying all messages for a subject in order. Normal messages are merged, while control messages (unset, tombstone, poison-pill) modify the fold behavior.

Parameters

stream
string
required
Name of the stream to aggregate from.
subject
string
required
Subject pattern to search and aggregate. Supports wildcards.

Folding Logic

The aggregate function processes messages in order with this logic:
  1. Normal messages: Merged into aggregate using lodash merge()
  2. type: 'unset': Removes specified paths from aggregate
  3. type: 'poison-pilled': Resets aggregate to empty object {}
  4. type: 'tombstoned': Stops processing (ignores subsequent messages)
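
The four rules above can be sketched as a small fold function. This is an illustration under assumptions, not the service's implementation: `deep_merge` approximates lodash `merge()` for nested objects only (lodash also merges arrays index by index, which is omitted here), and the control message shapes follow the `type` values listed above.

```python
from copy import deepcopy


def deep_merge(target: dict, source: dict) -> dict:
    """Recursively merge source into target (dict-only approximation of lodash merge)."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            deep_merge(target[key], value)
        else:
            target[key] = deepcopy(value)
    return target


def unset_path(state: dict, path: str) -> None:
    """Remove a dot-notation path from state; missing paths are ignored."""
    *parents, leaf = path.split(".")
    node = state
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return
    node.pop(leaf, None)


def fold(messages: list[dict]) -> dict:
    """Reduce messages, in order, into the current aggregate."""
    state: dict = {}
    for message in messages:
        kind = message.get("type")
        if kind == "tombstoned":
            break  # stop processing; later messages are ignored
        if kind == "poison-pilled":
            state = {}  # reset the aggregate
        elif kind == "unset":
            unset_path(state, message["path"])
        else:
            deep_merge(state, message)
    return state


messages = [
    {"order_id": "ORD-001", "status": "created", "shipping": {"city": "Oslo"}},
    {"status": "paid", "shipping": {"zip": "0150"}, "payment_id": "PAY-123"},
    {"type": "unset", "path": "payment_id"},
]
print(fold(messages))
```

Nested objects such as `shipping` are combined key by key rather than replaced, which is the main difference between a deep merge and a plain dictionary update.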

Response

{
  "status_code": 200,
  "body": {
    "body": {
      "order_id": "ORD-001",
      "customer_id": "CUST-123",
      "amount": 150.00,
      "status": "shipped",
      "tracking": "1Z999AA"
    },
    "metadata": {
      "stream": "order-events",
      "subject": "orders.ORD-001.>"
    }
  }
}

Example Usage

{
  "stream": "order-events",
  "subject": "orders.ORD-001.>"
}

Folding Example

Given these messages in order:
Message 1: {"order_id": "ORD-001", "status": "created", "amount": 150}
Message 2: {"status": "paid", "payment_id": "PAY-123"}
Message 3: {"status": "shipped", "tracking": "1Z999AA"}
Message 4: {"type": "unset", "path": "payment_id"}
Result after folding (payment_id removed by unset):
{
  "order_id": "ORD-001",
  "status": "shipped",
  "amount": 150,
  "tracking": "1Z999AA"
}

stream-list

List all streams in your account with their metadata and configuration.

Parameters

No parameters required.

Response

{
  "status_code": 200,
  "body": {
    "body": {
      "results_total": 3,
      "results": [
        {
          "created": 1694170800000,
          "description": "Order lifecycle events",
          "messages_total": 12458,
          "metadata": {},
          "name": "order-events",
          "subjects": ["orders.>"]
        },
        {
          "created": 1694257200000,
          "description": "User activity tracking",
          "messages_total": 98234,
          "metadata": {},
          "name": "user-activity",
          "subjects": ["users.*.activity"]
        }
      ]
    }
  }
}

Example Usage

{
  "function": "stream-list"
}

stream-unset

Publish an unset control message that removes specified properties from the aggregate when folded.
Use Case: Remove sensitive data, correct mistakes, or clean up deprecated fields from the current state without affecting message history.

Parameters

subject
string
required
Subject to publish the unset message to. Must match the subject used in aggregation.
path
string
required
Property path to remove from aggregate. Supports dot notation for nested properties.
Examples:
  • "email" - Remove top-level property
  • "address.street" - Remove nested property
  • "metadata.temporary" - Remove from nested object

Response

{
  "status_code": 200,
  "body": {
    "message": "Unset message published successfully"
  }
}

Example Usage

{
  "subject": "users.user-456.state",
  "path": "temporary_token"
}

How It Works

Initial aggregate: {"name": "John", "email": "john@example.com", "temp": "data"}

Publish unset: {"subject": "users.123.state", "path": "temp"}

After aggregation: {"name": "John", "email": "john@example.com"}

stream-tombstone

Publish a tombstone control message that stops processing further messages when encountered during aggregation. Use to mark a subject as deleted while preserving history.
Tombstone Pattern: Marks an entity as deleted without removing history. Aggregation stops at the tombstone, ignoring all subsequent messages.

Parameters

subject
string
required
Subject to publish the tombstone message to. Future aggregations will stop at this message.

Response

{
  "status_code": 200,
  "body": {
    "message": "Tombstone message published successfully"
  }
}

Example Usage

{
  "subject": "users.user-789.state"
}

How It Works

Message 1: {"name": "Alice", "status": "active"}
Message 2: {"email": "alice@example.com"}
Message 3: {"type": "tombstoned"}
Message 4: {"status": "reactivated"}  ← This is ignored

Aggregate stops at tombstone: {"name": "Alice", "status": "active", "email": "alice@example.com"}

stream-poison-pill

Publish a poison pill control message that resets the aggregate to an empty object when encountered during folding. Use to start fresh or correct corrupted state.
Reset Pattern: Clears all previous state and starts fresh from this point. Useful for major state corrections or entity resets.

Parameters

subject
string
required
Subject to publish the poison pill message to. Aggregate will reset to {} at this message.

Response

{
  "status_code": 200,
  "body": {
    "message": "Poison pill message published successfully"
  }
}

Example Usage

{
  "subject": "orders.ORD-001.state"
}

How It Works

Message 1: {"order_id": "ORD-001", "status": "created", "amount": 150}
Message 2: {"status": "paid"}
Message 3: {"type": "poison-pilled"}  ← Resets to {}
Message 4: {"order_id": "ORD-001", "status": "created", "amount": 200}
Message 5: {"status": "paid"}

Final aggregate: {"order_id": "ORD-001", "status": "paid", "amount": 200}
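
One consequence of the reset behaviour: since a poison pill clears the aggregate and a tombstone stops the fold, only messages after the last poison pill and before the first tombstone can influence the result. The sketch below uses a hypothetical helper, `effective_messages`, and assumes the control message shapes shown in this section.

```python
def effective_messages(messages: list[dict]) -> list[dict]:
    """Return only the messages that can influence the final aggregate."""
    # Nothing at or after the first tombstone is processed.
    for index, message in enumerate(messages):
        if message.get("type") == "tombstoned":
            messages = messages[:index]
            break
    # Nothing before the last remaining poison pill survives the reset.
    for index in range(len(messages) - 1, -1, -1):
        if messages[index].get("type") == "poison-pilled":
            return messages[index + 1:]
    return list(messages)


history = [
    {"order_id": "ORD-001", "status": "created", "amount": 150},
    {"status": "paid"},
    {"type": "poison-pilled"},
    {"order_id": "ORD-001", "status": "created", "amount": 200},
    {"status": "paid"},
]

state: dict = {}
for message in effective_messages(history):
    # Shallow merge is enough here: the remaining messages are flat and
    # contain no unset control messages.
    state.update(message)
print(state)
```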

Best Practices

Use Meaningful Subjects

Structure subjects hierarchically: entity.id.event (e.g., orders.123.created)

Include Event Types

Always include event type in value: {"event": "order_created"}

Add Timestamps

Include timestamps in message payloads for debugging and analytics

Aggregate by Entity

Use wildcards to aggregate all events for an entity: orders.123.>

Control Message Ordering

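Messages are folded in stream order. Each message is handled by type: normal messages are merged, unset removes paths, poison-pill resets the aggregate, and tombstone stops processing.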

Preserve History

Control messages don’t delete history, only affect aggregation

Event Sourcing Patterns

Track all changes to an entity and rebuild current state:
1. Publish events: orders.123.created, orders.123.paid
2. Aggregate: Get current order state
3. Control: Use unset to remove fields, poison-pill to reset

Maintain a complete audit trail:
1. Every state change is a message
2. Search by subject to see full history
3. Tombstone the entity when it is deleted (preserves history)

Query state at any point in time:
1. Get messages up to a specific sequence
2. Aggregate to rebuild state at that moment
3. Compare states across time
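
The point-in-time pattern can be sketched on the client side as follows. The `(sequence, value)` pairs are an assumed representation of messages fetched up to a given sequence; `state_at` and `changed_fields` are hypothetical helpers, and the merge is shallow with no control-message handling, to keep the example short.

```python
def state_at(events: list[tuple[int, dict]], sequence: int) -> dict:
    """Rebuild state from all events whose sequence number is <= sequence."""
    state: dict = {}
    for seq, value in sorted(events, key=lambda event: event[0]):
        if seq > sequence:
            break
        state.update(value)  # shallow merge; control messages are not handled
    return state


def changed_fields(before: dict, after: dict) -> dict:
    """Map each changed key to its (before, after) values."""
    keys = before.keys() | after.keys()
    return {
        key: (before.get(key), after.get(key))
        for key in keys
        if before.get(key) != after.get(key)
    }


events = [
    (1, {"order_id": "ORD-001", "status": "created", "amount": 150}),
    (2, {"status": "paid", "payment_id": "PAY-123"}),
    (3, {"status": "shipped", "tracking": "1Z999AA"}),
]

print(state_at(events, 2))
print(changed_fields(state_at(events, 1), state_at(events, 3)))
```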
