The dispatcher
@tikab-interactive/fusion-ai-host/server is the VM-side glue: an in-memory HostDispatcher that
routes a tool call to a connected device's stream and awaits its result. It is schema-agnostic — the
consuming app owns the device-registry table and its migration (the same rule as
fusion-db).
import { hostDispatcher } from "@tikab-interactive/fusion-ai-host/server";hostDispatcher is a process-wide singleton; import it from the route handlers that serve the device
stream and the result endpoint, and from the server function that dispatches.
API
| Method | Use |
|---|---|
connect(deviceId, sink) | Register a device's SSE sink (in the stream route). Returns an unregister fn. |
dispatch(deviceId, call, ms?) | Push a call to a device and await its HostResult. Resolves (never rejects) with device offline / device timed out. |
deliver(result) | Resolve the pending call a device's result POST corresponds to. Returns false if unknown. |
isOnline(deviceId) | Whether a device holds a live stream on this process. |
onlineDevices() | The ids with a live stream. |
drop(deviceId) | Forget a device's connection (on revoke). |
The wire protocol is shared with the device half:
/** A tool call dispatched from the server down to a device. */
export interface HostCall {
/** Correlates the dispatch with its result. Generated server-side per call. */
callId: string;
/** The tool name to run on the device. */
tool: string;
/** The tool input (validated against the device-side zod schema before execution). */
input: unknown;
/**
* Two-phase control for mutating tools. Absent (or "auto") = a read/action runs, a mutation
* returns its proposal (dry-run). "apply" = run the mutation for real (sent by the server only
* after the proposal was approved, with the SERVER-stored input). Reads ignore it.
*/
mode?: "auto" | "apply";
}
/** A device's reply to a {@link HostCall}. */
export interface HostResult {
callId: string;
ok: boolean;
/** Present when `ok` — JSON-serialisable tool output, OR a {@link Proposal} when `proposal`. */
result?: unknown;
/** True when `result` is a dry-run proposal (a mutating tool awaiting approval), not the effect. */
proposal?: boolean;
/** Present when `!ok` — a human-readable error (unknown tool, scope violation, throw, …). */
error?: string;
}Wiring it into an app
The app supplies the three seams the dispatcher needs — a stream route, a result route, and a dispatch call — plus its own device-registry table:
const device = await authDevice(request); // your Bearer-token check
const stream = new ReadableStream({
start(controller) {
const send = (event: string, data: string) =>
controller.enqueue(encode(`event: ${event}\ndata: ${data}\n\n`));
const off = hostDispatcher.connect(device.id, send);
// … keep-alive pings; call off() on cancel
},
});
return new Response(stream, { headers: { "content-type": "text/event-stream" } });hostDispatcher.deliver(await request.json());const result = await hostDispatcher.dispatch(deviceId, { callId, tool, input });
if (result.proposal) {
/* store it, surface the diff, and re-dispatch with mode:"apply" on approval */
}Single-replica caveat
The dispatcher's connection registry + pending-call map are in-memory. A call dispatched on one app replica can't reach a device whose SSE stream lives on another, so a deployment using it must run a single replica (or pin sticky routing). Moving the registry behind Centrifugo — so server→device push and the result path survive multiple replicas — is the documented end state.