Data channels provide a streaming primitive for moving binary data between functions — even when those functions run in different worker processes. Instead of serializing an entire payload into a single invocation message, channels let producers write data incrementally while consumers read it as a stream.

Why Channels Exist

Function invocations in iii pass data as JSON-serializable messages. This works well for structured payloads, but breaks down when dealing with large binary blobs (files, media, datasets) or data that is produced over time (progress updates, partial results). Channels solve this by giving each side a Node.js stream backed by a WebSocket connection through the engine. A channel has two ends:
  • A writer that exposes a Writable stream for sending data.
  • A reader that exposes a Readable stream for receiving data.
Each end also has a ref — a small, serializable object (StreamChannelRef) that can be passed as a field inside the iii.trigger() payload. When the receiving function deserializes the ref, the SDK automatically connects it to the engine and materializes the corresponding ChannelWriter or ChannelReader.

How Channels Work

Function A creates a channel, passes the readerRef to Function B through a regular function call, then writes data to the writer stream. The engine routes the data through a WebSocket-backed pipe to Function B, where it arrives on the reader stream. When Function A ends the writer, Function B’s reader emits end.
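Locally, you can picture the two ends of a channel as the two ends of a pipe. The sketch below simulates that pairing with a plain Node.js PassThrough stream; this is an analogy only — a real channel routes data through the engine's WebSocket rather than an in-process stream:

```typescript
import { PassThrough } from 'node:stream'

// A channel's writer and reader behave like the two ends of a pipe:
// data written on one side arrives, in order, on the other, and ending
// the writer causes the reader to finish.
async function demo(): Promise<string> {
  const pipe = new PassThrough() // stands in for the engine-backed channel

  // "Function A": write incrementally, then end the stream.
  pipe.write(Buffer.from('hello '))
  pipe.end(Buffer.from('world'))

  // "Function B": consume the reader until the stream ends.
  const chunks: Buffer[] = []
  for await (const chunk of pipe) {
    chunks.push(chunk as Buffer)
  }
  return Buffer.concat(chunks).toString('utf-8')
}
```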

Creating a Channel

const channel = await iii.createChannel()
The returned object contains both local stream objects and their serializable counterparts:
| Property | Node / TypeScript | Python | Rust |
| --- | --- | --- | --- |
| Writer (local) | channel.writer (ChannelWriter) | channel.writer (ChannelWriter) | channel.writer (ChannelWriter) |
| Reader (local) | channel.reader (ChannelReader) | channel.reader (ChannelReader) | channel.reader (ChannelReader) |
| Writer ref (serializable) | channel.writerRef | channel.writer_ref | channel.writer_ref |
| Reader ref (serializable) | channel.readerRef | channel.reader_ref | channel.reader_ref |
Pass a ref as a field in the iii.trigger() payload to hand the other end of the channel to another function. The SDK on the receiving side automatically materializes the ref into a ChannelWriter or ChannelReader. The StreamChannelRef is a plain object that survives JSON serialization:
type StreamChannelRef = {
  channel_id: string
  access_key: string
  direction: 'read' | 'write'
}
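Because the ref is plain data, it survives a JSON round trip unchanged. The snippet below demonstrates that, along with a hypothetical type guard (not part of the SDK) you could use to validate a deserialized value:

```typescript
type StreamChannelRef = {
  channel_id: string
  access_key: string
  direction: 'read' | 'write'
}

// Hypothetical helper (not provided by the SDK): check that a
// deserialized value has the shape of a StreamChannelRef.
function isStreamChannelRef(value: unknown): value is StreamChannelRef {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v.channel_id === 'string' &&
    typeof v.access_key === 'string' &&
    (v.direction === 'read' || v.direction === 'write')
  )
}

// Illustrative field values — real refs are issued by the engine.
const ref: StreamChannelRef = {
  channel_id: 'ch_123',
  access_key: 'key_abc',
  direction: 'read',
}

// The ref survives serialization into a trigger payload untouched.
const roundTripped = JSON.parse(JSON.stringify({ reader: ref })).reader
```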

ChannelWriter

ChannelWriter wraps a WebSocket connection and exposes:
| Capability | Node / TypeScript | Python | Rust |
| --- | --- | --- | --- |
| Write binary data | writer.stream.write(data) | writer.stream.write(data) (sync, fire-and-forget) or await writer.write(data) (async) | writer.write(&data).await |
| Send text message | writer.sendMessage(msg) | writer.send_message(msg) (sync, fire-and-forget) or await writer.send_message_async(msg) (async) | writer.send_message(&msg).await |
| Close | writer.stream.end() | writer.close() (sync, fire-and-forget) or await writer.close_async() (async) | writer.close().await |
| Access writable stream | writer.stream | writer.stream (WritableStream) | N/A |
Data written to the writer is automatically chunked into 64 KB frames and sent over the WebSocket. The writer lazily connects to the engine on first write, so creating a channel is cheap even if the writer is not used immediately. The Python ChannelWriter provides both sync and async APIs. The sync methods (send_message(), close()) are fire-and-forget wrappers that queue work on the event loop. The writer.stream property exposes a WritableStream with sync write(data) and end(data?) methods that mirror Node.js Writable semantics.
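The SDK performs the 64 KB framing for you, so you never need to pre-chunk your data. As a rough sketch of the idea (not the SDK's actual implementation), splitting a buffer into frames looks like:

```typescript
const FRAME_SIZE = 64 * 1024 // 64 KB, matching the channel's frame size

// Sketch of the chunking the writer applies internally before sending
// frames over the WebSocket. The final frame may be shorter.
function toFrames(data: Buffer, frameSize: number = FRAME_SIZE): Buffer[] {
  const frames: Buffer[] = []
  for (let offset = 0; offset < data.length; offset += frameSize) {
    frames.push(data.subarray(offset, Math.min(offset + frameSize, data.length)))
  }
  return frames
}
```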

ChannelReader

ChannelReader wraps a WebSocket connection and exposes:
| Capability | Node / TypeScript | Python | Rust |
| --- | --- | --- | --- |
| Read as stream | for await (const chunk of reader.stream) | async for chunk in reader | reader.next_binary().await / reader.read_all().await |
| Read all at once | Collect chunks manually | await reader.read_all() | reader.read_all().await |
| Listen for text messages | reader.onMessage(callback) | reader.on_message(callback) | reader.on_message(callback).await |
| Access readable stream | reader.stream | reader.stream (ReadableStream) | N/A |
The Python ChannelReader implements __aiter__, so use async for chunk in reader to iterate over binary chunks. Text messages received on the same WebSocket are dispatched to callbacks registered via reader.on_message(callback) where callback is Callable[[str], Any] — it receives the raw text message string. Backpressure is handled automatically by pausing the WebSocket when the stream buffer is full. The reader connects lazily, establishing the WebSocket when reading begins.
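In Node, where there is no built-in read_all(), collecting a reader's chunks into a single buffer is a short helper. The sketch below works over any async-iterable of chunks (such as reader.stream), demonstrated here with a plain Node Readable standing in for a channel reader:

```typescript
import { Readable } from 'node:stream'

// Sketch: drain an async-iterable stream of chunks (e.g. reader.stream)
// into one Buffer, mirroring what read_all() does in the other SDKs.
async function readAll(stream: AsyncIterable<unknown>): Promise<Buffer> {
  const chunks: Buffer[] = []
  for await (const chunk of stream) {
    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk as string))
  }
  return Buffer.concat(chunks)
}

// Stand-in for a channel reader's stream:
const fakeReaderStream = Readable.from([Buffer.from('part1-'), Buffer.from('part2')])
```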

Example: One-Way Data Streaming

In this pattern a sender function creates a channel, writes data to it, and passes the reader ref to a processor function. The processor reads the stream, computes a result, and returns it.
const processor = iii.registerFunction(
  { id: 'data.processor' },
  async (input: { label: string; reader: ChannelReader }) => {
    const chunks: Buffer[] = []
    for await (const chunk of input.reader.stream) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
    }

    const records = JSON.parse(Buffer.concat(chunks).toString('utf-8'))
    const sum = records.reduce((acc: number, r: { value: number }) => acc + r.value, 0)

    return {
      label: input.label,
      count: records.length,
      sum,
      average: sum / records.length,
    }
  },
)

const sender = iii.registerFunction(
  { id: 'data.sender' },
  async (input: { records: { name: string; value: number }[] }) => {
    const channel = await iii.createChannel()

    const writePromise = new Promise<void>((resolve, reject) => {
      const payload = Buffer.from(JSON.stringify(input.records))
      channel.writer.stream.end(payload, (err?: Error | null) => {
        if (err) reject(err)
        else resolve()
      })
    })

    const result = await iii.trigger({
      function_id: 'data.processor',
      payload: { label: 'metrics-batch', reader: channel.readerRef },
    })

    await writePromise
    return result
  },
)
The sender writes all records as a single JSON buffer and immediately ends the stream. The processor reads until the stream closes, parses the JSON, and returns computed statistics.
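A variation worth noting: if the sender instead streamed records incrementally (for example as newline-delimited JSON), the processor could parse them as chunks arrive rather than buffering the whole payload. A minimal sketch of that parsing, written against any async-iterable chunk stream and independent of the SDK:

```typescript
import { Readable } from 'node:stream'

// Sketch: sum newline-delimited JSON records ({"value": n} per line)
// from a chunk stream, coping with chunk boundaries that fall
// mid-record by carrying the partial trailing line forward.
async function sumNdjson(stream: AsyncIterable<Buffer>): Promise<number> {
  let pending = ''
  let sum = 0
  for await (const chunk of stream) {
    pending += chunk.toString('utf-8')
    const lines = pending.split('\n')
    pending = lines.pop() ?? '' // keep any partial trailing line
    for (const line of lines) {
      if (line.trim()) sum += (JSON.parse(line) as { value: number }).value
    }
  }
  if (pending.trim()) sum += (JSON.parse(pending) as { value: number }).value
  return sum
}

// Chunk boundaries need not align with record boundaries:
const recordStream = Readable.from([
  Buffer.from('{"value":1}\n{"val'),
  Buffer.from('ue":2}\n{"value":3}\n'),
])
```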

Example: Bidirectional Streaming with Progress

When two functions need to exchange data in both directions, create two channels — one for input, one for output. The writer’s sendMessage() method provides a side channel for progress updates or metadata that doesn’t mix with the binary data stream.
const worker = iii.registerFunction(
  { id: 'stream.worker' },
  async (input: { reader: ChannelReader; writer: ChannelWriter }) => {
    const { reader, writer } = input
    const chunks: Buffer[] = []
    let chunkCount = 0

    for await (const chunk of reader.stream) {
      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
      chunkCount++
      writer.sendMessage(
        JSON.stringify({ type: 'progress', chunks_received: chunkCount }),
      )
    }

    const fullData = Buffer.concat(chunks).toString('utf-8')
    const words = fullData.split(/\s+/).filter(Boolean)

    writer.sendMessage(
      JSON.stringify({
        type: 'complete',
        word_count: words.length,
        byte_count: Buffer.concat(chunks).length,
      }),
    )

    writer.stream.end(
      Buffer.from(JSON.stringify({ words: words.slice(0, 5), total: words.length })),
    )

    return { status: 'done' }
  },
)

const coordinator = iii.registerFunction(
  { id: 'stream.coordinator' },
  async (input: { text: string; chunkSize: number }) => {
    const inputChannel = await iii.createChannel()
    const outputChannel = await iii.createChannel()

    const messages: unknown[] = []
    outputChannel.reader.onMessage(msg => {
      messages.push(JSON.parse(msg))
    })

    const textBuf = Buffer.from(input.text)
    const writePromise = new Promise<void>((resolve, reject) => {
      let offset = 0
      const writeNext = () => {
        while (offset < textBuf.length) {
          const end = Math.min(offset + input.chunkSize, textBuf.length)
          const chunk = textBuf.subarray(offset, end)
          offset = end

          if (!inputChannel.writer.stream.write(chunk)) {
            inputChannel.writer.stream.once('drain', writeNext)
            return
          }
        }
        inputChannel.writer.stream.end((err?: Error | null) => {
          if (err) reject(err)
          else resolve()
        })
      }
      writeNext()
    })

    const triggerPromise = iii.trigger({
      function_id: 'stream.worker',
      payload: {
        reader: inputChannel.readerRef,
        writer: outputChannel.writerRef,
      },
    })

    const resultChunks: Buffer[] = []
    for await (const chunk of outputChannel.reader.stream) {
      resultChunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk))
    }

    await writePromise
    const workerResult = await triggerPromise
    const binaryResult = JSON.parse(Buffer.concat(resultChunks).toString('utf-8'))

    return { messages, binaryResult, workerResult }
  },
)
The coordinator creates two channels: one for input (sending text chunks to the worker) and one for output (receiving binary results back). The worker reads from the input channel, sends progress updates via sendMessage(), and writes the final result to the output channel’s stream. Meanwhile, the coordinator listens for text messages on the output channel’s reader and reads the binary result from the output reader stream.