Skip to main content
This example builds a complete Todo API using the iii SDK directly. It covers custom stream registration (iii.createStream), reading and writing via stream::get / stream::set / stream::delete, and the helper stream::list for group-level queries.

Custom stream

The iii SDK lets you register a custom stream backed by any storage — in-memory, a database, or a remote service. The engine routes stream::get / stream::set / stream::delete / stream::list requests to your implementation.
// stream.ts
import { registerWorker } from 'iii-sdk'

/**
 * Shape of a single item held in the custom `todo` stream.
 */
export type Todo = {
  // Identity: the stream's `set` handler copies these from the request's item_id / group_id.
  id: string
  groupId: string

  // Content supplied by the caller.
  description: string
  dueDate?: string

  // Lifecycle timestamps: createdAt is an ISO-8601 string stamped when the item is first stored;
  // completedAt starts out as null.
  createdAt: string
  completedAt: null | string
}

// Engine endpoint: III_URL when it is set, otherwise the local default.
const engineUrl = process.env.III_URL ?? 'ws://localhost:49134'
const iii = registerWorker(engineUrl)

// In-memory backing store for the `todo` stream. Declared with `let` because
// the stream handlers replace the array instead of only mutating it.
let todos: Array<Todo> = []

/**
 * Finds a stored todo by its full stream key (group id + item id).
 *
 * Matching on both parts keeps items that share an item id in different
 * groups from shadowing or overwriting each other.
 *
 * @returns the matching todo, or undefined when none is stored under that key
 */
const findTodo = (groupId: string, itemId: string): Todo | undefined =>
  todos.find((t) => t.groupId === groupId && t.id === itemId)

/**
 * Registers the in-memory `todo` stream.
 *
 * - get:        the stored todo for group_id/item_id, or null when absent.
 * - set:        merges `data` into an existing todo, or creates a new one;
 *               returns `{ old_value, new_value }`.
 * - delete:     removes the todo for group_id/item_id; returns `{ old_value }`
 *               (undefined when nothing was stored under that key).
 * - list:       every todo whose groupId equals group_id.
 * - listGroups: the distinct groupIds currently stored.
 * - update:     not supported; always throws.
 */
iii.createStream('todo', {
  get: async (input) => findTodo(input.group_id, input.item_id) ?? null,

  set: async (input) => {
    const existing = findTodo(input.group_id, input.item_id)
    if (existing) {
      // Key fields are re-applied last so the payload cannot move the record
      // to a different id/group than the one it is stored under.
      const updated = {
        ...existing,
        ...input.data,
        id: existing.id,
        groupId: existing.groupId,
      }
      todos = todos.map((t) => (t === existing ? updated : t))
      return { old_value: existing, new_value: updated }
    }

    // New item: identity comes from the stream key, createdAt is stamped here,
    // and only description / dueDate are read from the payload.
    const newTodo: Todo = {
      id: input.item_id,
      groupId: input.group_id,
      description: input.data.description,
      createdAt: new Date().toISOString(),
      dueDate: input.data.dueDate,
      completedAt: null,
    }
    todos.push(newTodo)
    return { old_value: undefined, new_value: newTodo }
  },

  delete: async (input) => {
    const old_value = findTodo(input.group_id, input.item_id)
    // When nothing matched, old_value is undefined and the filter keeps every item.
    todos = todos.filter((t) => t !== old_value)
    return { old_value }
  },

  list: async (input) => todos.filter((t) => t.groupId === input.group_id),
  listGroups: async () => [...new Set(todos.map((t) => t.groupId))],
  update: async () => { throw new Error('Not implemented') },
})

export { iii }

Create

import {Logger, TriggerAction} from 'iii-sdk'
import { iii } from './stream'

/**
 * `api.post.todo` — creates a todo in the `inbox` group of the `todo` stream.
 *
 * Responds 400 when the body has no description; otherwise writes the item
 * through `stream::set` and responds 201 with whatever that call returned.
 */
iii.registerFunction(
  { id: 'api.post.todo', description: 'Create a new todo' },
  async (req: ApiRequest<{ description: string; dueDate?: string }>) => {
    const log = new Logger()
    const { description: text, dueDate: due } = req.body ?? {}

    // Reject a missing or empty description before touching the stream.
    if (!text) {
      return { status_code: 400, body: { error: 'Description is required' } } satisfies ApiResponse
    }

    // Id = millisecond timestamp plus a short base-36 random suffix.
    const stamp = Date.now()
    const suffix = Math.random().toString(36).substring(2, 9)
    const todoId = `todo-${stamp}-${suffix}`

    log.info('Creating todo', { todoId })

    const item = {
      id: todoId,
      description: text,
      groupId: 'inbox',
      createdAt: new Date().toISOString(),
      dueDate: due,
      completedAt: null,
    }

    const todo = await iii.trigger({
      function_id: 'stream::set',
      payload: {
        stream_name: 'todo',
        group_id: 'inbox',
        item_id: todoId,
        data: item,
      },
    })

    log.info('Todo created', { todoId })
    return { status_code: 201, body: todo } satisfies ApiResponse
  },
)

// Expose the function over HTTP as POST `todo`.
iii.registerTrigger({
  type: 'http',
  function_id: 'api.post.todo',
  config: {
    api_path: 'todo',
    http_method: 'POST',
    description: 'Create a new todo',
  },
})

Update

/**
 * `api.put.todo` — updates the todo named by the `:id` path parameter.
 *
 * Reads the current item with `stream::get`, responds 404 when the id is
 * missing or no item is found, then merges the request body over the stored
 * item and writes it back with `stream::set`. Responds 200 with whatever
 * `stream::set` returned.
 */
iii.registerFunction(
  { id: 'api.put.todo', description: 'Update a todo' },
  async (req: ApiRequest) => {
    const logger = new Logger()
    const todoId = req.path_params?.id

    // Skip the lookup entirely when the route carried no id.
    const existing = todoId
      ? await iii.trigger<Todo | null>({
          function_id: 'stream::get',
          payload: {
            stream_name: 'todo',
            group_id: 'inbox',
            item_id: todoId,
          },
        })
      : null

    if (!existing) {
      return { status_code: 404, body: { error: 'Todo not found' } } satisfies ApiResponse
    }

    logger.info('Updating todo', { todoId })

    const updated = await iii.trigger({
      function_id: 'stream::set',
      payload: {
        stream_name: 'todo',
        group_id: 'inbox',
        item_id: todoId,
        data: {
          ...existing,
          ...req.body,
          // The body comes from the HTTP client: re-apply the identity and
          // creation fields last so a request cannot re-key or back-date the
          // item it is updating.
          id: existing.id,
          groupId: existing.groupId,
          createdAt: existing.createdAt,
        },
      },
    })

    logger.info('Todo updated', { todoId })
    return { status_code: 200, body: updated } satisfies ApiResponse
  },
)

// Expose the function over HTTP as PUT `todo/:id`.
iii.registerTrigger({
  type: 'http',
  function_id: 'api.put.todo',
  config: { api_path: 'todo/:id', http_method: 'PUT', description: 'Update a todo' },
})

Delete

/**
 * `api.delete.todo` — removes a todo from the `inbox` group of the `todo` stream.
 *
 * Reads `todoId` from the request body, responds 400 when it is missing, and
 * otherwise sends a `stream::delete` request with `TriggerAction.Void()` and
 * responds 200.
 */
iii.registerFunction(
  { id: 'api.delete.todo', description: 'Delete a todo' },
  async (req: ApiRequest<{ todoId: string }>) => {
    const logger = new Logger()
    const { todoId } = req.body ?? {}

    if (!todoId) {
      return { status_code: 400, body: { error: 'todoId is required' } } satisfies ApiResponse
    }

    logger.info('Deleting todo', { todoId })

    // Awaited so that a failure to dispatch the request rejects this handler
    // instead of surfacing as an unhandled promise rejection after a success
    // response has already been returned.
    // NOTE(review): presumably a Void trigger settles once the request is sent
    // and does not wait for (or report) the delete result — confirm against
    // the SDK docs.
    await iii.trigger({
      function_id: 'stream::delete',
      payload: {
        stream_name: 'todo',
        group_id: 'inbox',
        item_id: todoId,
      },
      action: TriggerAction.Void(),
    })

    logger.info('Todo deleted', { todoId })
    return { status_code: 200, body: { success: true } } satisfies ApiResponse
  },
)

// Expose the function over HTTP as DELETE `todo` (the id travels in the body).
iii.registerTrigger({
  type: 'http',
  function_id: 'api.delete.todo',
  config: { api_path: 'todo', http_method: 'DELETE', description: 'Delete a todo' },
})

Key concepts

  • iii.createStream(name, impl) registers a custom stream with get / set / delete / list / listGroups / update methods. The engine routes stream::* requests to this implementation.
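  • stream::set returns whatever the stream’s set handler returns — in the custom implementation above that is an { old_value, new_value } pair rather than the bare item, and the Create and Update handlers pass it through as the response body.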
  • stream::get returns null if the item doesn’t exist — always guard before writing an update.
  • In Rust, Streams::new(iii.clone()) wraps the engine connection and provides streams.update(key, ops) for atomic modifications.