fusion-ai-host

The dispatcher

@tikab-interactive/fusion-ai-host/server is the VM-side glue: an in-memory HostDispatcher that routes a tool call to a connected device's stream and awaits its result. It is schema-agnostic — the consuming app owns the device-registry table and its migration (the same rule as fusion-db).

import { hostDispatcher } from "@tikab-interactive/fusion-ai-host/server";

hostDispatcher is a process-wide singleton; import it from the route handlers that serve the device stream and the result endpoint, and from the server function that dispatches.

API

| Method | Use |
| --- | --- |
| connect(deviceId, sink) | Register a device's SSE sink (in the stream route). Returns an unregister function. |
| dispatch(deviceId, call, ms?) | Push a call to a device and await its HostResult. Resolves (never rejects); an offline device or a timeout resolves as a "device offline" / "device timed out" result. |
| deliver(result) | Resolve the pending call that a device's result POST corresponds to. Returns false if the call is unknown. |
| isOnline(deviceId) | Whether a device holds a live stream on this process. |
| onlineDevices() | The ids of the devices with a live stream. |
| drop(deviceId) | Forget a device's connection (on revoke). |
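
To make the contract of these methods concrete, here is a minimal sketch of an in-memory dispatcher with the same method names. It is not the package's implementation: the class name `SketchDispatcher`, the `"call"` event name, the 30-second default timeout, and the exact shape of the offline and timeout results are all assumptions made for illustration.

```typescript
// Hypothetical sketch of the documented behaviour; not the package's source.
interface HostCall { callId: string; tool: string; input: unknown; mode?: "auto" | "apply"; }
interface HostResult { callId: string; ok: boolean; result?: unknown; proposal?: boolean; error?: string; }
type Sink = (event: string, data: string) => void;

class SketchDispatcher {
  private sinks = new Map<string, Sink>();
  private pending = new Map<string, (result: HostResult) => void>();

  /** Register a device's sink; the returned function unregisters it. */
  connect(deviceId: string, sink: Sink): () => void {
    this.sinks.set(deviceId, sink);
    return () => {
      // Only remove the sink if a reconnect has not already replaced it.
      if (this.sinks.get(deviceId) === sink) this.sinks.delete(deviceId);
    };
  }

  /** Push a call and wait for its result. Always resolves, never rejects. */
  dispatch(deviceId: string, call: HostCall, ms = 30_000): Promise<HostResult> {
    const sink = this.sinks.get(deviceId);
    if (!sink) {
      return Promise.resolve({ callId: call.callId, ok: false, error: "device offline" });
    }
    return new Promise<HostResult>((resolve) => {
      const timer = setTimeout(() => {
        this.pending.delete(call.callId);
        resolve({ callId: call.callId, ok: false, error: "device timed out" });
      }, ms);
      this.pending.set(call.callId, (result) => {
        clearTimeout(timer);
        resolve(result);
      });
      sink("call", JSON.stringify(call)); // assumed event name
    });
  }

  /** Settle the pending call matching result.callId; false if none is pending. */
  deliver(result: HostResult): boolean {
    const settle = this.pending.get(result.callId);
    if (!settle) return false;
    this.pending.delete(result.callId);
    settle(result);
    return true;
  }

  isOnline(deviceId: string): boolean { return this.sinks.has(deviceId); }
  onlineDevices(): string[] { return Array.from(this.sinks.keys()); }
  drop(deviceId: string): void { this.sinks.delete(deviceId); }
}
```

Because the pending map is keyed by callId, a late or duplicate result POST finds no entry and `deliver` returns false, which matches the "Returns false if unknown" behaviour in the table.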

The wire protocol is shared with the device half:

/** A tool call dispatched from the server down to a device. */
export interface HostCall {
	/** Correlates the dispatch with its result. Generated server-side per call. */
	callId: string;
	/** The tool name to run on the device. */
	tool: string;
	/** The tool input (validated against the device-side zod schema before execution). */
	input: unknown;
	/**
	 * Two-phase control for mutating tools. Absent (or "auto") = a read/action runs, a mutation
	 * returns its proposal (dry-run). "apply" = run the mutation for real (sent by the server only
	 * after the proposal was approved, with the SERVER-stored input). Reads ignore it.
	 */
	mode?: "auto" | "apply";
}
 
/** A device's reply to a {@link HostCall}. */
export interface HostResult {
	callId: string;
	ok: boolean;
	/** Present when `ok` — JSON-serialisable tool output, OR a {@link Proposal} when `proposal`. */
	result?: unknown;
	/** True when `result` is a dry-run proposal (a mutating tool awaiting approval), not the effect. */
	proposal?: boolean;
	/** Present when `!ok` — a human-readable error (unknown tool, scope violation, throw, …). */
	error?: string;
}
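
As a rough illustration of how a caller might branch on these fields, the sketch below sorts a `HostResult` into one of three outcomes: an error, a dry-run proposal, or a plain value. The `classify` helper and the `Outcome` type are hypothetical names introduced here, and the interface is repeated only so the block stands alone.

```typescript
interface HostResult {
  callId: string;
  ok: boolean;
  result?: unknown;
  proposal?: boolean;
  error?: string;
}

// Hypothetical helper type: the three ways a reply can be read.
type Outcome =
  | { kind: "error"; message: string }
  | { kind: "proposal"; proposal: unknown }
  | { kind: "value"; value: unknown };

function classify(r: HostResult): Outcome {
  // `error` is documented as present when `!ok`; fall back defensively anyway.
  if (!r.ok) return { kind: "error", message: r.error ?? "unknown error" };
  // `proposal: true` means `result` is a dry-run proposal awaiting approval.
  if (r.proposal) return { kind: "proposal", proposal: r.result };
  // Otherwise `result` is the tool's actual output.
  return { kind: "value", value: r.result };
}
```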

Wiring it into an app

The app supplies the three seams the dispatcher needs — a stream route, a result route, and a dispatch call — plus its own device-registry table:

api/host/stream (the device subscribes here):

const device = await authDevice(request); // your Bearer-token check
const stream = new ReadableStream({
  start(controller) {
    const send = (event: string, data: string) =>
      controller.enqueue(encode(`event: ${event}\ndata: ${data}\n\n`));
    const off = hostDispatcher.connect(device.id, send);
    // … keep-alive pings; call off() on cancel
  },
});
return new Response(stream, { headers: { "content-type": "text/event-stream" } });
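
The stream route calls an `encode` helper that the snippet does not define. One plausible definition, assuming it only needs to turn text into UTF-8 bytes, is sketched below together with a hypothetical `sseFrame` helper that builds the same `event:` / `data:` frame. The keep-alive constant is likewise an assumption about what a ping could look like.

```typescript
// Assumed definition of the `encode` helper used by the stream route.
const encoder = new TextEncoder();
const encode = (text: string): Uint8Array => encoder.encode(text);

// Hypothetical helper: build one SSE frame. A payload containing line breaks
// must be split across several `data:` lines to stay a single event.
function sseFrame(event: string, data: string): string {
  const dataLines = data
    .split(/\r\n|\r|\n/)
    .map((line) => `data: ${line}`)
    .join("\n");
  return `event: ${event}\n${dataLines}\n\n`;
}

// In the SSE format a line starting with ":" is a comment, so a frame like
// this can serve as a keep-alive ping that clients ignore.
const KEEP_ALIVE = ": ping\n\n";
```

With these in place, the `send` callback could be written as `controller.enqueue(encode(sseFrame(event, data)))`.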

api/host/result (the device posts results here):

hostDispatcher.deliver(await request.json());
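
The one-liner above passes the request body straight to `deliver`. A slightly more defensive variant, sketched under the assumption that the app wants to reject malformed bodies and report unknown calls, could validate the shape first. `isHostResult`, `handleResult`, and the chosen status codes are hypothetical, not part of the package.

```typescript
interface HostResult {
  callId: string;
  ok: boolean;
  result?: unknown;
  proposal?: boolean;
  error?: string;
}

// Hypothetical runtime check that an untrusted JSON body looks like a HostResult.
function isHostResult(value: unknown): value is HostResult {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.callId === "string" &&
    typeof v.ok === "boolean" &&
    (v.proposal === undefined || typeof v.proposal === "boolean") &&
    (v.error === undefined || typeof v.error === "string")
  );
}

// Returns an HTTP status: 400 for a malformed body, 404 when no pending call
// matches (deliver returned false), 204 when the result was accepted.
function handleResult(body: unknown, deliver: (r: HostResult) => boolean): number {
  if (!isHostResult(body)) return 400;
  return deliver(body) ? 204 : 404;
}
```

In the route this might be used as `new Response(null, { status: handleResult(await request.json(), (r) => hostDispatcher.deliver(r)) })`.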

A dispatch call (reads return their result; mutations return a proposal):

const result = await hostDispatcher.dispatch(deviceId, { callId, tool, input });
if (result.proposal) {
  /* store it, surface the diff, and re-dispatch with mode:"apply" on approval */
}
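
The approval step in the comment above can be sketched as a small server-side store. The idea, taken from the `mode` documentation, is that the apply-phase call reuses the input the server stored at proposal time rather than anything sent back by a client. `ProposalStore`, `PendingProposal`, and the `proposalId` key are hypothetical names, and a real app would presumably persist this in its own tables.

```typescript
interface HostCall { callId: string; tool: string; input: unknown; mode?: "auto" | "apply"; }

// Hypothetical record kept on the server while a proposal awaits approval.
interface PendingProposal {
  deviceId: string;
  tool: string;
  input: unknown;    // the input as originally dispatched, stored server-side
  proposal: unknown; // the dry-run proposal returned by the device
}

class ProposalStore {
  private byId = new Map<string, PendingProposal>();

  save(proposalId: string, entry: PendingProposal): void {
    this.byId.set(proposalId, entry);
  }

  /** Consume an approved proposal and build the apply-phase call from stored data. */
  approve(proposalId: string, newCallId: string): { deviceId: string; call: HostCall } | undefined {
    const entry = this.byId.get(proposalId);
    if (!entry) return undefined;
    this.byId.delete(proposalId); // an approval can be applied only once
    return {
      deviceId: entry.deviceId,
      // Fresh callId per call; tool and input come from the stored record.
      call: { callId: newCallId, tool: entry.tool, input: entry.input, mode: "apply" },
    };
  }
}
```

On approval, the returned `deviceId` and `call` would be passed to `hostDispatcher.dispatch` to run the mutation for real.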

Single-replica caveat

The dispatcher's connection registry and pending-call map both live in memory. A call dispatched on one app replica can't reach a device whose SSE stream lives on another, so a deployment that uses the dispatcher must run a single replica (or use sticky routing so that a device's stream, its result POSTs, and the calls dispatched to it all reach the same replica). Moving the registry behind Centrifugo, so that server→device push and the result path survive multiple replicas, is the documented end state.
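
The failure mode is easy to see in a toy model. The sketch below stands in for two replicas, each with its own in-memory registry; `ReplicaRegistry` is a hypothetical stand-in for the connection-tracking part of the dispatcher, not the real class.

```typescript
// Hypothetical stand-in for one replica's in-memory connection registry.
class ReplicaRegistry {
  private streams = new Set<string>();
  connect(deviceId: string): void { this.streams.add(deviceId); }
  isOnline(deviceId: string): boolean { return this.streams.has(deviceId); }
}

const replicaA = new ReplicaRegistry();
const replicaB = new ReplicaRegistry();

// The device's SSE stream happens to land on replica A.
replicaA.connect("dev-1");

const seenByA = replicaA.isOnline("dev-1"); // true: A holds the stream
const seenByB = replicaB.isOnline("dev-1"); // false: B knows nothing about it
```

A dispatch that arrives at replica B would therefore resolve as if the device were offline, even though it is connected to replica A.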