Every iii function invocation runs with a traceId that is automatically propagated to downstream queue handlers. Logs emitted through a logger created with new Logger() inside the handler are structured JSON and correlated to the active trace; call currentTraceId() when you want to include the ID in the log metadata explicitly.

Multi-step workflow with trace correlation

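// currentTraceId and ApiRequest are used below; they are assumed here to be exported by 'iii-sdk'.
import { registerWorker, Logger, TriggerAction, currentTraceId, type ApiRequest } from 'iii-sdk'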

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134', {
  otel: {
    enabled: true,
    serviceName: 'orders-service',
    metricsEnabled: true,
  },
})

// Step 1 — HTTP handler
iii.registerFunction(
  { id: 'orders.create', description: 'Creates an order and starts the processing pipeline' },
  async (req: ApiRequest<{ customerId: string; amount: number; items: string[] }>) => {
    const logger = new Logger()
    const { customerId, amount, items } = req.body ?? {}

    if (!customerId || !amount) {
      return { status_code: 400, body: { error: 'customerId and amount are required' } }
    }

    const orderId = `order-${Date.now()}`

    logger.info('Order created', {
      orderId,
      customerId,
      amount,
      traceId: currentTraceId(), // same traceId will appear in downstream logs
    })

    await iii.trigger({
      function_id: 'state::set',
      payload: {
        scope: 'orders',
        key: orderId,
        value: { id: orderId, customerId, amount, items, status: 'created', createdAt: new Date().toISOString() },
      },
      action: TriggerAction.Void(),
    })

    await iii.trigger({
      function_id: 'enqueue',
      payload: { topic: 'order.process', data: { orderId, amount, customerId, items } },
      action: TriggerAction.Void(),
    })

    return { status_code: 201, body: { orderId, status: 'processing' } }
  },
)

iii.registerTrigger({
  type: 'http',
  function_id: 'orders.create',
  config: { api_path: 'orders', http_method: 'POST' },
})

Step 2 — Queue processor

iii.registerFunction(
  { id: 'order.process', description: 'Processes a created order' },
  async (data: { orderId: string; amount: number; customerId: string }) => {
    const logger = new Logger()
    const { orderId, amount, customerId } = data

    logger.info('Processing order', { orderId, amount, customerId, traceId: currentTraceId() })

    await iii.trigger({
      function_id: 'state::set',
      payload: {
        scope: 'orders',
        key: orderId,
        value: { status: 'processed', processedAt: new Date().toISOString() },
      },
      action: TriggerAction.Void(),
    })

    await iii.trigger({
      function_id: 'enqueue',
      payload: { topic: 'order.notify', data: { orderId, customerId, amount, status: 'processed' } },
      action: TriggerAction.Void(),
    })

    logger.info('Order processed', { orderId, traceId: currentTraceId() })
  },
)

iii.registerTrigger({
  type: 'queue',
  function_id: 'order.process',
  config: { topic: 'order.process' },
})

Step 3 — Notification

iii.registerFunction(
  { id: 'order.notify', description: 'Sends order notification' },
  async (data: { orderId: string; customerId: string; status: string; amount: number }) => {
    const logger = new Logger()
    const { orderId, customerId, status, amount } = data

    logger.info('Sending order notification', {
      orderId,
      customerId,
      status,
      amount,
      traceId: currentTraceId(),
    })

    // integrate with your notification service here
    logger.info('Order notification sent', { orderId, traceId: currentTraceId() })
  },
)

iii.registerTrigger({
  type: 'queue',
  function_id: 'order.notify',
  config: { topic: 'order.notify' },
})
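
Each step above repeats the same pattern: log on entry, do the work, log on exit, always with the traceId. A minimal sketch of how that pattern could be factored into a wrapper follows. The names StepLogger and withStepLogging are hypothetical and not part of the iii SDK; the logger and the trace ID lookup are passed in so the sketch runs without the SDK.

```typescript
// Minimal logger shape for this sketch; the SDK's Logger is assumed (not confirmed) to be compatible.
interface StepLogger {
  info(msg: string, metadata?: Record<string, unknown>): void
  error(msg: string, metadata?: Record<string, unknown>): void
}

// Hypothetical helper: wraps a handler so that entry, exit, and failure
// are each logged with the trace ID of the current invocation.
function withStepLogging<T, R>(
  step: string,
  logger: StepLogger,
  getTraceId: () => string | undefined,
  handler: (data: T) => Promise<R>,
): (data: T) => Promise<R> {
  return async (data: T) => {
    const traceId = getTraceId()
    logger.info(`${step} started`, { traceId })
    try {
      const result = await handler(data)
      logger.info(`${step} finished`, { traceId })
      return result
    } catch (err) {
      // Log the failure, then rethrow so the caller still sees the error.
      logger.error(`${step} failed`, { traceId, error: String(err) })
      throw err
    }
  }
}
```

In the workflow above, the handler passed to registerFunction could then be written as withStepLogging('order.process', new Logger(), currentTraceId, async (data) => { ... }), assuming currentTraceId has a matching signature.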

OpenTelemetry setup

// Pass otel config to registerWorker()
const iii = registerWorker('ws://localhost:49134', {
  otel: {
    enabled: true,
    serviceName: 'my-service',
    serviceVersion: '1.0.0',
    metricsEnabled: true,
    metricsExportIntervalMs: 10000,
    reconnectionConfig: {
      maxRetries: 10,
    },
  },
})

The iii SDK exports traces and metrics automatically via the engine’s OpenTelemetry pipeline (OTLP over the WebSocket). No separate exporter configuration is required.

Logger methods

Method   TypeScript                     Python                      Rust
Info     logger.info(msg, metadata?)    logger.info(msg, fields?)   logger.info(msg, Some(json!({...})))
Warning  logger.warn(msg, metadata?)    logger.warn(msg, fields?)   logger.warn(msg, None)
Error    logger.error(msg, metadata?)   logger.error(msg, fields?)  logger.error(msg, None)
Debug    logger.debug(msg, metadata?)   logger.debug(msg, fields?)  logger.debug(msg, None)
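
The TypeScript column above can be mirrored by a small in-memory stand-in, which may be useful when unit testing handlers without a running engine. This is only a sketch: MemoryLogger and LogEntry are hypothetical names, and the real Logger writes to the engine rather than to an array.

```typescript
type LogLevel = 'info' | 'warn' | 'error' | 'debug'
type Metadata = Record<string, unknown>

interface LogEntry {
  level: LogLevel
  msg: string
  metadata: Metadata | undefined
}

// Hypothetical test double with the same four methods as the TypeScript column.
class MemoryLogger {
  readonly entries: LogEntry[] = []

  info(msg: string, metadata?: Metadata): void { this.record('info', msg, metadata) }
  warn(msg: string, metadata?: Metadata): void { this.record('warn', msg, metadata) }
  error(msg: string, metadata?: Metadata): void { this.record('error', msg, metadata) }
  debug(msg: string, metadata?: Metadata): void { this.record('debug', msg, metadata) }

  // Stores each call instead of sending it anywhere.
  private record(level: LogLevel, msg: string, metadata: Metadata | undefined): void {
    this.entries.push({ level, msg, metadata })
  }
}

const memoryLogger = new MemoryLogger()
memoryLogger.info('Order created', { orderId: 'order-1' })
memoryLogger.warn('Notification delayed')
```

After these two calls, memoryLogger.entries holds one info entry with its metadata and one warn entry without any.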

Key concepts

  • currentTraceId() / current_trace_id() returns the W3C trace-context ID for the current invocation. The ID is automatically propagated to all downstream workers when you enqueue a message or emit an event; no manual header passing is needed.
  • All logs are emitted as structured JSON via the engine’s log.info / log.warn / log.error functions.
  • The Node.js SDK supports OTel configuration directly in registerWorker(). Python and Rust use the engine’s built-in OTLP pipeline.
  • Log every step at entry and exit with the traceId to make multi-step flows fully observable.
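
For reference, a W3C trace-context ID is the 32-character lowercase hex trace-id field of a traceparent value (version, trace-id, parent-id, flags, separated by hyphens). The sketch below parses the version 00 layout from the W3C Trace Context specification. It is independent of the iii SDK: parseTraceparent is a hypothetical helper, and whether iii exposes the raw traceparent value is not stated here.

```typescript
interface TraceParent {
  version: string
  traceId: string   // 32 lowercase hex characters
  parentId: string  // 16 lowercase hex characters
  sampled: boolean  // lowest bit of the trace-flags field
}

// Hypothetical helper: parses a version-00 style traceparent value,
// returning null when the value is malformed.
function parseTraceparent(header: string): TraceParent | null {
  const match = /^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/.exec(header.trim())
  if (!match) return null
  const [, version = '', traceId = '', parentId = '', flags = ''] = match
  // The specification forbids version ff and all-zero trace or parent IDs.
  if (version === 'ff' || /^0+$/.test(traceId) || /^0+$/.test(parentId)) return null
  return { version, traceId, parentId, sampled: (parseInt(flags, 16) & 1) === 1 }
}

const parsed = parseTraceparent('00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01')
console.log(parsed?.traceId) // prints "4bf92f3577b34da6a3ce929d0e0e4736"
```

A check like this can help when comparing the traceId in iii logs against IDs reported by other OpenTelemetry-instrumented services.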