Installation

cargo add iii-sdk

Initialization

register_worker creates and returns a connected SDK instance. The WebSocket connection is established automatically in a background Tokio task.
use iii_sdk::{register_worker, InitOptions};

let iii = register_worker("ws://localhost:49134", InitOptions::default());

Methods

shutdown

Shut down the III client. This stops the connection loop and sends a shutdown signal. If the otel feature is enabled, it spawns a background task to flush telemetry data but does NOT wait for it to complete. For a guaranteed telemetry flush, use shutdown_async() instead.
Signature
shutdown()

shutdown_async

Shut down the III client and flush all pending telemetry data. This method stops the connection loop and sends a shutdown signal. When the otel feature is enabled, it additionally awaits the OpenTelemetry flush, ensuring all spans, metrics, and logs are exported before returning.
Signature
async shutdown_async()

register

Register a function using the RegisterFunction builder. This is the recommended API: it combines the ID, handler, and auto-generated request_format/response_format schemas in one step.
Signature
register(reg: RegisterFunction) -> FunctionRef

Parameters

Name | Type | Required | Description
reg | RegisterFunction | Yes | Function registration built with RegisterFunction::new or RegisterFunction::new_async.

Example

use iii_sdk::{register_worker, InitOptions, RegisterFunction};
use schemars::JsonSchema;
use serde::Deserialize;

// Input types derive JsonSchema so request_format can be auto-generated.
#[derive(Deserialize, JsonSchema)]
struct GreetInput { name: String }

fn greet(input: GreetInput) -> Result<String, String> {
    Ok(format!("Hello, {}!", input.name))
}

// Multi-arg (positional JSON array)
fn add(a: i32, b: i32) -> Result<i32, String> { Ok(a + b) }

// Async function
async fn fetch_data(url: String) -> Result<String, String> {
    Ok(format!("fetched {}", url))
}

let iii = register_worker("ws://localhost:49134", InitOptions::default());

// Sync with struct arg — JSON: {"name": "World"}
iii.register(
    RegisterFunction::new("greet", greet)
        .description("Greet someone by name")
);

// Sync with positional args — JSON: [3, 4]
iii.register(RegisterFunction::new("add", add));

// Async
iii.register(
    RegisterFunction::new_async("fetch", fetch_data)
        .description("Fetch data from URL")
);

register_function

Register a function with the engine. Accepts any type implementing IntoFunctionRegistration: a RegisterFunction builder (preferred) or a (RegisterFunctionMessage, Handler) tuple. For the simplest API with auto-generated schemas, use RegisterFunction::new / RegisterFunction::new_async.
Signature
register_function<R: IntoFunctionRegistration>(registration: R) -> FunctionRef

Parameters

Name | Type | Required | Description
registration | R: IntoFunctionRegistration | Yes | A RegisterFunction builder, or a (RegisterFunctionMessage, H) tuple where H is a handler or HttpInvocationConfig.

Example

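use iii_sdk::{register_worker, InitOptions, RegisterFunction, RegisterFunctionMessage};
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::{json, Value};

// Input types derive JsonSchema so request_format can be auto-generated.
#[derive(Deserialize, JsonSchema)]
struct Input { name: String }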

fn greet(input: Input) -> Result<String, String> {
    Ok(format!("Hello, {}!", input.name))
}

let iii = register_worker("ws://localhost:49134", InitOptions::default());

// Preferred: RegisterFunction builder (auto-generates request/response schemas)
iii.register_function(
    RegisterFunction::new("greet", greet)
        .description("Greet someone by name"),
);

// Alternative: tuple form with RegisterFunctionMessage (lower-level)
iii.register_function((
    RegisterFunctionMessage::with_id("echo".to_string()),
    |input: Value| async move {
        Ok(json!({"echo": input}))
    },
));

register_service

Register a service with the engine.
Signature
register_service(message: RegisterServiceMessage)

Parameters

Name | Type | Required | Description
message | RegisterServiceMessage | Yes | Service registration message with id, name, and optional description and parent_service_id.

register_trigger_type

Register a custom trigger type with the engine.
Signature
register_trigger_type(id: impl Into<String>, description: impl Into<String>, handler: H)

Parameters

Name | Type | Required | Description
id | impl Into<String> | Yes | Unique trigger type identifier.
description | impl Into<String> | Yes | Human-readable description.
handler | H | Yes | Handler implementing TriggerHandler.

unregister_trigger_type

Unregister a previously registered trigger type.
Signature
unregister_trigger_type(id: impl Into<String>)

Parameters

Name | Type | Required | Description
id | impl Into<String> | Yes | -

register_trigger

Bind a trigger configuration to a registered function.
Signature
register_trigger(input: RegisterTriggerInput) -> Result<Trigger, IIIError>

Parameters

Name | Type | Required | Description
input | RegisterTriggerInput | Yes | Trigger registration input with trigger_type, function_id, and config.

Example

let trigger = iii.register_trigger(RegisterTriggerInput {
    trigger_type: "http".to_string(),
    function_id: "greet".to_string(),
    config: json!({ "api_path": "/greet", "http_method": "GET" }),
})?;
// Later...
trigger.unregister();

trigger

Invoke a remote function. The routing behavior depends on the action field of the request:
  • No action: synchronous; waits for the function to return.
  • TriggerAction::Enqueue: asynchronous, routed through a named queue.
  • TriggerAction::Void: fire-and-forget; no response is awaited.
Signature
async trigger(request: impl Into<TriggerRequest>) -> Result<Value, IIIError>

Parameters

Name | Type | Required | Description
request | impl Into<TriggerRequest> | Yes | -

Example

// Synchronous
let result = iii.trigger(TriggerRequest {
    function_id: "greet".to_string(),
    payload: json!({"name": "World"}),
    action: None,
    timeout_ms: None,
}).await?;

// Fire-and-forget
iii.trigger(TriggerRequest {
    function_id: "notify".to_string(),
    payload: json!({}),
    action: Some(TriggerAction::Void),
    timeout_ms: None,
}).await?;

// Enqueue
let receipt = iii.trigger(TriggerRequest {
    function_id: "enqueue".to_string(),
    payload: json!({"topic": "test"}),
    action: Some(TriggerAction::Enqueue { queue: "test".to_string() }),
    timeout_ms: None,
}).await?;

get_connection_state

Get the current connection state.
Signature
get_connection_state() -> IIIConnectionState

list_functions

List all registered functions from the engine.
Signature
async list_functions() -> Result<Vec<FunctionInfo>, IIIError>

on_functions_available

Subscribe to function availability events. Returns a guard that unsubscribes when dropped.
Signature
on_functions_available(callback: F) -> FunctionsAvailableGuard

Parameters

Name | Type | Required | Description
callback | F | Yes | -
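
The unsubscribe-on-drop behavior described above follows the usual Rust RAII pattern. The following is a minimal, standard-library-only sketch of that pattern, not the SDK's implementation: Registry, Guard, subscribe, notify, and subscriber_count are hypothetical names introduced for illustration.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an internal subscriber registry (not the SDK's type).
#[derive(Clone, Default)]
struct Registry {
    // Each callback is stored with an id so its guard can remove it later.
    subscribers: Arc<Mutex<Vec<(u64, Box<dyn Fn(&[String]) + Send>)>>>,
    next_id: Arc<Mutex<u64>>,
}

/// Hypothetical guard: removes its callback from the registry when dropped.
struct Guard {
    id: u64,
    registry: Registry,
}

impl Registry {
    /// Store the callback and hand back a guard that owns the subscription.
    fn subscribe<F>(&self, callback: F) -> Guard
    where
        F: Fn(&[String]) + Send + 'static,
    {
        let mut next = self.next_id.lock().unwrap();
        let id = *next;
        *next += 1;
        self.subscribers.lock().unwrap().push((id, Box::new(callback)));
        Guard { id, registry: self.clone() }
    }

    /// Call every live subscriber with the current function ids.
    fn notify(&self, functions: &[String]) {
        for (_, callback) in self.subscribers.lock().unwrap().iter() {
            callback(functions);
        }
    }

    fn subscriber_count(&self) -> usize {
        self.subscribers.lock().unwrap().len()
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Dropping the guard is what unsubscribes the callback.
        self.registry
            .subscribers
            .lock()
            .unwrap()
            .retain(|(id, _)| *id != self.id);
    }
}
```

The practical consequence of this pattern is that the guard must be kept alive (for example, bound to a named variable) for as long as the callback should keep firing; binding it to `_` would drop it immediately.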

list_workers

List all connected workers from the engine.
Signature
async list_workers() -> Result<Vec<WorkerInfo>, IIIError>

list_triggers

List all registered triggers from the engine.
Signature
async list_triggers(include_internal: bool) -> Result<Vec<TriggerInfo>, IIIError>

Parameters

Name | Type | Required | Description
include_internal | bool | Yes | -

create_channel

Create a streaming channel pair for worker-to-worker data transfer. Returns a Channel with writer, reader, and their serializable refs that can be passed as fields in invocation data to other functions.
Signature
async create_channel(buffer_size: Option<usize>) -> Result<Channel, IIIError>

Parameters

Name | Type | Required | Description
buffer_size | Option<usize> | No | -

Logger

Structured logger that emits logs as OpenTelemetry LogRecords. Every log call automatically captures the active trace and span context, correlating your logs with distributed traces without any manual wiring. When OTel is not initialized, Logger gracefully falls back to the tracing crate.

Pass structured data as the second argument to any log method. Using a serde_json::Value object of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.

info

Log an info-level message.
Signature
info(message: &str, data: Option<Value>)

Parameters

Name | Type | Required | Description
message | &str | Yes | Human-readable log message.
data | Option<Value> | No | Structured context attached as OTel log attributes. Use serde_json::json! objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.info("Order processed", Some(json!({ "order_id": "ord_123", "status": "completed" })));

warn

Log a warning-level message.
Signature
warn(message: &str, data: Option<Value>)

Parameters

Name | Type | Required | Description
message | &str | Yes | Human-readable log message.
data | Option<Value> | No | Structured context attached as OTel log attributes. Use serde_json::json! objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.warn("Retry attempt", Some(json!({ "attempt": 3, "max_retries": 5, "endpoint": "/api/charge" })));

error

Log an error-level message.
Signature
error(message: &str, data: Option<Value>)

Parameters

Name | Type | Required | Description
message | &str | Yes | Human-readable log message.
data | Option<Value> | No | Structured context attached as OTel log attributes. Use serde_json::json! objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.error("Payment failed", Some(json!({ "order_id": "ord_123", "gateway": "stripe", "error_code": "card_declined" })));

debug

Log a debug-level message.
Signature
debug(message: &str, data: Option<Value>)

Parameters

Name | Type | Required | Description
message | &str | Yes | Human-readable log message.
data | Option<Value> | No | Structured context attached as OTel log attributes. Use serde_json::json! objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.debug("Cache lookup", Some(json!({ "key": "user:42", "hit": false })));

Types

RegisterFunction · iii_fn · iii_async_fn · InitOptions · IIIError · IIIConnectionState · TriggerRequest · TriggerAction · HttpInvocationConfig · HttpAuthConfig · HttpMethod · Channel · ChannelReader · ChannelWriter · ChannelDirection · StreamChannelRef · FunctionInfo · FunctionRef · TriggerInfo · WorkerInfo · WorkerMetadata · Trigger · RegisterFunctionMessage · RegisterServiceMessage · OtelConfig · ReconnectionConfig

RegisterFunction

One-step function registration combining ID, handler, and auto-generated schemas. Use RegisterFunction::new for sync functions or RegisterFunction::new_async for async functions, then pass the result to III::register.
Constructors
Name | Description
new(id, f) | Wrap a sync function. Each parameter must implement DeserializeOwned; the return type must be Result<R: Serialize, E: Display>.
new_async(id, f) | Wrap an async function. Same constraints as new.
Builder methods
Name | Type | Description
description(desc) | impl Into<String> | Set the function description.
metadata(meta) | Value | Set function metadata.
Arity rules
Args | JSON input format | Deserialization
0 | ignored | none
1 | {"field": "value"} | Entire JSON deserialized as the argument type (use #[derive(Deserialize, JsonSchema)] structs)
Auto-generated schemas: request_format and response_format are automatically populated as JSON Schema (draft-07) using schemars. Input and output types must derive schemars::JsonSchema alongside serde::Deserialize/serde::Serialize.

iii_fn

pub fn iii_fn<F, M>(f: F) -> IIIFn<F>
Wraps a sync function into an III-compatible handler. The input type must implement DeserializeOwned + JsonSchema and the return type must implement Serialize + JsonSchema.
use iii_sdk::{iii_fn, RegisterFunction};
use serde::Deserialize;
use schemars::JsonSchema;

#[derive(Deserialize, JsonSchema)]
struct AddInput { a: i32, b: i32 }

fn add(input: AddInput) -> Result<i32, String> { Ok(input.a + input.b) }

iii.register_function(RegisterFunction::new("add", add));

iii_async_fn

pub fn iii_async_fn<F, M>(f: F) -> IIIAsyncFn<F>
Wraps an async function into an III-compatible handler. Same semantics as iii_fn.
use iii_sdk::{iii_async_fn, RegisterFunction};

async fn fetch(url: String) -> Result<String, String> {
    Ok(format!("fetched {}", url))
}

iii.register_function(RegisterFunction::new_async("fetch", fetch));

InitOptions

Configuration options passed to register_worker.
Name | Type | Required | Description
metadata | Option<WorkerMetadata> | No | Custom worker metadata. Auto-detected if None.
otel | Option<OtelConfig> | No | OpenTelemetry configuration. Requires the otel feature.

IIIError

Errors returned by the III SDK.
Name | Type | Required | Description
NotConnected | unit | Yes | -
Timeout | unit | Yes | -
Runtime | (String) | Yes | -
Remote | { code: String, message: String, stacktrace: Option<String> } | Yes | -
Handler | (String) | Yes | -
Serde | (String) | Yes | -
WebSocket | (String) | Yes | -
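
Callers will usually branch on these variants. The sketch below uses a local mirror of the variants listed above so that it compiles without the SDK; the is_retryable policy and the describe helper are hypothetical illustrations, not behavior the SDK defines.

```rust
/// Local mirror of the documented variants, for illustration only.
#[derive(Debug)]
enum IIIError {
    NotConnected,
    Timeout,
    Runtime(String),
    Remote { code: String, message: String, stacktrace: Option<String> },
    Handler(String),
    Serde(String),
    WebSocket(String),
}

/// Hypothetical policy: treat transport-level failures as worth retrying.
fn is_retryable(err: &IIIError) -> bool {
    matches!(
        err,
        IIIError::NotConnected | IIIError::Timeout | IIIError::WebSocket(_)
    )
}

/// Build a short human-readable summary for logging.
fn describe(err: &IIIError) -> String {
    match err {
        IIIError::NotConnected => "not connected to the engine".to_string(),
        IIIError::Timeout => "invocation timed out".to_string(),
        // Remote errors carry a code and message from the remote function.
        IIIError::Remote { code, message, .. } => {
            format!("remote error {}: {}", code, message)
        }
        // The remaining variants wrap a plain message string.
        IIIError::Runtime(msg)
        | IIIError::Handler(msg)
        | IIIError::Serde(msg)
        | IIIError::WebSocket(msg) => msg.clone(),
    }
}
```

Matching exhaustively, as describe does, means the compiler flags the match if a variant is added to the local enum later.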

IIIConnectionState

Connection state for the III WebSocket client.
Name | Type | Required | Description
Disconnected | unit | Yes | -
Connecting | unit | Yes | -
Connected | unit | Yes | -
Reconnecting | unit | Yes | -
Failed | unit | Yes | -

TriggerRequest

Request object for trigger(). Matches the Node/Python SDK signature: trigger({ function_id, payload, action?, timeout_ms? })
Name | Type | Required | Description
function_id | String | Yes | -
payload | Value | Yes | -
action | Option<TriggerAction> | No | -
timeout_ms | Option<u64> | No | -

TriggerAction

Routing action for TriggerRequest. Determines how the engine handles the invocation.
  • Enqueue: routes through a named queue for async processing.
  • Void: fire-and-forget, no response.
Name | Type | Required | Description
Enqueue | { queue: String } | Yes | Routes the invocation through a named queue.
Void | unit | Yes | Fire-and-forget routing.
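
The mapping from the optional action to a routing mode can be summarized in code. This is a sketch only: TriggerAction here is a local mirror of the variants above, and Routing and routing_for are hypothetical names that do not exist in the SDK.

```rust
/// Local mirror of the documented variants, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum TriggerAction {
    Enqueue { queue: String },
    Void,
}

/// Hypothetical summary of the three routing modes described in the text.
#[derive(Debug, PartialEq)]
enum Routing {
    /// The caller waits for the function's return value.
    Synchronous,
    /// The invocation is handed to the named queue.
    Queued(String),
    /// No response is awaited.
    FireAndForget,
}

/// Map the optional `action` field of a request to its routing mode.
fn routing_for(action: Option<&TriggerAction>) -> Routing {
    match action {
        None => Routing::Synchronous,
        Some(TriggerAction::Enqueue { queue }) => Routing::Queued(queue.clone()),
        Some(TriggerAction::Void) => Routing::FireAndForget,
    }
}
```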

HttpInvocationConfig

Configuration for registering an HTTP-invoked function (Lambda, Cloudflare Workers, etc.) instead of a local handler.
Name | Type | Required | Description
url | String | Yes | -
method | HttpMethod | Yes | -
timeout_ms | Option<u64> | No | -
headers | HashMap<String, String> | Yes | -
auth | Option<HttpAuthConfig> | No | -

HttpAuthConfig

Authentication configuration for HTTP-invoked functions.
  • Hmac: HMAC signature verification using a shared secret.
  • Bearer: bearer token authentication.
  • ApiKey: API key sent via a custom header.
Name | Type | Required | Description
Hmac | { secret_key: String } | Yes | -
Bearer | { token_key: String } | Yes | -
ApiKey | { header: String, value_key: String } | Yes | -

HttpMethod

Name | Type | Required | Description
Get | unit | Yes | -
Post | unit | Yes | -
Put | unit | Yes | -
Patch | unit | Yes | -
Delete | unit | Yes | -

Channel

A streaming channel pair for worker-to-worker data transfer.
Name | Type | Required | Description
writer | ChannelWriter | Yes | -
reader | ChannelReader | Yes | -
writer_ref | StreamChannelRef | Yes | -
reader_ref | StreamChannelRef | Yes | -

ChannelReader

WebSocket-backed reader for streaming binary data and text messages.
Name | Type | Required | Description
on_message | async fn(callback: F) | Yes | Register a callback for text messages received on this channel.
next_binary | async fn() -> Result<Option<Vec<u8>>, IIIError> | Yes | Read the next binary chunk from the channel. Text messages are dispatched to registered callbacks. Returns None when the stream is closed.
read_all | async fn() -> Result<Vec<u8>, IIIError> | Yes | Read the entire stream into a single Vec<u8>.
close | async fn() -> Result<(), IIIError> | Yes | -

ChannelWriter

WebSocket-backed writer for streaming binary data and text messages.
Name | Type | Required | Description
write | async fn(data: &[u8]) -> Result<(), IIIError> | Yes | -
send_message | async fn(msg: &str) -> Result<(), IIIError> | Yes | -
close | async fn() -> Result<(), IIIError> | Yes | -
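
The writer/reader contract (chunks written on one side, next_binary returning None once the stream closes, read_all concatenating everything) can be illustrated with an in-process analogy. The sketch below uses std::sync::mpsc instead of the SDK's WebSocket-backed channel; create_local_channel, next_binary, read_all, and stream_chunks are hypothetical free functions written only for this illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// In-process stand-in for a channel pair with a bounded buffer.
/// The real SDK channel is WebSocket-backed; this is only an analogy.
fn create_local_channel(buffer_size: usize) -> (SyncSender<Vec<u8>>, Receiver<Vec<u8>>) {
    sync_channel(buffer_size)
}

/// Mirrors the documented contract: `None` once the stream is closed.
fn next_binary(reader: &Receiver<Vec<u8>>) -> Option<Vec<u8>> {
    // recv() returns Err once the writer is dropped and the buffer is empty.
    reader.recv().ok()
}

/// Concatenate every chunk until the stream closes.
fn read_all(reader: &Receiver<Vec<u8>>) -> Vec<u8> {
    let mut out = Vec::new();
    while let Some(chunk) = next_binary(reader) {
        out.extend_from_slice(&chunk);
    }
    out
}

/// Write the chunks from a producer thread and collect them on the reader side.
fn stream_chunks(chunks: Vec<Vec<u8>>, buffer_size: usize) -> Vec<u8> {
    let (writer, reader) = create_local_channel(buffer_size);
    let producer = thread::spawn(move || {
        for chunk in chunks {
            writer.send(chunk).expect("reader dropped");
        }
        // `writer` is dropped here, which closes the stream for the reader.
    });
    let data = read_all(&reader);
    producer.join().expect("producer panicked");
    data
}
```

In this analogy a small buffer makes the producer block until the reader catches up; whether buffer_size in create_channel applies backpressure the same way is not stated in this reference.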

ChannelDirection

Name | Type | Required | Description
Read | unit | Yes | -
Write | unit | Yes | -

StreamChannelRef

Name | Type | Required | Description
channel_id | String | Yes | -
access_key | String | Yes | -
direction | ChannelDirection | Yes | -

FunctionInfo

Function information returned by engine::functions::list. The engine auto-generates standard JSON Schema for request_format and response_format from Rust types using schemars.
Name | Type | Required | Description
function_id | String | Yes | Unique function identifier.
description | Option<String> | No | Human-readable description.
request_format | Option<Value> | No | JSON Schema describing expected input. Auto-generated for functions registered via the #[service] macro.
response_format | Option<Value> | No | JSON Schema describing expected output. Auto-generated for functions registered via the #[service] macro.
metadata | Option<Value> | No | Arbitrary metadata attached to the function.

FunctionRef

Name | Type | Required | Description
id | String | Yes | -
unregister | fn() | Yes | -

TriggerInfo

Trigger information returned by engine::triggers::list.
Name | Type | Required | Description
id | String | Yes | -
trigger_type | String | Yes | -
function_id | String | Yes | -
config | Value | Yes | -

WorkerInfo

Worker information returned by engine::workers::list.
Name | Type | Required | Description
id | String | Yes | -
name | Option<String> | No | -
runtime | Option<String> | No | -
version | Option<String> | No | -
os | Option<String> | No | -
ip_address | Option<String> | No | -
status | String | Yes | -
connected_at_ms | u64 | Yes | -
function_count | usize | Yes | -
functions | Vec<String> | Yes | -
active_invocations | usize | Yes | -

WorkerMetadata

Worker metadata for auto-registration.
Name | Type | Required | Description
runtime | String | Yes | -
version | String | Yes | -
name | String | Yes | -
os | String | Yes | -
pid | Option<u32> | No | -
telemetry | Option<WorkerTelemetryMeta> | No | -

Trigger

Handle returned by III::register_trigger. Call unregister to remove the trigger from the engine.
Name | Type | Required | Description
unregister | fn() | Yes | Remove this trigger from the engine.

RegisterFunctionMessage

Name | Type | Required | Description
id | String | Yes | Unique function identifier.
description | Option<String> | No | Human-readable description.
request_format | Option<Value> | No | JSON Schema describing expected input.
response_format | Option<Value> | No | JSON Schema describing expected output.
metadata | Option<Value> | No | Arbitrary metadata attached to the function.
invocation | Option<HttpInvocationConfig> | No | HTTP invocation config for externally hosted functions.
to_message | fn() -> Message | Yes | Convert to protocol message.

RegisterServiceMessage

Name | Type | Required | Description
id | String | Yes | -
name | String | Yes | -
description | Option<String> | No | -
parent_service_id | Option<String> | No | -
to_message | fn() -> Message | Yes | -

OtelConfig

Configuration for OpenTelemetry initialization.
Name | Type | Required | Description
enabled | Option<bool> | No | -
service_name | Option<String> | No | -
service_version | Option<String> | No | -
service_namespace | Option<String> | No | -
service_instance_id | Option<String> | No | -
engine_ws_url | Option<String> | No | -
metrics_enabled | Option<bool> | No | -
metrics_export_interval_ms | Option<u64> | No | -
reconnection_config | Option<ReconnectionConfig> | No | -
shutdown_timeout_ms | Option<u64> | No | Timeout in milliseconds for the shutdown sequence (default: 10,000).
channel_capacity | Option<usize> | No | Capacity of the internal telemetry message channel (default: 10,000). This controls the in-flight message buffer between exporters and the WebSocket connection loop. Intentionally larger than ReconnectionConfig::max_pending_messages to absorb bursts during normal operation while limiting stale data across reconnects.
logs_enabled | Option<bool> | No | Whether to enable the log exporter (default: true).
logs_flush_interval_ms | Option<u64> | No | Log processor flush delay in milliseconds. Defaults to 100ms when not set.
logs_batch_size | Option<usize> | No | Maximum number of log records exported per batch. Defaults to 1 when not set.
fetch_instrumentation_enabled | Option<bool> | No | Whether to auto-instrument outgoing HTTP calls. When Some(true) (default), execute_traced_request() can be used to create CLIENT spans for reqwest requests. Set Some(false) to opt out. None is treated as true.

ReconnectionConfig

Configuration for WebSocket reconnection behavior.
Name | Type | Required | Description
initial_delay_ms | u64 | Yes | -
max_delay_ms | u64 | Yes | -
backoff_multiplier | f64 | Yes | -
jitter_factor | f64 | Yes | -
max_retries | Option<u64> | No | -
max_pending_messages | usize | Yes | Maximum messages preserved across reconnects. Messages beyond this limit are dropped to prevent delivering stale data after a long disconnect. This is intentionally smaller than OtelConfig::channel_capacity (the in-flight buffer between exporters and the WebSocket loop).
effective_initial_delay_ms | fn() -> u64 | Yes | Returns initial_delay_ms, clamped to a minimum of 1ms to prevent division by zero.
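
The fields above suggest a conventional exponential backoff with jitter. This reference does not state the formula the SDK uses, so the following is only a sketch under an assumed one: delay = initial * multiplier^attempt, capped at max_delay_ms, then scaled by (1 + jitter_factor * u) for some u in [-1, 1]. The struct is a local mirror containing a subset of the documented fields, and delay_for_attempt is a hypothetical method.

```rust
/// Local mirror with a subset of the documented fields, for illustration only.
struct ReconnectionConfig {
    initial_delay_ms: u64,
    max_delay_ms: u64,
    backoff_multiplier: f64,
    jitter_factor: f64,
}

impl ReconnectionConfig {
    /// As documented: initial_delay_ms clamped to a minimum of 1 ms.
    fn effective_initial_delay_ms(&self) -> u64 {
        self.initial_delay_ms.max(1)
    }

    /// Hypothetical delay computation (assumed formula, not taken from the SDK).
    /// `jitter_unit` is a caller-supplied value in [-1, 1], e.g. from an RNG,
    /// which keeps this function deterministic and easy to test.
    fn delay_for_attempt(&self, attempt: u32, jitter_unit: f64) -> u64 {
        // Exponential growth from the clamped initial delay.
        let base = self.effective_initial_delay_ms() as f64
            * self.backoff_multiplier.powi(attempt as i32);
        // Never wait longer than the configured maximum (before jitter).
        let capped = base.min(self.max_delay_ms as f64);
        // Spread reconnect attempts so many workers do not retry in lockstep.
        let jittered = capped * (1.0 + self.jitter_factor * jitter_unit.clamp(-1.0, 1.0));
        jittered.max(0.0).round() as u64
    }
}
```

Passing the jitter sample in as an argument, rather than drawing it inside the function, is a design choice made here purely so the arithmetic can be checked by hand.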