Topic-based publish/subscribe messaging for broadcasting events to multiple subscribers in real time.
modules::pubsub::PubSubModule
Sample Configuration
- class : modules::pubsub::PubSubModule
config :
adapter :
class : modules::pubsub::LocalAdapter
Configuration
adapter — The adapter to use for pub/sub distribution. Defaults to modules::pubsub::LocalAdapter (in-memory) when not specified.
Adapters
modules::pubsub::LocalAdapter
In-memory pub/sub using broadcast channels. Messages are delivered only to subscribers running in the same engine process. No external dependencies required.
class : modules::pubsub::LocalAdapter
modules::pubsub::RedisAdapter
Uses Redis Pub/Sub as the backend. Enables event delivery across multiple engine instances.
class : modules::pubsub::RedisAdapter
config :
redis_url : ${REDIS_URL:redis://localhost:6379}
Configuration
redis_url — The URL of the Redis instance to use.
Functions
publish — Publish an event to a topic. All functions subscribed to that topic will be invoked with the payload.
topic — The topic to publish to. Must not be empty.
data — The event payload to broadcast. Can be any JSON-serializable value.
Trigger Type
This module adds a new Trigger Type: subscribe.
topic — The topic to subscribe to. The function will be invoked whenever an event is published to this topic.
Subscribe Event Payload
The handler receives the raw value passed as data to the publish call. No envelope is added.
The exact value published to the topic. Shape is determined entirely by the publisher.
Sample Code
// Register the handler. It receives the raw value that was published as
// `data`; no envelope is added.
const orderShippedHandler = iii.registerFunction(
  { id: 'notifications::onOrderShipped' },
  async (event) => {
    console.log('Order shipped:', event)
    return {}
  },
)

// Bind the handler to the 'orders.shipped' topic through a `subscribe` trigger.
iii.registerTrigger({
  type: 'subscribe',
  function_id: orderShippedHandler.id,
  config: { topic: 'orders.shipped' },
})

// Publish an event: every function subscribed to the topic is invoked with
// the `data` value below.
await iii.trigger({
  function_id: 'publish',
  payload: {
    topic: 'orders.shipped',
    data: { orderId: 'abc-123', address: '123 Main St' },
  },
  action: TriggerAction.Void(),
})
def on_order_shipped(data):
    """Handle an event published to the 'orders.shipped' topic.

    `data` is the raw value passed as `data` to the publish call; no
    envelope is added.
    """
    print('Order shipped:', data)
    return {}


# Register the handler under a stable function id.
iii.register_function({'id': 'notifications::onOrderShipped'}, on_order_shipped)

# Bind the handler to the topic through a `subscribe` trigger.
iii.register_trigger({
    'type': 'subscribe',
    'function_id': 'notifications::onOrderShipped',
    'config': {'topic': 'orders.shipped'},
})

# Publish an event: every function subscribed to the topic is invoked with
# the `data` value below.
iii.trigger({
    'function_id': 'publish',
    'payload': {
        'topic': 'orders.shipped',
        'data': {'orderId': 'abc-123', 'address': '123 Main St'},
    },
})
// Register the handler. It receives the raw value that was published as
// `data`; no envelope is added.
iii.register_function(
    RegisterFunctionMessage::with_id("notifications::onOrderShipped".into()),
    |event| async move {
        println!("Order shipped: {:?}", event);
        Ok(json!({}))
    },
);

// Bind the handler to the "orders.shipped" topic through a `subscribe` trigger.
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::onOrderShipped".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

// Publish an event: every function subscribed to the topic is invoked with
// the `data` value below.
iii.trigger(TriggerRequest {
    function_id: "publish".into(),
    payload: json!({
        "topic": "orders.shipped",
        "data": { "orderId": "abc-123", "address": "123 Main St" }
    }),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
})
.await?;
Usage Example: Fanout Notification
One publisher triggers two independent subscribers on the same topic:
// First subscriber: sends an e-mail for each published event.
const emailAlert = iii.registerFunction(
  { id: 'notifications::sendEmailAlert' },
  async (event) => {
    await sendEmail(event.userId, `Order ${event.orderId} shipped`)
    return {}
  },
)

// Second subscriber: sends a push notification for the same event.
const pushAlert = iii.registerFunction(
  { id: 'notifications::sendPushAlert' },
  async (event) => {
    await sendPushNotification(event.userId, `Order ${event.orderId} shipped`)
    return {}
  },
)

// Both handlers subscribe to the same topic, so a single publish fans out
// to each of them independently.
iii.registerTrigger({
  type: 'subscribe',
  function_id: emailAlert.id,
  config: { topic: 'orders.shipped' },
})

iii.registerTrigger({
  type: 'subscribe',
  function_id: pushAlert.id,
  config: { topic: 'orders.shipped' },
})

// One publish call; both subscribers receive the `data` value.
await iii.trigger({
  function_id: 'publish',
  payload: {
    topic: 'orders.shipped',
    data: { orderId: 'abc-123', userId: 'user-456' },
  },
})
def send_email_alert(data):
    """First subscriber: e-mail the user named in the published event.

    `data` is the raw value passed as `data` to the publish call.
    """
    send_email(data['userId'], f"Order {data['orderId']} shipped")
    return {}


def send_push_alert(data):
    """Second subscriber: send a push notification for the same event."""
    send_push_notification(data['userId'], f"Order {data['orderId']} shipped")
    return {}


iii.register_function({'id': 'notifications::sendEmailAlert'}, send_email_alert)
iii.register_function({'id': 'notifications::sendPushAlert'}, send_push_alert)

# Both handlers subscribe to the same topic, so a single publish fans out
# to each of them independently.
iii.register_trigger({
    'type': 'subscribe',
    'function_id': 'notifications::sendEmailAlert',
    'config': {'topic': 'orders.shipped'},
})
iii.register_trigger({
    'type': 'subscribe',
    'function_id': 'notifications::sendPushAlert',
    'config': {'topic': 'orders.shipped'},
})

# One publish call; both subscribers receive the `data` value.
iii.trigger({
    'function_id': 'publish',
    'payload': {
        'topic': 'orders.shipped',
        'data': {'orderId': 'abc-123', 'userId': 'user-456'},
    },
})
use iii_sdk :: { RegisterFunctionMessage , RegisterTriggerInput , TriggerRequest };
use serde_json :: json;
// First subscriber: sends an e-mail for each published event.
// Missing or non-string fields fall back to an empty string.
iii.register_function(
    RegisterFunctionMessage::with_id("notifications::sendEmailAlert".into()),
    |event| async move {
        let order = event["orderId"].as_str().unwrap_or("");
        let user = event["userId"].as_str().unwrap_or("");
        let message = format!("Order {} shipped", order);
        send_email(user, &message).await?;
        Ok(json!({}))
    },
);

// Second subscriber: sends a push notification for the same event.
iii.register_function(
    RegisterFunctionMessage::with_id("notifications::sendPushAlert".into()),
    |event| async move {
        let order = event["orderId"].as_str().unwrap_or("");
        let user = event["userId"].as_str().unwrap_or("");
        let message = format!("Order {} shipped", order);
        send_push_notification(user, &message).await?;
        Ok(json!({}))
    },
);

// Both handlers subscribe to the same topic, so a single publish fans out
// to each of them independently.
iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::sendEmailAlert".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

iii.register_trigger(RegisterTriggerInput {
    trigger_type: "subscribe".into(),
    function_id: "notifications::sendPushAlert".into(),
    config: json!({ "topic": "orders.shipped" }),
})?;

// One publish call; both subscribers receive the `data` value.
iii.trigger(TriggerRequest {
    function_id: "publish".into(),
    payload: json!({
        "topic": "orders.shipped",
        "data": { "orderId": "abc-123", "userId": "user-456" }
    }),
    action: None,
    timeout_ms: None,
})
.await?;
PubSub vs Queue
| Feature | PubSub | Queue |
| --- | --- | --- |
| Delivery | Broadcast to all subscribers | Single consumer per message |
| Persistence | No (fire-and-forget) | Yes (with retries and DLQ) |
| Ordering | Not guaranteed | FIFO within topic |
| Best for | Real-time notifications, fanout | Reliable background processing |
PubSub Flow