Durable streams for real-time data subscriptions.
modules::stream::StreamModule
Architecture
Data Flow
When a worker triggers stream::set, the engine:
1. Persists the data via the configured adapter (Redis or KvStore)
2. Publishes a notification to all WebSocket clients subscribed to that stream and group
3. Evaluates registered stream triggers and fires matching handlers
A single stream::set handles persistence, real-time delivery, and reactive logic in one operation.
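The flow of a single stream::set call can be modelled in a few lines. This is a toy in-memory sketch, not the engine's implementation: `ToyStreamEngine`, its `subscribe` and `on_change` methods, and the shape of the event dictionary are hypothetical names chosen only for illustration.

```python
from collections import defaultdict


class ToyStreamEngine:
    """Illustrative model of one stream::set call (hypothetical, not the real engine)."""

    def __init__(self):
        self.store = {}                       # (stream, group, item) -> data
        self.subscribers = defaultdict(list)  # (stream, group) -> callbacks
        self.triggers = []                    # (predicate, handler) pairs

    def subscribe(self, stream_name, group_id, callback):
        self.subscribers[(stream_name, group_id)].append(callback)

    def on_change(self, predicate, handler):
        self.triggers.append((predicate, handler))

    def set(self, stream_name, group_id, item_id, data):
        key = (stream_name, group_id, item_id)
        old_value = self.store.get(key)

        # Persist the data (a real adapter would write to Redis or KvStore).
        self.store[key] = data

        event = {
            "stream_name": stream_name,
            "group_id": group_id,
            "item_id": item_id,
            "data": data,
        }

        # Notify every subscriber of this stream and group.
        for callback in self.subscribers[(stream_name, group_id)]:
            callback(event)

        # Evaluate registered triggers and fire the matching handlers.
        for predicate, handler in self.triggers:
            if predicate(event):
                handler(event)

        return old_value, data
```

In this model a subscriber to presence/room-1 sees only changes in that group, while a trigger decides for itself which events it cares about.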
Groups
Streams organize data hierarchically: stream_name > group_id > item_id.
stream_name identifies the top-level stream (e.g. chat, presence, dashboard)
group_id partitions data within a stream (e.g. room-1, team-alpha)
item_id uniquely identifies a record within a group (e.g. user-123, msg-456)
Clients subscribe at the group level by connecting to ws://host:port/stream/{stream_name}/{group_id}/. They receive all item-level changes within that group.
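As a small illustration of the URL pattern above, the helper below builds a group-level subscription URL. The function name `subscription_url` is hypothetical, and percent-encoding the path segments is an assumption about how IDs with special characters should be sent, not documented behaviour.

```python
from urllib.parse import quote


def subscription_url(host: str, port: int, stream_name: str, group_id: str) -> str:
    """Build ws://host:port/stream/{stream_name}/{group_id}/ for a group subscription."""
    # Assumption: encode each segment so an ID containing '/' or spaces
    # remains a single path segment.
    stream = quote(stream_name, safe="")
    group = quote(group_id, safe="")
    return f"ws://{host}:{port}/stream/{stream}/{group}/"
```

For example, `subscription_url("localhost", 3112, "presence", "room-1")` yields `ws://localhost:3112/stream/presence/room-1/`.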
Sample Configuration
- class: modules::stream::StreamModule
  config:
    port: ${STREAM_PORT:3112}
    host: 0.0.0.0
    adapter:
      class: modules::stream::adapters::RedisAdapter
      config:
        redis_url: ${REDIS_URL:redis://localhost:6379}
Configuration
port: The port to listen on. Defaults to 3112.
host: The host to listen on. Defaults to 0.0.0.0.
auth_function: Path of the function used to authenticate connecting clients. Register the function with the iii SDK, then reference its path here.
adapter: The adapter used to store stream data. Register the adapter with the iii SDK, then reference its path here.
Adapters
modules::stream::adapters::RedisAdapter
Uses Redis as the backend for the streams. Stores stream data in Redis and leverages Redis Pub/Sub for real-time event delivery.
class: modules::stream::adapters::RedisAdapter
config:
  redis_url: ${REDIS_URL:redis://localhost:6379}
Configuration
redis_url: The URL of the Redis instance to use.
modules::stream::adapters::KvStore
Built-in key-value store. Supports in-memory or file-based persistence. No external dependencies required.
class: modules::stream::adapters::KvStore
config:
  store_method: file_based
  file_path: ./data/streams_store.db
Configuration
store_method: Storage method. Options: in_memory (lost on restart) or file_based (persisted to disk).
file_path: Directory path for file-based storage. Each stream is stored as a separate file.
Functions
stream::set
Sets a value in the stream.
The ID of the stream to set the value in.
The group ID of the stream to set the value in.
The item ID of the stream to set the value in.
The value to set in the stream.
The previous value, or null if the item was newly created.
The value now stored in the stream.
stream::get
Gets a value from the stream.
The ID of the stream to retrieve the value from.
The group ID in the stream to retrieve the value from.
The item ID in the stream to retrieve.
The value retrieved from the stream.
stream::delete
Deletes a value from the stream.
The ID of the stream to delete the value from.
The group ID in the stream to delete the value from.
The item ID in the stream to delete.
The value that was deleted, or null if the item did not exist.
stream::list
Retrieves a group from the stream, returning all items in the group.
The ID of the stream to retrieve the group from.
The group ID in the stream to retrieve the group from.
The group retrieved from the stream, as an array of its items.
List all groups in a stream.
The ID of the stream to list groups from.
An array of group IDs in the stream.
List all streams with their group metadata.
This function takes no parameters.
An array of stream metadata objects. Each object has an id (string) and a groups (string[]) field.
The total number of streams.
Send a custom event to all subscribers of a stream group.
The ID of the stream to send the event to.
The group ID in the stream to send the event to.
The event type string delivered to subscribers.
Optional item ID to associate the event with.
The event payload delivered to subscribers.
stream::update
Atomically updates an item in the stream using a list of operations.
The ID of the stream containing the item to update.
The group ID in the stream containing the item to update.
The item ID in the stream to update.
The list of atomic operations to apply. Each operation is a tagged object with a type field (set, merge, increment, decrement, or remove) and associated fields (path, value, by).
The previous value, or null if the item was newly created.
The value now stored in the stream after applying the operations.
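To make the operation list concrete, here is a minimal sketch of how the five operation types could be applied to an item. Everything beyond the type names and the path, value, and by fields is an assumption: the helper name `apply_ops` is hypothetical, `path` is treated as a dot-separated key path, and `merge` is treated as a shallow merge. The real engine applies operations atomically, which this single-threaded model does not attempt to show.

```python
import copy


def apply_ops(item, ops):
    """Return (old_value, new_value) after applying ops to a copy of item."""
    old_value = copy.deepcopy(item)
    # A missing item starts out as an empty object in this sketch.
    new_value = copy.deepcopy(item) if item is not None else {}

    for op in ops:
        # Assumption: "a.b.c" addresses nested keys.
        parts = op["path"].split(".")
        parent = new_value
        for part in parts[:-1]:
            parent = parent.setdefault(part, {})
        leaf = parts[-1]

        kind = op["type"]
        if kind == "set":
            parent[leaf] = op["value"]
        elif kind == "merge":
            # Assumption: shallow merge of an object into the target object.
            parent.setdefault(leaf, {}).update(op["value"])
        elif kind == "increment":
            parent[leaf] = parent.get(leaf, 0) + op["by"]
        elif kind == "decrement":
            parent[leaf] = parent.get(leaf, 0) - op["by"]
        elif kind == "remove":
            parent.pop(leaf, None)
        else:
            raise ValueError(f"unknown operation type: {kind}")

    return old_value, new_value
```

Applying `[{"type": "increment", "path": "visits", "by": 2}]` to `{"visits": 1}` returns the untouched previous value together with `{"visits": 3}`, mirroring the two return values described above.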
Authentication
You can register a function to authenticate connecting clients.
Define the function that handles authentication. It receives a single argument containing the request data.
headers: The HTTP headers sent with the request.
query_params (Record<string, string[]>, required): Query parameters in the request, as a map from key to array of string values.
The remote address (IP) of the request.
Node / TypeScript
iii.registerFunction({ id: 'onAuth' }, (input) => ({
  context: { name: 'John Doe' },
}))

Python
def on_auth(input):
    return {'context': {'name': 'John Doe'}}

iii.register_function({'id': 'onAuth'}, on_auth)

Rust
iii.register_function((RegisterFunctionMessage::with_id("onAuth".into()), |_input| async move {
    Ok(json!({ "context": { "name": "John Doe" } }))
}));
Make sure you add the function to the configuration file.
- class: modules::stream::StreamModule
  config:
    auth_function: onAuth
Now, whenever a client opens a WebSocket connection, the onAuth function is called with the request data.
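Because query_params maps each key to an array of values, an auth function that reads from it has to pick one value. The sketch below assumes a hypothetical `token` query parameter (the parameter name is not defined by the module) and skips real verification.

```python
def on_auth(request):
    """Hypothetical auth handler: accept a token passed as ?token=... in the URL."""
    # Each query parameter is a list of strings; use the first value if present.
    tokens = request.get("query_params", {}).get("token", [])
    token = tokens[0] if tokens else None
    if token is None:
        return {"context": None}
    # A real handler would verify the token here; this sketch only passes it along.
    return {"context": {"token": token}}
```

The function would be registered and referenced through auth_function in the same way as onAuth.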
Trigger Types
This module adds three trigger types: stream (item changes), stream:join (WebSocket connect), and stream:leave (WebSocket disconnect).
stream:join and stream:leave
Fire when a client connects or disconnects via WebSocket. Both trigger types deliver the same payload to the handler:
The subscription ID, used for uniqueness and logging.
The stream name of the subscription.
The group ID of the subscription.
The item ID of the subscription, if provided by the client.
The context generated by the authentication layer.
stream
Fires when an item changes in the stream (via stream::set, stream::update, or stream::delete). Register with a config object to filter which stream, group, or item triggers the handler:
The stream name to watch. Only changes on this stream fire the handler.
If set, only changes within this group fire the handler.
If set, only changes to this specific item fire the handler.
Function ID for conditional execution. The engine invokes it with the event payload; if it returns false, the handler function is not called.
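The filtering rules above can be summarised as a small decision function. This is a toy model, not the engine's code: `should_fire`, the `conditions` lookup table, and the event shape are hypothetical, and the config keys stream_name, group_id, and item_id are assumed names (only condition_function_id appears in the examples on this page).

```python
def should_fire(config, event, conditions=None):
    """Decide whether a stream trigger's handler would run for an event."""
    conditions = conditions or {}

    # The stream name must always match.
    if event.get("stream_name") != config["stream_name"]:
        return False

    # Group and item filters apply only when they are set.
    for key in ("group_id", "item_id"):
        wanted = config.get(key)
        if wanted is not None and event.get(key) != wanted:
            return False

    # The handler is skipped only when the condition function returns False.
    condition_id = config.get("condition_function_id")
    if condition_id is not None:
        return conditions[condition_id](event) is not False

    return True
```

With `{"stream_name": "presence", "group_id": "room-1"}` as the config, a change to any item in presence/room-1 passes, while a change in room-2 or on another stream does not.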
Sample Code
Node / TypeScript
const fn = iii.registerFunction({ id: 'onJoin' }, (input) => {
  console.log('Joined stream', input)
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: {},
})

Python
def on_join(input):
    print('Joined stream', input)
    return {}

iii.register_function({'id': 'onJoin'}, on_join)
iii.register_trigger({'type': 'stream:join', 'function_id': 'onJoin', 'config': {}})

Rust
iii.register_function((RegisterFunctionMessage::with_id("onJoin".into()), |input| async move {
    println!("Joined stream: {:?}", input);
    Ok(json!({}))
}));

iii.register_trigger(RegisterTriggerInput { trigger_type: "stream:join".into(), function_id: "onJoin".into(), config: json!({}) })?;
Usage Example: Real-Time Presence
Streams organize data by stream_name, group_id, and item_id. Use them for live presence, collaborative documents, or dashboards:
Node / TypeScript
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker('ws://localhost:49134')

// Mark user-123 as online in room-1
iii.trigger({
  function_id: 'stream::set',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
    data: { name: 'Alice', online: true, lastSeen: new Date().toISOString() },
  },
  action: TriggerAction.Void(),
})

// Read a single user's presence record
const user = await iii.trigger({
  function_id: 'stream::get',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
  },
})

// List every item in the room
const roomMembers = await iii.trigger({
  function_id: 'stream::list',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
  },
})

// Remove the user's presence record
iii.trigger({
  function_id: 'stream::delete',
  payload: {
    stream_name: 'presence',
    group_id: 'room-1',
    item_id: 'user-123',
  },
  action: TriggerAction.Void(),
})

Python
from iii import register_worker, TriggerAction

iii = register_worker('ws://localhost:49134')

# Mark user-123 as online in room-1
iii.trigger({
    'function_id': 'stream::set',
    'payload': {
        'stream_name': 'presence',
        'group_id': 'room-1',
        'item_id': 'user-123',
        'data': {'name': 'Alice', 'online': True, 'lastSeen': '2026-01-01T00:00:00Z'},
    },
    'action': TriggerAction.Void(),
})

# Read a single user's presence record
user = iii.trigger({
    'function_id': 'stream::get',
    'payload': {
        'stream_name': 'presence',
        'group_id': 'room-1',
        'item_id': 'user-123',
    },
})

# List every item in the room
room_members = iii.trigger({
    'function_id': 'stream::list',
    'payload': {
        'stream_name': 'presence',
        'group_id': 'room-1',
    },
})

# Remove the user's presence record
iii.trigger({
    'function_id': 'stream::delete',
    'payload': {
        'stream_name': 'presence',
        'group_id': 'room-1',
        'item_id': 'user-123',
    },
    'action': TriggerAction.Void(),
})

Rust
use iii_sdk::{register_worker, InitOptions, TriggerRequest, TriggerAction};
use serde_json::json;

let iii = register_worker("ws://localhost:49134", InitOptions::default());

// Mark user-123 as online in room-1
iii.trigger(TriggerRequest::new("stream::set", json!({
    "stream_name": "presence",
    "group_id": "room-1",
    "item_id": "user-123",
    "data": { "name": "Alice", "online": true }
})).action(TriggerAction::void())).await?;

// Read a single user's presence record
let user = iii.trigger(TriggerRequest::new("stream::get", json!({
    "stream_name": "presence",
    "group_id": "room-1",
    "item_id": "user-123"
}))).await?;

// List every item in the room
let room_members = iii.trigger(TriggerRequest::new("stream::list", json!({
    "stream_name": "presence",
    "group_id": "room-1"
}))).await?;

// Remove the user's presence record
iii.trigger(TriggerRequest::new("stream::delete", json!({
    "stream_name": "presence",
    "group_id": "room-1",
    "item_id": "user-123"
})).action(TriggerAction::void())).await?;
Clients connect via WebSocket to ws://host:3112/stream/presence/room-1/ and receive real-time updates when items change.
Usage Example: Join with Auth Context
Configure the stream module with an auth function:
- class: modules::stream::StreamModule
  config:
    port: 3112
    host: 0.0.0.0
    auth_function: stream::auth
    adapter:
      class: modules::stream::adapters::KvStore
      config:
        store_method: file_based
        file_path: ./data/stream_store
Register the auth function. Clients may send the token via Authorization: Bearer <token> (Node.js) or Sec-WebSocket-Protocol: Authorization,<token> (browser stream-client):
Node / TypeScript
iii.registerFunction({ id: 'stream::auth' }, (input) => {
  // Node.js clients send "Authorization: Bearer <token>"
  const auth = input.headers?.['authorization']?.replace(/^Bearer\s+/i, '')
  // Browser clients send "Sec-WebSocket-Protocol: Authorization,<token>"
  const proto = input.headers?.['sec-websocket-protocol']
  const prefix = 'Authorization,'
  // Slice by the full prefix length so the comma is not left in the token
  const token = auth ?? (proto?.startsWith(prefix) ? proto.slice(prefix.length) : null)
  return token ? { context: { userId: 'user-from-token' } } : { context: null }
})

Python
def stream_auth(input):
    auth = input.get('headers', {}).get('authorization', '')
    token = auth.replace('Bearer ', '', 1) if auth.startswith('Bearer ') else None
    if token:
        return {'context': {'userId': 'user-from-token'}}
    return {'context': None}

iii.register_function({'id': 'stream::auth'}, stream_auth)

Rust
iii.register_function((RegisterFunctionMessage::with_id("stream::auth".into()), |input| async move {
    let headers = input.get("headers").and_then(|h| h.as_object());
    let token = headers
        .and_then(|h| h.get("authorization"))
        .and_then(|v| v.as_str())
        .and_then(|s| s.strip_prefix("Bearer "));
    match token {
        Some(_) => Ok(json!({ "context": { "userId": "user-from-token" } })),
        None => Ok(json!({ "context": null })),
    }
}));
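The Python sample above reads only the Authorization header. A possible way to accept both client forms is sketched below; `extract_token` is a hypothetical helper, and it assumes header names arrive lower-cased, as they do in the samples on this page.

```python
def extract_token(headers):
    """Return the token from either supported header form, or None."""
    # Node.js clients: "Authorization: Bearer <token>"
    auth = headers.get("authorization", "")
    if auth.lower().startswith("bearer "):
        return auth[len("bearer "):].strip() or None

    # Browser clients: "Sec-WebSocket-Protocol: Authorization,<token>"
    proto = headers.get("sec-websocket-protocol", "")
    prefix = "Authorization,"
    if proto.startswith(prefix):
        return proto[len(prefix):].strip() or None

    return None
```

An auth function could call `extract_token(input.get('headers', {}))` and return a null context when the result is None.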
Join/leave triggers receive the auth context:
Node / TypeScript
const fn = iii.registerFunction({ id: 'onJoin' }, (input) => {
  const { stream_name, group_id, id: itemId, context } = input
  if (context?.userId) {
    console.log(`User ${context.userId} joined ${stream_name}/${group_id}/${itemId}`)
  }
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: {},
})

Python
def on_join(input):
    # context is None when the auth function rejects the client, so fall back to {}
    context = input.get('context') or {}
    if context.get('userId'):
        print(f"User {context['userId']} joined {input['stream_name']}/{input['group_id']}/{input.get('id', '')}")
    return {}

iii.register_function({'id': 'onJoin'}, on_join)
iii.register_trigger({'type': 'stream:join', 'function_id': 'onJoin', 'config': {}})

Rust
iii.register_function((RegisterFunctionMessage::with_id("onJoin".into()), |input| async move {
    if let Some(user_id) = input.get("context").and_then(|c| c.get("userId")).and_then(|u| u.as_str()) {
        let stream = input["stream_name"].as_str().unwrap_or("");
        let group = input["group_id"].as_str().unwrap_or("");
        let item = input["id"].as_str().unwrap_or("");
        println!("User {} joined {}/{}/{}", user_id, stream, group, item);
    }
    Ok(json!({}))
}));

iii.register_trigger(RegisterTriggerInput { trigger_type: "stream:join".into(), function_id: "onJoin".into(), config: json!({}) })?;
Usage Example: Conditional Join
Node / TypeScript
// Condition: only run the handler when the auth context carries a userId
const conditionFn = iii.registerFunction(
  { id: 'conditions::requireContext' },
  async (input) => input.context?.userId != null,
)

const fn = iii.registerFunction({ id: 'onJoin' }, (input) => {
  console.log('User joined:', input.context?.userId, input.stream_name)
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: fn.id,
  config: { condition_function_id: conditionFn.id },
})

Python
def require_context(input):
    # context may be None, so fall back to {} before reading userId
    return (input.get('context') or {}).get('userId') is not None

iii.register_function({'id': 'conditions::requireContext'}, require_context)

def on_join(input):
    print('User joined:', (input.get('context') or {}).get('userId'), input['stream_name'])
    return {}

iii.register_function({'id': 'onJoin'}, on_join)
iii.register_trigger({
    'type': 'stream:join',
    'function_id': 'onJoin',
    'config': {'condition_function_id': 'conditions::requireContext'},
})

Rust
iii.register_function((RegisterFunctionMessage::with_id("conditions::requireContext".into()), |input| async move {
    let has_user = input
        .get("context")
        .and_then(|c| c.get("userId"))
        .is_some();
    Ok(json!(has_user))
}));

iii.register_function((RegisterFunctionMessage::with_id("onJoin".into()), |input| async move {
    let user_id = input.get("context").and_then(|c| c.get("userId")).and_then(|u| u.as_str()).unwrap_or("");
    let stream = input["stream_name"].as_str().unwrap_or("");
    println!("User joined: {} {}", user_id, stream);
    Ok(json!({}))
}));

iii.register_trigger(RegisterTriggerInput { trigger_type: "stream:join".into(), function_id: "onJoin".into(), config: json!({
    "condition_function_id": "conditions::requireContext"
}) })?;