Stream Functions
Stream functions provide persistent, ordered message storage with event sourcing capabilities. Use streams to build audit logs, maintain event-driven state, and implement replay-based systems with subject-based message routing.
What are Streams? Streams are persistent logs that listen to subject patterns and store messages in order. Messages are identified by subjects and can be retrieved by sequence number. Streams support aggregation through folding, where messages are reduced to build current state.
Function List
- stream-create: Create a new stream
- stream-publish: Publish message to subject
- stream-get: Retrieve messages by subject
- stream-search: Search messages in stream
- stream-aggregate: Fold messages into current state
- stream-list: List all streams
- stream-unset: Remove properties from aggregate
- stream-tombstone: Mark subject as deleted
- stream-poison-pill: Reset aggregate state
stream-create
Create a new stream that listens to specified subject patterns and stores messages persistently.
Parameters
Name of the stream. Must be unique and contain only alphanumeric characters, dashes, and underscores.
Naming conventions:
- Use descriptive names: order-events, user-activity
- Indicate purpose: audit-log, state-changes
Array of subject patterns the stream listens to. Supports wildcards (* for a single token, > for multiple tokens).
Examples:
- ["orders.>"] - All order-related subjects
- ["users.*.created", "users.*.updated"] - Specific user events
- ["events.production.>"] - All production events
Optional description of the stream’s purpose.
Storage backend type.
Options:
- File - Persistent file storage (recommended)
- Memory - In-memory storage (faster, not persistent)
Message retention policy.
Options:
- Limits - Retain until configured limits are reached (default)
- Interest - Retain while consumers are interested
- WorkQueue - Remove messages after acknowledgment
Maximum number of messages to store. -1 for unlimited.
Maximum total size in bytes. -1 for unlimited.
Maximum age of messages in nanoseconds. Older messages are automatically deleted.
Maximum messages per subject. Useful for keeping only recent events per entity.
Policy when the stream reaches its limits.
Options:
- Old - Discard oldest messages (default)
- New - Reject new messages
Number of stream replicas for high availability (1-5).
Window in nanoseconds to detect duplicate messages. Default is 2 minutes.
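Because the maximum age and the duplicate window are both expressed in nanoseconds, a small conversion helper can make the values easier to read and check. The following is a minimal sketch; `to_nanos` is a hypothetical helper, not one of the stream functions.

```python
from datetime import timedelta


def to_nanos(duration: timedelta) -> int:
    """Convert a timedelta to whole nanoseconds (hypothetical helper)."""
    # timedelta stores days, seconds, and microseconds as integers,
    # so integer arithmetic keeps the result exact.
    micros = (duration.days * 86_400 + duration.seconds) * 1_000_000
    micros += duration.microseconds
    return micros * 1_000


# The documented default duplicate window of 2 minutes.
two_minutes = to_nanos(timedelta(minutes=2))

# An illustrative one-week maximum message age.
one_week = to_nanos(timedelta(days=7))
```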
Response
Example Usage
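The interaction between the message limit and the discard policy described in the parameters can be sketched with a small in-memory model. This only illustrates the documented semantics and is not the service's implementation; `BoundedLog` and its field names are hypothetical.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class BoundedLog:
    """In-memory model of a message limit with a discard policy."""

    max_messages: int = -1   # -1 means unlimited; otherwise assumed positive
    discard: str = "Old"     # "Old" drops the oldest, "New" rejects the newcomer
    messages: deque = field(default_factory=deque)

    def append(self, message) -> bool:
        """Store a message; return False if it was rejected."""
        full = self.max_messages != -1 and len(self.messages) >= self.max_messages
        if full and self.discard == "New":
            return False                 # limit reached: reject the new message
        if full and self.messages:
            self.messages.popleft()      # limit reached: discard the oldest message
        self.messages.append(message)
        return True


keep_latest = BoundedLog(max_messages=2, discard="Old")
for n in (1, 2, 3):
    keep_latest.append(n)

reject_new = BoundedLog(max_messages=2, discard="New")
accepted = [reject_new.append(n) for n in (1, 2, 3)]
```

With a limit of two messages, the "Old" policy ends up holding the two most recent messages, while the "New" policy keeps the first two and rejects the third.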
stream-publish
Publish a message to a subject. If a stream is configured to listen to this subject pattern, it will store the message.
Parameters
Subject to publish the message to. This determines which streams receive the message.
Subject patterns:
- orders.123.created - Specific order event
- users.456.updated - User update event
- events.production.error - Production error event
Message payload. Can be a JSON object or string.
Best practices:
- Include event type: {"event": "order_created"}
- Include timestamp: {"timestamp": "2025-10-16T10:30:00Z"}
- Include relevant IDs: {"order_id": "123", "user_id": "456"}
Response
Example Usage
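The payload best practices listed for the message value can be combined in a small builder. This is a sketch of payload construction only; `build_event` is a hypothetical helper, and the actual publish call is not shown because its signature is not given here.

```python
import json
from datetime import datetime, timezone


def build_event(event: str, occurred_at: datetime, **ids: str) -> dict:
    """Assemble a payload with event type, UTC timestamp, and relevant IDs."""
    # Pass a timezone-aware datetime; it is normalised to UTC with a "Z" suffix.
    stamp = occurred_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {"event": event, "timestamp": stamp, **ids}


payload = build_event(
    "order_created",
    datetime(2025, 10, 16, 10, 30, tzinfo=timezone.utc),
    order_id="123",
    user_id="456",
)

# Hierarchical subject in the entity.id.event form used throughout this page.
subject = f"orders.{payload['order_id']}.created"

# JSON body that would be handed to the publish function.
body = json.dumps(payload)
```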
Common Patterns
Entity State Changes
Track all changes to an entity
User Activity Tracking
Track user actions
stream-get
Retrieve messages from a stream by subject. Can retrieve all messages for a subject or a specific sequence range.
Parameters
Name of the stream to retrieve messages from.
Subject to retrieve messages for. Must match messages exactly (no wildcards).
Start from this sequence number (inclusive). If provided, enables sequence-based retrieval.
End at this sequence number (inclusive). Requires from_sequence.
Maximum number of messages to return. Requires from_sequence.
If from_sequence, limit, or to_sequence is provided, the function uses sequence-based retrieval. Otherwise, it retrieves all messages for the subject.
Response
Example Usage
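The retrieval rule for `from_sequence`, `to_sequence`, and `limit` can be modelled over an in-memory list. This is a sketch under assumptions: the message shape (`sequence`, `subject`, `data`), the stream-wide sequence numbering, and raising `ValueError` when `from_sequence` is missing are all illustrative choices, not documented behaviour.

```python
from typing import Optional


def get_messages(log, subject: str, from_sequence: Optional[int] = None,
                 to_sequence: Optional[int] = None, limit: Optional[int] = None):
    """Return messages for an exact subject, optionally by sequence range."""
    if from_sequence is None and (to_sequence is not None or limit is not None):
        # Assumed error handling: both options are documented as requiring from_sequence.
        raise ValueError("to_sequence and limit require from_sequence")

    # Subjects must match exactly; wildcards are not supported here.
    matches = [m for m in log if m["subject"] == subject]
    if from_sequence is None:
        return matches                   # no range given: all messages for the subject

    matches = [m for m in matches if m["sequence"] >= from_sequence]   # inclusive start
    if to_sequence is not None:
        matches = [m for m in matches if m["sequence"] <= to_sequence]  # inclusive end
    return matches if limit is None else matches[:limit]


log = [
    {"sequence": 1, "subject": "orders.123.created", "data": {"total": 10}},
    {"sequence": 2, "subject": "orders.456.created", "data": {"total": 20}},
    {"sequence": 3, "subject": "orders.123.created", "data": {"total": 15}},
    {"sequence": 4, "subject": "orders.123.created", "data": {"total": 30}},
]

everything = get_messages(log, "orders.123.created")
ranged = get_messages(log, "orders.123.created", from_sequence=1, to_sequence=3)
first_only = get_messages(log, "orders.123.created", from_sequence=1, limit=1)
```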
stream-search
Search for messages within a stream using a subject pattern and return matching results.
Parameters
Name of the stream to search.
Subject pattern to search for. Supports wildcards.
Examples:
- orders.* - All direct order subjects
- orders.> - All order subjects (including nested)
- users.123.* - All events for user 123
Maximum number of results to return (1-1000).
Response
Example Usage
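The wildcard behaviour of the search pattern can be sketched as a token-by-token matcher. This is an illustration, not the service's matcher: it assumes `>` is only valid as the last token and matches one or more remaining tokens, and the names `subject_matches` and `search` are hypothetical.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Match a dot-separated subject against a pattern with * and > wildcards."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, token in enumerate(p_tokens):
        if token == ">":
            # Assumed rule: ">" must come last and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False                 # subject is shorter than the pattern
        if token != "*" and token != s_tokens[i]:
            return False                 # literal token mismatch
    # Without ">", pattern and subject must have the same number of tokens.
    return len(s_tokens) == len(p_tokens)


def search(subjects, pattern: str, limit: int):
    """Return up to `limit` subjects that match the pattern."""
    if not 1 <= limit <= 1000:
        raise ValueError("limit must be between 1 and 1000")
    return [s for s in subjects if subject_matches(pattern, s)][:limit]


subjects = ["orders.123", "orders.123.created", "users.123.updated", "orders.456"]
direct_orders = search(subjects, "orders.*", limit=10)
all_orders = search(subjects, "orders.>", limit=10)
```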
stream-aggregate
Fold messages for a subject into current state using event sourcing. Messages are processed in order and reduced using lodash merge, with special control messages for state manipulation.
Event Sourcing with Folding: Aggregate rebuilds current state by replaying all messages for a subject in order. Normal messages are merged, while control messages (unset, tombstone, poison-pill) modify the fold behavior.
Parameters
Name of the stream to aggregate from.
Subject pattern to search and aggregate. Supports wildcards.
Folding Logic
The aggregate function processes messages in order with this logic:
- Normal messages: Merged into aggregate using lodash merge()
- type: 'unset': Removes specified paths from aggregate
- type: 'poison-pilled': Resets aggregate to empty object {}
- type: 'tombstoned': Stops processing (ignores subsequent messages)
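The folding rules above can be sketched as a single loop. This is a simplified model rather than the service's code: `deep_merge` is a stand-in for lodash `merge()` that recurses into nested objects only (lodash also merges arrays by index, which this sketch does not), and the `paths` field on unset messages is an assumed shape.

```python
from copy import deepcopy


def deep_merge(target: dict, source: dict) -> dict:
    """Recursively merge source into target (simplified stand-in for lodash merge)."""
    for key, value in source.items():
        if isinstance(value, dict) and isinstance(target.get(key), dict):
            deep_merge(target[key], value)       # merge nested objects key by key
        else:
            target[key] = deepcopy(value)        # otherwise the newer value wins
    return target


def unset_path(state: dict, path: str) -> None:
    """Remove a dot-notation path from state, ignoring missing paths."""
    *parents, leaf = path.split(".")
    node = state
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return
    node.pop(leaf, None)


def fold(messages) -> dict:
    """Reduce ordered messages to current state, honouring control messages."""
    state = {}
    for message in messages:
        kind = message.get("type")
        if kind == "tombstoned":
            break                                # stop: later messages are ignored
        if kind == "poison-pilled":
            state = {}                           # reset to an empty object
        elif kind == "unset":
            for path in message.get("paths", []):   # "paths" is an assumed field name
                unset_path(state, path)
        else:
            deep_merge(state, message)           # normal message
    return state


messages = [
    {"name": "Ada", "address": {"city": "London", "street": "1 Main St"}},
    {"email": "ada@example.com", "address": {"city": "Paris"}},
    {"type": "unset", "paths": ["address.street"]},
]
state = fold(messages)
```

Here the second message overrides only `address.city`, and the unset message removes `address.street` from the aggregate while the original messages stay in the log.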
Response
Example Usage
Folding Example
Given these messages in order:
stream-list
List all streams in your account with their metadata and configuration.
Parameters
No parameters required.
Response
Example Usage
stream-unset
Publish an unset control message that removes specified properties from the aggregate when folded.
Use Case: Remove sensitive data, correct mistakes, or clean up deprecated fields from the current state without affecting message history.
Parameters
Subject to publish the unset message to. Must match the subject used in aggregation.
Property path to remove from aggregate. Supports dot notation for nested properties.
Examples:
- "email" - Remove top-level property
- "address.street" - Remove nested property
- "metadata.temporary" - Remove from nested object
Response
Example Usage
How It Works
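The effect of an unset path on the aggregate can be illustrated with a helper that walks dot notation and drops the final key. This is a sketch of the described behaviour only; `without_path` is a hypothetical name, and treating a missing path as a no-op is an assumption.

```python
from copy import deepcopy


def without_path(state: dict, path: str) -> dict:
    """Return a copy of state with the dot-notation path removed."""
    result = deepcopy(state)             # leave the input untouched
    *parents, leaf = path.split(".")
    node = result
    for key in parents:
        node = node.get(key)
        if not isinstance(node, dict):
            return result                # path does not exist: nothing to remove
    node.pop(leaf, None)
    return result


state = {
    "email": "ada@example.com",
    "address": {"street": "1 Main St", "city": "London"},
}
no_email = without_path(state, "email")
no_street = without_path(state, "address.street")
unchanged = without_path(state, "metadata.temporary")
```

Returning a copy mirrors the point that unset changes the folded result only; the stored messages are not modified.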
stream-tombstone
Publish a tombstone control message that stops processing further messages when encountered during aggregation. Use to mark a subject as deleted while preserving history.
Tombstone Pattern: Marks an entity as deleted without removing history. Aggregation stops at the tombstone, ignoring all subsequent messages.
Parameters
Subject to publish the tombstone message to. Future aggregations will stop at this message.
Response
Example Usage
How It Works
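One way to picture the tombstone is as a cut-off point in the message list: aggregation sees only what precedes it. The sketch below models that cut-off; the control message shape (`type: 'tombstoned'`) follows the folding logic on this page, while `messages_before_tombstone` is a hypothetical helper.

```python
from itertools import takewhile


def messages_before_tombstone(messages):
    """Return the messages an aggregation would process before a tombstone."""
    return list(takewhile(lambda m: m.get("type") != "tombstoned", messages))


history = [
    {"status": "active"},
    {"plan": "pro"},
    {"type": "tombstoned"},
    {"status": "reactivated"},   # published later, ignored by aggregation
]
visible = messages_before_tombstone(history)
```

The full history still contains all four messages; only the view used for aggregation is truncated.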
stream-poison-pill
Publish a poison pill control message that resets the aggregate to an empty object when encountered during folding. Use to start fresh or correct corrupted state.
Reset Pattern: Clears all previous state and starts fresh from this point. Useful for major state corrections or entity resets.
Parameters
Subject to publish the poison pill message to. Aggregate will reset to {} at this message.
Response
Example Usage
How It Works
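Because a poison pill resets the aggregate to an empty object, only messages published after the most recent one contribute to current state. The sketch below models that view; it assumes no tombstone is present, and `messages_since_reset` is a hypothetical helper.

```python
def messages_since_reset(messages):
    """Return the messages that still contribute after the last poison pill."""
    last_reset = -1
    for index, message in enumerate(messages):
        if message.get("type") == "poison-pilled":
            last_reset = index           # remember the most recent reset
    return messages[last_reset + 1:]


history = [
    {"balance": 100, "currency": "USD"},
    {"balance": -5},                     # corrupted update
    {"type": "poison-pilled"},
    {"balance": 40},
]
effective = messages_since_reset(history)
```

After the reset, fields that appeared only in earlier messages, such as `currency` here, are absent from the aggregate until they are published again.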
Best Practices
Use Meaningful Subjects
Structure subjects hierarchically: entity.id.event (e.g., orders.123.created)
Include Event Types
Always include event type in value: {"event": "order_created"}
Add Timestamps
Include timestamps in message payloads for debugging and analytics
Aggregate by Entity
Use wildcards to aggregate all events for an entity: orders.123.>
Control Message Ordering
Understand fold order: normal merge → unset → poison-pill → tombstone
Preserve History
Control messages don’t delete history, only affect aggregation
Event Sourcing Patterns
Entity State Management
Track all changes to an entity and rebuild current state
Audit Log
Maintain complete audit trail
Temporal Queries
Query state at any point in time
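A temporal query can be approximated by folding only the messages whose payload timestamp is at or before a chosen cut-off. This is a minimal sketch under assumptions: each payload carries an ISO 8601 `timestamp` as recommended in the best practices, a shallow `dict.update` stands in for the real merge, control messages are not handled, and `state_at` is a hypothetical helper.

```python
from datetime import datetime


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def state_at(messages, cutoff: str) -> dict:
    """Fold messages whose timestamp is at or before the cutoff."""
    limit = parse_ts(cutoff)
    state = {}
    for message in messages:
        if parse_ts(message["timestamp"]) > limit:
            continue                     # published after the point of interest
        state.update(message)            # shallow merge, for illustration only
    return state


events = [
    {"timestamp": "2025-10-16T10:30:00Z", "status": "created", "total": 50},
    {"timestamp": "2025-10-16T11:00:00Z", "status": "paid"},
    {"timestamp": "2025-10-17T09:00:00Z", "status": "shipped"},
]
at_noon = state_at(events, "2025-10-16T12:00:00Z")
```

At noon on the first day the order is paid but not yet shipped, so the later message is left out of the fold.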