Skip to main content
State in iii is a distributed key-value store addressed by scope (group) + key (item ID). Any worker can read and write state by triggering state::get, state::set, state::delete, and state::list through the engine.

Writing state

import { registerWorker, Logger, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'users.update_status', description: 'Update user status in state' },
  async (req: ApiRequest<{ status: string }>) => {
    const logger = new Logger()
    const userId = req.path_params?.id

    if (!userId) {
      return { status_code: 400, body: { error: 'Missing user ID' } }
    }

    const { status = 'active' } = req.body ?? {}

    // Write to state — addressed by scope + key
    await iii.trigger({
      function_id: 'state::set',
      payload: { scope: 'users', key: userId, value: { status, updatedAt: new Date().toISOString() } },
      action: TriggerAction.Void(),
    })

    logger.info(`Updated user ${userId} status to ${status}`)

    return { status_code: 200, body: { userId, status } }
  },
)

iii.registerTrigger({
  type: 'http',
  function_id: 'users.update_status',
  config: { api_path: 'users/:id/status', http_method: 'POST' },
})

Reading state

iii.registerFunction(
  { id: 'users.get_status', description: 'Read user status from state' },
  async (req: ApiRequest) => {
    const logger = new Logger()
    const userId = req.path_params?.id

    if (!userId) {
      return { status_code: 400, body: { error: 'Missing user id' } }
    }

    const user = await iii.trigger<{ status: string } | null>({
      function_id: 'state::get',
      payload: { scope: 'users', key: userId },
    })

    if (!user) {
      return { status_code: 404, body: { error: 'User not found' } }
    }

    logger.info('Got user status', { userId, status: user.status })
    return { status_code: 200, body: user }
  },
)

iii.registerTrigger({
  type: 'http',
  function_id: 'users.get_status',
  config: { api_path: 'users/:id/status', http_method: 'GET' },
})

Batch read with state::list

state::list returns all keys in a scope — useful in cron jobs that sweep over accumulated data.
iii.registerFunction(
  { id: 'cron.orders_audit', description: 'Checks for overdue orders' },
  async () => {
    const logger = new Logger()

    const orders = await iii.trigger<{ id: string; shipDate: string; complete: boolean; status: string }[]>({
      function_id: 'state::list',
      payload: { scope: 'orders' },
    })

    for (const order of orders ?? []) {
      if (!order.complete && new Date() > new Date(order.shipDate)) {
        logger.warn('Order overdue', { orderId: order.id })

        await iii.trigger({
          function_id: 'enqueue',
          payload: { topic: 'notification', data: { orderId: order.id, templateId: 'order-audit-warning', status: order.status } },
          action: TriggerAction.Void(),
        })
      }
    }
  },
)

iii.registerTrigger({
  type: 'cron',
  function_id: 'cron.orders_audit',
  config: { expression: '*/5 * * * *' },
})

State API reference

OperationTypeScriptPythonRust
Writeawait iii.trigger({ function_id: 'state::set', payload: { scope, key, value }, action: TriggerAction.Void() })iii.trigger({'function_id': 'state::set', 'payload': {...}, 'action': TriggerAction.Void()})iii.trigger(TriggerRequest { function_id: "state::set".into(), payload: json!({...}), action: Some(TriggerAction::Void), timeout_ms: None }).await?
Read oneawait iii.trigger({ function_id: 'state::get', payload: { scope, key } })iii.trigger({'function_id': 'state::get', 'payload': {...}})iii.trigger(TriggerRequest { function_id: "state::get".into(), payload: json!({...}), action: None, timeout_ms: None }).await?
Read all in scopeawait iii.trigger({ function_id: 'state::list', payload: { scope } })iii.trigger({'function_id': 'state::list', 'payload': {...}})iii.trigger(TriggerRequest { function_id: "state::list".into(), payload: json!({...}), action: None, timeout_ms: None }).await?
Deleteawait iii.trigger({ function_id: 'state::delete', payload: { scope, key }, action: TriggerAction.Void() })iii.trigger({'function_id': 'state::delete', 'payload': {...}, 'action': TriggerAction.Void()})iii.trigger(TriggerRequest { function_id: "state::delete".into(), payload: json!({...}), action: Some(TriggerAction::Void), timeout_ms: None }).await?

Key concepts

  • State is addressed by scope (equivalent to groupId in streams) and key (item ID). Use scope as a logical namespace ("users", "orders") and key as the unique identifier within it.
  • state::list returns all items in a scope as an array. Use it sparingly in hot paths; it’s best suited for cron-driven batch sweeps.
  • State is separate from named streams. Streams have real-time WebSocket push; state does not.
  • Unlike streams, state has no update semantics — to modify a record, read it first then write the merged result back with state::set.