diff --git a/dev/web/discovery.mjs b/dev/web/discovery.mjs index 9ab84a6..1efc937 100644 --- a/dev/web/discovery.mjs +++ b/dev/web/discovery.mjs @@ -26,25 +26,29 @@ const ANN_FIXED_SIZE = 8; * Returns a stop() function. Call it when done (e.g. user closed the picker * or selected a node). Safe to call multiple times. */ -export function start_discovery(on_peer) { +export function start_discovery(on_peer, on_error) { const sock = dgram.createSocket({ type: 'udp4', reuseAddr: true }); const seen = new Set(); let closed = false; - sock.on('error', () => stop()); + sock.on('error', err => { + console.error('discovery socket error:', err.message); + if (on_error) { on_error(err); } + stop(); + }); sock.on('message', (msg, rinfo) => { if (msg.length < HEADER_SIZE) { return; } - const msg_type = msg.readUInt16BE(0); - const payload_len = msg.readUInt32BE(2); + const msg_type = msg.readUInt16LE(0); + const payload_len = msg.readUInt32LE(2); if (msg_type !== ANNOUNCE_TYPE) { return; } if (msg.length < HEADER_SIZE + payload_len) { return; } if (payload_len < ANN_FIXED_SIZE) { return; } const p = msg.slice(HEADER_SIZE); - const site_id = p.readUInt16BE(1); - const tcp_port = p.readUInt16BE(3); - const func_flags = p.readUInt16BE(5); + const site_id = p.readUInt16LE(1); + const tcp_port = p.readUInt16LE(3); + const func_flags = p.readUInt16LE(5); const name_len = p.readUInt8(7); if (payload_len < ANN_FIXED_SIZE + name_len) { return; } const name = p.toString('utf8', 8, 8 + name_len); @@ -74,12 +78,12 @@ function send_announce(sock) { const name = Buffer.from('web-inspector', 'utf8'); const payload_len = 8 + name.length; const buf = Buffer.allocUnsafe(6 + payload_len); - buf.writeUInt16BE(ANNOUNCE_TYPE, 0); - buf.writeUInt32BE(payload_len, 2); + buf.writeUInt16LE(ANNOUNCE_TYPE, 0); + buf.writeUInt32LE(payload_len, 2); buf.writeUInt8(1, 6); /* protocol_version */ - buf.writeUInt16BE(0, 7); /* site_id */ - buf.writeUInt16BE(0, 9); /* tcp_port (0 = no server) */ - buf.writeUInt16BE(0, 11); /* function_flags */ + buf.writeUInt16LE(0, 7); /* site_id */ + buf.writeUInt16LE(0, 9); /* tcp_port (0 = no server) */ + buf.writeUInt16LE(0, 11); /* function_flags */ buf.writeUInt8(name.length, 13); name.copy(buf, 14); sock.send(buf, 0, buf.length, DISCOVERY_PORT, MULTICAST_GROUP); diff --git a/dev/web/node_client.mjs b/dev/web/node_client.mjs index ea1107b..fca375a 100644 --- a/dev/web/node_client.mjs +++ b/dev/web/node_client.mjs @@ -33,9 +33,11 @@ export class Node_Client extends EventEmitter { this.port = port; return new Promise((resolve, reject) => { + let connected = false; const sock = net.createConnection({ host, port }); sock.once('connect', () => { + connected = true; this._socket = sock; this.connected = true; this._buf = Buffer.alloc(0); @@ -43,9 +45,11 @@ export class Node_Client extends EventEmitter { resolve(); }); - sock.once('error', err => { - if (!this.connected) { reject(err); } - else { this.emit('error', err); } + /* Single persistent error handler — errors after connect are + * non-fatal here; the 'close' event fires next and handles cleanup. */ + sock.on('error', err => { + if (!connected) { reject(err); } + else { console.error('video-node socket error:', err.message); } }); sock.on('data', chunk => this._on_data(chunk)); @@ -53,7 +57,6 @@ export class Node_Client extends EventEmitter { sock.on('close', () => { this.connected = false; this._socket = null; - /* reject all pending requests */ for (const { reject: rej } of this._pending.values()) { rej(new Error('disconnected')); } @@ -73,8 +76,8 @@ export class Node_Client extends EventEmitter { this._buf = Buffer.concat([this._buf, chunk]); while (true) { if (this._buf.length < FRAME_HEADER_SIZE) { break; } - const msg_type = this._buf.readUInt16BE(0); - const payload_len = this._buf.readUInt32BE(2); + const msg_type = this._buf.readUInt16LE(0); + const payload_len = this._buf.readUInt32LE(2); if (this._buf.length < FRAME_HEADER_SIZE + payload_len) { break; } const payload = this._buf.slice(FRAME_HEADER_SIZE, FRAME_HEADER_SIZE + payload_len); this._buf = this._buf.slice(FRAME_HEADER_SIZE + payload_len); @@ -85,7 +88,7 @@ export class Node_Client extends EventEmitter { _on_frame(msg_type, payload) { if (msg_type !== MSG_CONTROL_RESPONSE) { return; } if (payload.length < 4) { return; } - const request_id = payload.readUInt16BE(0); + const request_id = payload.readUInt16LE(0); const entry = this._pending.get(request_id); if (!entry) { return; } this._pending.delete(request_id); @@ -100,7 +103,7 @@ export class Node_Client extends EventEmitter { if (!this.connected) { return Promise.reject(new Error('not connected')); } const id = this._alloc_id(); /* patch request_id into the frame payload (bytes 6-7) */ - frame.writeUInt16BE(id, FRAME_HEADER_SIZE); + frame.writeUInt16LE(id, FRAME_HEADER_SIZE); return new Promise((resolve, reject) => { this._pending.set(id, { resolve, reject, decode: decode_fn }); this._socket.write(frame, err => { diff --git a/dev/web/protocol.mjs b/dev/web/protocol.mjs index e6019f8..4239baf 100644 --- a/dev/web/protocol.mjs +++ b/dev/web/protocol.mjs @@ -35,9 +35,10 @@ function read_str8(buf, offset) { } function read_i64(buf, offset) { - /* Returns a JS number — safe for the int32 values V4L2 uses in practice. */ - const hi = buf.readInt32BE(offset); - const lo = buf.readUInt32BE(offset + 4); + /* Little-endian i64: low 4 bytes first, high 4 bytes second. + * Returns a JS number — safe for the int32 values V4L2 uses in practice. */ + const lo = buf.readUInt32LE(offset); + const hi = buf.readInt32LE(offset + 4); return hi * 0x100000000 + lo; } @@ -47,8 +48,8 @@ function read_i64(buf, offset) { export function build_frame(msg_type, payload) { const frame = Buffer.allocUnsafe(FRAME_HEADER_SIZE + payload.length); - frame.writeUInt16BE(msg_type, 0); - frame.writeUInt32BE(payload.length, 2); + frame.writeUInt16LE(msg_type, 0); + frame.writeUInt32LE(payload.length, 2); payload.copy(frame, FRAME_HEADER_SIZE); return frame; } @@ -60,36 +61,35 @@ export function build_frame(msg_type, payload) { export function encode_enum_devices(request_id) { const p = Buffer.allocUnsafe(4); - p.writeUInt16BE(request_id, 0); - p.writeUInt16BE(CMD_ENUM_DEVICES, 2); + p.writeUInt16LE(request_id, 0); + p.writeUInt16LE(CMD_ENUM_DEVICES, 2); return build_frame(MSG_CONTROL_REQUEST, p); } export function encode_enum_controls(request_id, device_index) { const p = Buffer.allocUnsafe(6); - p.writeUInt16BE(request_id, 0); - p.writeUInt16BE(CMD_ENUM_CONTROLS, 2); - p.writeUInt16BE(device_index, 4); + p.writeUInt16LE(request_id, 0); + p.writeUInt16LE(CMD_ENUM_CONTROLS, 2); + p.writeUInt16LE(device_index, 4); return build_frame(MSG_CONTROL_REQUEST, p); } export function encode_get_control(request_id, device_index, control_id) { const p = Buffer.allocUnsafe(10); - p.writeUInt16BE(request_id, 0); - p.writeUInt16BE(CMD_GET_CONTROL, 2); - p.writeUInt16BE(device_index, 4); - p.writeUInt32BE(control_id, 6); + p.writeUInt16LE(request_id, 0); + p.writeUInt16LE(CMD_GET_CONTROL, 2); + p.writeUInt16LE(device_index, 4); + p.writeUInt32LE(control_id, 6); return build_frame(MSG_CONTROL_REQUEST, p); } export function encode_set_control(request_id, device_index, control_id, value) { const p = Buffer.allocUnsafe(14); - p.writeUInt16BE(request_id, 0); - p.writeUInt16BE(CMD_SET_CONTROL, 2); - p.writeUInt16BE(device_index, 4); - p.writeUInt16BE(0, 6); - p.writeUInt32BE(control_id, 6); - p.writeInt32BE(value, 10); + p.writeUInt16LE(request_id, 0); + p.writeUInt16LE(CMD_SET_CONTROL, 2); + p.writeUInt16LE(device_index, 4); + p.writeUInt32LE(control_id, 6); + p.writeInt32LE(value, 10); return build_frame(MSG_CONTROL_REQUEST, p); } @@ -100,8 +100,8 @@ export function encode_set_control(request_id, device_index, control_id, value) export function decode_response_header(payload) { return { - request_id: payload.readUInt16BE(0), - status: payload.readUInt16BE(2), + request_id: payload.readUInt16LE(0), + status: payload.readUInt16LE(2), }; } @@ -110,7 +110,7 @@ export function decode_enum_devices_response(payload) { if (hdr.status !== STATUS_OK) { return { ...hdr, media: [], standalone: [] }; } let pos = 4; - const media_count = payload.readUInt16BE(pos); pos += 2; + const media_count = payload.readUInt16LE(pos); pos += 2; const media = []; for (let i = 0; i < media_count; i++) { @@ -125,9 +125,9 @@ export function decode_enum_devices_response(payload) { for (let j = 0; j < vcount; j++) { s = read_str8(payload, pos); pos += s.size; const vpath = s.value; s = read_str8(payload, pos); pos += s.size; const entity_name = s.value; - const entity_type = payload.readUInt32BE(pos); pos += 4; - const entity_flags = payload.readUInt32BE(pos); pos += 4; - const device_caps = payload.readUInt32BE(pos); pos += 4; + const entity_type = payload.readUInt32LE(pos); pos += 4; + const entity_flags = payload.readUInt32LE(pos); pos += 4; + const device_caps = payload.readUInt32LE(pos); pos += 4; const pad_flags = payload.readUInt8(pos); pos++; const is_capture = payload.readUInt8(pos); pos++; video_nodes.push({ path: vpath, entity_name, entity_type, @@ -137,7 +137,7 @@ export function decode_enum_devices_response(payload) { media.push({ path, driver, model, bus_info, video_nodes }); } - const standalone_count = payload.readUInt16BE(pos); pos += 2; + const standalone_count = payload.readUInt16LE(pos); pos += 2; const standalone = []; for (let i = 0; i < standalone_count; i++) { let s; @@ -154,25 +154,25 @@ export function decode_enum_controls_response(payload) { if (hdr.status !== STATUS_OK) { return { ...hdr, controls: [] }; } let pos = 4; - const count = payload.readUInt16BE(pos); pos += 2; + const count = payload.readUInt16LE(pos); pos += 2; const controls = []; for (let i = 0; i < count; i++) { - const id = payload.readUInt32BE(pos); pos += 4; + const id = payload.readUInt32LE(pos); pos += 4; const type = payload.readUInt8(pos); pos++; - const flags = payload.readUInt32BE(pos); pos += 4; + const flags = payload.readUInt32LE(pos); pos += 4; const s = read_str8(payload, pos); pos += s.size; const name = s.value; - const min = payload.readInt32BE(pos); pos += 4; - const max = payload.readInt32BE(pos); pos += 4; - const step = payload.readInt32BE(pos); pos += 4; - const def = payload.readInt32BE(pos); pos += 4; - const cur = payload.readInt32BE(pos); pos += 4; + const min = payload.readInt32LE(pos); pos += 4; + const max = payload.readInt32LE(pos); pos += 4; + const step = payload.readInt32LE(pos); pos += 4; + const def = payload.readInt32LE(pos); pos += 4; + const cur = payload.readInt32LE(pos); pos += 4; const menu_count = payload.readUInt8(pos); pos++; const menu_items = []; for (let j = 0; j < menu_count; j++) { - const midx = payload.readUInt32BE(pos); pos += 4; + const midx = payload.readUInt32LE(pos); pos += 4; const ms = read_str8(payload, pos); pos += ms.size; const mval = read_i64(payload, pos); pos += 8; menu_items.push({ index: midx, name: ms.value, int_value: mval }); @@ -187,7 +187,7 @@ export function decode_enum_controls_response(payload) { export function decode_get_control_response(payload) { const hdr = decode_response_header(payload); - const value = (hdr.status === STATUS_OK) ? payload.readInt32BE(4) : 0; + const value = (hdr.status === STATUS_OK) ? payload.readInt32LE(4) : 0; return { ...hdr, value }; } diff --git a/dev/web/server.mjs b/dev/web/server.mjs index 94afd05..9ec9d06 100644 --- a/dev/web/server.mjs +++ b/dev/web/server.mjs @@ -2,14 +2,15 @@ * Express 5 web server — REST bridge to video-node. * * Usage: - * node server.mjs [--host IP] [--port PORT] [--discover] [--listen PORT] + * node server.mjs [--host IP] [--port PORT] [--listen PORT] * - * If --host and --port are given, connects on startup. - * If --discover is given, runs UDP discovery to find the node automatically. - * Otherwise, call POST /api/connect to connect at runtime. + * Discovery runs at startup unconditionally — the server joins the multicast + * group and sends an announcement immediately so nodes respond right away. + * The /api/discover SSE endpoint subscribes to the running peer feed. */ import express from 'express'; +import { EventEmitter } from 'node:events'; import { fileURLToPath } from 'node:url'; import path from 'node:path'; import { Node_Client } from './node_client.mjs'; @@ -25,10 +26,29 @@ const arg = name => { return i >= 0 ? args[i + 1] : null; }; -const opt_host = arg('--host'); -const opt_port = arg('--port') ? parseInt(arg('--port')) : null; -const opt_discover = args.includes('--discover'); -const listen_port = arg('--listen') ? parseInt(arg('--listen')) : 3000; +const opt_host = arg('--host'); +const opt_port = arg('--port') ? parseInt(arg('--port')) : null; +const listen_port = arg('--listen') ? parseInt(arg('--listen')) : 3000; + +/* ------------------------------------------------------------------------- + * Persistent discovery — runs from startup, feeds the SSE endpoint + * ------------------------------------------------------------------------- */ + +const peer_bus = new EventEmitter(); +const known_peers = new Map(); /* key: addr:name -> peer */ + +start_discovery( + peer => { + const key = `${peer.addr}:${peer.name}`; + known_peers.set(key, peer); + peer_bus.emit('peer', peer); + console.log(`discovered: ${peer.name} at ${peer.addr}:${peer.tcp_port}`); + }, + err => { + console.error('discovery error:', err.message); + peer_bus.emit('discovery_error', err); + }, +); /* ------------------------------------------------------------------------- * Node client (singleton) @@ -75,9 +95,14 @@ app.post('/api/connect', async (req, res) => { } }); +app.post('/api/disconnect', (_req, res) => { + client.disconnect(); + res.json({ ok: true }); +}); + /* - * SSE stream: each discovered peer arrives as a JSON event. - * The client closes the connection when done (user picked a node or dismissed). + * SSE stream — immediately replays already-known peers, then streams new ones + * as they arrive. Client closes when done (user picked a node or dismissed). */ app.get('/api/discover', (req, res) => { res.setHeader('Content-Type', 'text/event-stream'); @@ -85,16 +110,19 @@ app.get('/api/discover', (req, res) => { res.setHeader('Connection', 'keep-alive'); res.flushHeaders(); - const stop = start_discovery(peer => { - res.write(`data: ${JSON.stringify(peer)}\n\n`); + const send_peer = peer => res.write(`data: ${JSON.stringify(peer)}\n\n`); + const send_error = err => res.write(`event: discovery_error\ndata: ${JSON.stringify({ error: err.message })}\n\n`); + + /* replay peers already seen before this SSE was opened */ + for (const peer of known_peers.values()) { send_peer(peer); } + + peer_bus.on('peer', send_peer); + peer_bus.on('discovery_error', send_error); + + req.on('close', () => { + peer_bus.off('peer', send_peer); + peer_bus.off('discovery_error', send_error); }); - - req.on('close', stop); -}); - -app.post('/api/disconnect', (_req, res) => { - client.disconnect(); - res.json({ ok: true }); }); /* -- Devices --------------------------------------------------------------- */ @@ -158,16 +186,7 @@ app.listen(listen_port, () => { console.log(`listening on http://localhost:${listen_port}`); }); -if (opt_discover) { - console.log('discovering video-nodes...'); - const stop = start_discovery(peer => { - console.log(`found ${peer.name} at ${peer.addr}:${peer.tcp_port} — connecting`); - stop(); - client.connect(peer.addr, peer.tcp_port) - .then(() => console.log('connected')) - .catch(err => console.error('connect failed:', err.message)); - }); -} else if (opt_host && opt_port) { +if (opt_host && opt_port) { client.connect(opt_host, opt_port) .then(() => console.log(`connected to ${opt_host}:${opt_port}`)) .catch(err => console.error('connect failed:', err.message));