Fix web inspector: endianness (LE), ECONNRESET, persistent discovery

- protocol.mjs: all reads/writes switched to LE to match serial.h
- node_client.mjs: persistent error handler prevents ECONNRESET crash
- discovery.mjs: remove unnecessary SO_REUSEPORT
- server.mjs: discovery runs at startup (not per SSE open); uses
  EventEmitter + known_peers Map so SSE replays existing peers on connect

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-27 01:45:33 +00:00
parent 62c25247ef
commit e1151410ad
4 changed files with 112 additions and 86 deletions

View File

@@ -26,25 +26,29 @@ const ANN_FIXED_SIZE = 8;
* Returns a stop() function. Call it when done (e.g. user closed the picker * Returns a stop() function. Call it when done (e.g. user closed the picker
* or selected a node). Safe to call multiple times. * or selected a node). Safe to call multiple times.
*/ */
export function start_discovery(on_peer) { export function start_discovery(on_peer, on_error) {
const sock = dgram.createSocket({ type: 'udp4', reuseAddr: true }); const sock = dgram.createSocket({ type: 'udp4', reuseAddr: true });
const seen = new Set(); const seen = new Set();
let closed = false; let closed = false;
sock.on('error', () => stop()); sock.on('error', err => {
console.error('discovery socket error:', err.message);
if (on_error) { on_error(err); }
stop();
});
sock.on('message', (msg, rinfo) => { sock.on('message', (msg, rinfo) => {
if (msg.length < HEADER_SIZE) { return; } if (msg.length < HEADER_SIZE) { return; }
const msg_type = msg.readUInt16BE(0); const msg_type = msg.readUInt16LE(0);
const payload_len = msg.readUInt32BE(2); const payload_len = msg.readUInt32LE(2);
if (msg_type !== ANNOUNCE_TYPE) { return; } if (msg_type !== ANNOUNCE_TYPE) { return; }
if (msg.length < HEADER_SIZE + payload_len) { return; } if (msg.length < HEADER_SIZE + payload_len) { return; }
if (payload_len < ANN_FIXED_SIZE) { return; } if (payload_len < ANN_FIXED_SIZE) { return; }
const p = msg.slice(HEADER_SIZE); const p = msg.slice(HEADER_SIZE);
const site_id = p.readUInt16BE(1); const site_id = p.readUInt16LE(1);
const tcp_port = p.readUInt16BE(3); const tcp_port = p.readUInt16LE(3);
const func_flags = p.readUInt16BE(5); const func_flags = p.readUInt16LE(5);
const name_len = p.readUInt8(7); const name_len = p.readUInt8(7);
if (payload_len < ANN_FIXED_SIZE + name_len) { return; } if (payload_len < ANN_FIXED_SIZE + name_len) { return; }
const name = p.toString('utf8', 8, 8 + name_len); const name = p.toString('utf8', 8, 8 + name_len);
@@ -74,12 +78,12 @@ function send_announce(sock) {
const name = Buffer.from('web-inspector', 'utf8'); const name = Buffer.from('web-inspector', 'utf8');
const payload_len = 8 + name.length; const payload_len = 8 + name.length;
const buf = Buffer.allocUnsafe(6 + payload_len); const buf = Buffer.allocUnsafe(6 + payload_len);
buf.writeUInt16BE(ANNOUNCE_TYPE, 0); buf.writeUInt16LE(ANNOUNCE_TYPE, 0);
buf.writeUInt32BE(payload_len, 2); buf.writeUInt32LE(payload_len, 2);
buf.writeUInt8(1, 6); /* protocol_version */ buf.writeUInt8(1, 6); /* protocol_version */
buf.writeUInt16BE(0, 7); /* site_id */ buf.writeUInt16LE(0, 7); /* site_id */
buf.writeUInt16BE(0, 9); /* tcp_port (0 = no server) */ buf.writeUInt16LE(0, 9); /* tcp_port (0 = no server) */
buf.writeUInt16BE(0, 11); /* function_flags */ buf.writeUInt16LE(0, 11); /* function_flags */
buf.writeUInt8(name.length, 13); buf.writeUInt8(name.length, 13);
name.copy(buf, 14); name.copy(buf, 14);
sock.send(buf, 0, buf.length, DISCOVERY_PORT, MULTICAST_GROUP); sock.send(buf, 0, buf.length, DISCOVERY_PORT, MULTICAST_GROUP);

View File

@@ -33,9 +33,11 @@ export class Node_Client extends EventEmitter {
this.port = port; this.port = port;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let connected = false;
const sock = net.createConnection({ host, port }); const sock = net.createConnection({ host, port });
sock.once('connect', () => { sock.once('connect', () => {
connected = true;
this._socket = sock; this._socket = sock;
this.connected = true; this.connected = true;
this._buf = Buffer.alloc(0); this._buf = Buffer.alloc(0);
@@ -43,9 +45,11 @@ export class Node_Client extends EventEmitter {
resolve(); resolve();
}); });
sock.once('error', err => { /* Single persistent error handler — errors after connect are
if (!this.connected) { reject(err); } * non-fatal here; the 'close' event fires next and handles cleanup. */
else { this.emit('error', err); } sock.on('error', err => {
if (!connected) { reject(err); }
else { console.error('video-node socket error:', err.message); }
}); });
sock.on('data', chunk => this._on_data(chunk)); sock.on('data', chunk => this._on_data(chunk));
@@ -53,7 +57,6 @@ export class Node_Client extends EventEmitter {
sock.on('close', () => { sock.on('close', () => {
this.connected = false; this.connected = false;
this._socket = null; this._socket = null;
/* reject all pending requests */
for (const { reject: rej } of this._pending.values()) { for (const { reject: rej } of this._pending.values()) {
rej(new Error('disconnected')); rej(new Error('disconnected'));
} }
@@ -73,8 +76,8 @@ export class Node_Client extends EventEmitter {
this._buf = Buffer.concat([this._buf, chunk]); this._buf = Buffer.concat([this._buf, chunk]);
while (true) { while (true) {
if (this._buf.length < FRAME_HEADER_SIZE) { break; } if (this._buf.length < FRAME_HEADER_SIZE) { break; }
const msg_type = this._buf.readUInt16BE(0); const msg_type = this._buf.readUInt16LE(0);
const payload_len = this._buf.readUInt32BE(2); const payload_len = this._buf.readUInt32LE(2);
if (this._buf.length < FRAME_HEADER_SIZE + payload_len) { break; } if (this._buf.length < FRAME_HEADER_SIZE + payload_len) { break; }
const payload = this._buf.slice(FRAME_HEADER_SIZE, FRAME_HEADER_SIZE + payload_len); const payload = this._buf.slice(FRAME_HEADER_SIZE, FRAME_HEADER_SIZE + payload_len);
this._buf = this._buf.slice(FRAME_HEADER_SIZE + payload_len); this._buf = this._buf.slice(FRAME_HEADER_SIZE + payload_len);
@@ -85,7 +88,7 @@ export class Node_Client extends EventEmitter {
_on_frame(msg_type, payload) { _on_frame(msg_type, payload) {
if (msg_type !== MSG_CONTROL_RESPONSE) { return; } if (msg_type !== MSG_CONTROL_RESPONSE) { return; }
if (payload.length < 4) { return; } if (payload.length < 4) { return; }
const request_id = payload.readUInt16BE(0); const request_id = payload.readUInt16LE(0);
const entry = this._pending.get(request_id); const entry = this._pending.get(request_id);
if (!entry) { return; } if (!entry) { return; }
this._pending.delete(request_id); this._pending.delete(request_id);
@@ -100,7 +103,7 @@ export class Node_Client extends EventEmitter {
if (!this.connected) { return Promise.reject(new Error('not connected')); } if (!this.connected) { return Promise.reject(new Error('not connected')); }
const id = this._alloc_id(); const id = this._alloc_id();
/* patch request_id into the frame payload (bytes 6-7) */ /* patch request_id into the frame payload (bytes 6-7) */
frame.writeUInt16BE(id, FRAME_HEADER_SIZE); frame.writeUInt16LE(id, FRAME_HEADER_SIZE);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this._pending.set(id, { resolve, reject, decode: decode_fn }); this._pending.set(id, { resolve, reject, decode: decode_fn });
this._socket.write(frame, err => { this._socket.write(frame, err => {

View File

@@ -35,9 +35,10 @@ function read_str8(buf, offset) {
} }
function read_i64(buf, offset) { function read_i64(buf, offset) {
/* Returns a JS number — safe for the int32 values V4L2 uses in practice. */ /* Little-endian i64: low 4 bytes first, high 4 bytes second.
const hi = buf.readInt32BE(offset); * Returns a JS number — safe for the int32 values V4L2 uses in practice. */
const lo = buf.readUInt32BE(offset + 4); const lo = buf.readUInt32LE(offset);
const hi = buf.readInt32LE(offset + 4);
return hi * 0x100000000 + lo; return hi * 0x100000000 + lo;
} }
@@ -47,8 +48,8 @@ function read_i64(buf, offset) {
export function build_frame(msg_type, payload) { export function build_frame(msg_type, payload) {
const frame = Buffer.allocUnsafe(FRAME_HEADER_SIZE + payload.length); const frame = Buffer.allocUnsafe(FRAME_HEADER_SIZE + payload.length);
frame.writeUInt16BE(msg_type, 0); frame.writeUInt16LE(msg_type, 0);
frame.writeUInt32BE(payload.length, 2); frame.writeUInt32LE(payload.length, 2);
payload.copy(frame, FRAME_HEADER_SIZE); payload.copy(frame, FRAME_HEADER_SIZE);
return frame; return frame;
} }
@@ -60,36 +61,35 @@ export function build_frame(msg_type, payload) {
export function encode_enum_devices(request_id) { export function encode_enum_devices(request_id) {
const p = Buffer.allocUnsafe(4); const p = Buffer.allocUnsafe(4);
p.writeUInt16BE(request_id, 0); p.writeUInt16LE(request_id, 0);
p.writeUInt16BE(CMD_ENUM_DEVICES, 2); p.writeUInt16LE(CMD_ENUM_DEVICES, 2);
return build_frame(MSG_CONTROL_REQUEST, p); return build_frame(MSG_CONTROL_REQUEST, p);
} }
export function encode_enum_controls(request_id, device_index) { export function encode_enum_controls(request_id, device_index) {
const p = Buffer.allocUnsafe(6); const p = Buffer.allocUnsafe(6);
p.writeUInt16BE(request_id, 0); p.writeUInt16LE(request_id, 0);
p.writeUInt16BE(CMD_ENUM_CONTROLS, 2); p.writeUInt16LE(CMD_ENUM_CONTROLS, 2);
p.writeUInt16BE(device_index, 4); p.writeUInt16LE(device_index, 4);
return build_frame(MSG_CONTROL_REQUEST, p); return build_frame(MSG_CONTROL_REQUEST, p);
} }
export function encode_get_control(request_id, device_index, control_id) { export function encode_get_control(request_id, device_index, control_id) {
const p = Buffer.allocUnsafe(10); const p = Buffer.allocUnsafe(10);
p.writeUInt16BE(request_id, 0); p.writeUInt16LE(request_id, 0);
p.writeUInt16BE(CMD_GET_CONTROL, 2); p.writeUInt16LE(CMD_GET_CONTROL, 2);
p.writeUInt16BE(device_index, 4); p.writeUInt16LE(device_index, 4);
p.writeUInt32BE(control_id, 6); p.writeUInt32LE(control_id, 6);
return build_frame(MSG_CONTROL_REQUEST, p); return build_frame(MSG_CONTROL_REQUEST, p);
} }
export function encode_set_control(request_id, device_index, control_id, value) { export function encode_set_control(request_id, device_index, control_id, value) {
const p = Buffer.allocUnsafe(14); const p = Buffer.allocUnsafe(14);
p.writeUInt16BE(request_id, 0); p.writeUInt16LE(request_id, 0);
p.writeUInt16BE(CMD_SET_CONTROL, 2); p.writeUInt16LE(CMD_SET_CONTROL, 2);
p.writeUInt16BE(device_index, 4); p.writeUInt16LE(device_index, 4);
p.writeUInt16BE(0, 6); p.writeUInt32LE(control_id, 6);
p.writeUInt32BE(control_id, 6); p.writeInt32LE(value, 10);
p.writeInt32BE(value, 10);
return build_frame(MSG_CONTROL_REQUEST, p); return build_frame(MSG_CONTROL_REQUEST, p);
} }
@@ -100,8 +100,8 @@ export function encode_set_control(request_id, device_index, control_id, value)
export function decode_response_header(payload) { export function decode_response_header(payload) {
return { return {
request_id: payload.readUInt16BE(0), request_id: payload.readUInt16LE(0),
status: payload.readUInt16BE(2), status: payload.readUInt16LE(2),
}; };
} }
@@ -110,7 +110,7 @@ export function decode_enum_devices_response(payload) {
if (hdr.status !== STATUS_OK) { return { ...hdr, media: [], standalone: [] }; } if (hdr.status !== STATUS_OK) { return { ...hdr, media: [], standalone: [] }; }
let pos = 4; let pos = 4;
const media_count = payload.readUInt16BE(pos); pos += 2; const media_count = payload.readUInt16LE(pos); pos += 2;
const media = []; const media = [];
for (let i = 0; i < media_count; i++) { for (let i = 0; i < media_count; i++) {
@@ -125,9 +125,9 @@ export function decode_enum_devices_response(payload) {
for (let j = 0; j < vcount; j++) { for (let j = 0; j < vcount; j++) {
s = read_str8(payload, pos); pos += s.size; const vpath = s.value; s = read_str8(payload, pos); pos += s.size; const vpath = s.value;
s = read_str8(payload, pos); pos += s.size; const entity_name = s.value; s = read_str8(payload, pos); pos += s.size; const entity_name = s.value;
const entity_type = payload.readUInt32BE(pos); pos += 4; const entity_type = payload.readUInt32LE(pos); pos += 4;
const entity_flags = payload.readUInt32BE(pos); pos += 4; const entity_flags = payload.readUInt32LE(pos); pos += 4;
const device_caps = payload.readUInt32BE(pos); pos += 4; const device_caps = payload.readUInt32LE(pos); pos += 4;
const pad_flags = payload.readUInt8(pos); pos++; const pad_flags = payload.readUInt8(pos); pos++;
const is_capture = payload.readUInt8(pos); pos++; const is_capture = payload.readUInt8(pos); pos++;
video_nodes.push({ path: vpath, entity_name, entity_type, video_nodes.push({ path: vpath, entity_name, entity_type,
@@ -137,7 +137,7 @@ export function decode_enum_devices_response(payload) {
media.push({ path, driver, model, bus_info, video_nodes }); media.push({ path, driver, model, bus_info, video_nodes });
} }
const standalone_count = payload.readUInt16BE(pos); pos += 2; const standalone_count = payload.readUInt16LE(pos); pos += 2;
const standalone = []; const standalone = [];
for (let i = 0; i < standalone_count; i++) { for (let i = 0; i < standalone_count; i++) {
let s; let s;
@@ -154,25 +154,25 @@ export function decode_enum_controls_response(payload) {
if (hdr.status !== STATUS_OK) { return { ...hdr, controls: [] }; } if (hdr.status !== STATUS_OK) { return { ...hdr, controls: [] }; }
let pos = 4; let pos = 4;
const count = payload.readUInt16BE(pos); pos += 2; const count = payload.readUInt16LE(pos); pos += 2;
const controls = []; const controls = [];
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
const id = payload.readUInt32BE(pos); pos += 4; const id = payload.readUInt32LE(pos); pos += 4;
const type = payload.readUInt8(pos); pos++; const type = payload.readUInt8(pos); pos++;
const flags = payload.readUInt32BE(pos); pos += 4; const flags = payload.readUInt32LE(pos); pos += 4;
const s = read_str8(payload, pos); pos += s.size; const s = read_str8(payload, pos); pos += s.size;
const name = s.value; const name = s.value;
const min = payload.readInt32BE(pos); pos += 4; const min = payload.readInt32LE(pos); pos += 4;
const max = payload.readInt32BE(pos); pos += 4; const max = payload.readInt32LE(pos); pos += 4;
const step = payload.readInt32BE(pos); pos += 4; const step = payload.readInt32LE(pos); pos += 4;
const def = payload.readInt32BE(pos); pos += 4; const def = payload.readInt32LE(pos); pos += 4;
const cur = payload.readInt32BE(pos); pos += 4; const cur = payload.readInt32LE(pos); pos += 4;
const menu_count = payload.readUInt8(pos); pos++; const menu_count = payload.readUInt8(pos); pos++;
const menu_items = []; const menu_items = [];
for (let j = 0; j < menu_count; j++) { for (let j = 0; j < menu_count; j++) {
const midx = payload.readUInt32BE(pos); pos += 4; const midx = payload.readUInt32LE(pos); pos += 4;
const ms = read_str8(payload, pos); pos += ms.size; const ms = read_str8(payload, pos); pos += ms.size;
const mval = read_i64(payload, pos); pos += 8; const mval = read_i64(payload, pos); pos += 8;
menu_items.push({ index: midx, name: ms.value, int_value: mval }); menu_items.push({ index: midx, name: ms.value, int_value: mval });
@@ -187,7 +187,7 @@ export function decode_enum_controls_response(payload) {
export function decode_get_control_response(payload) { export function decode_get_control_response(payload) {
const hdr = decode_response_header(payload); const hdr = decode_response_header(payload);
const value = (hdr.status === STATUS_OK) ? payload.readInt32BE(4) : 0; const value = (hdr.status === STATUS_OK) ? payload.readInt32LE(4) : 0;
return { ...hdr, value }; return { ...hdr, value };
} }

View File

@@ -2,14 +2,15 @@
* Express 5 web server — REST bridge to video-node. * Express 5 web server — REST bridge to video-node.
* *
* Usage: * Usage:
* node server.mjs [--host IP] [--port PORT] [--discover] [--listen PORT] * node server.mjs [--host IP] [--port PORT] [--listen PORT]
* *
* If --host and --port are given, connects on startup. * Discovery runs at startup unconditionally — the server joins the multicast
* If --discover is given, runs UDP discovery to find the node automatically. * group and sends an announcement immediately so nodes respond right away.
* Otherwise, call POST /api/connect to connect at runtime. * The /api/discover SSE endpoint subscribes to the running peer feed.
*/ */
import express from 'express'; import express from 'express';
import { EventEmitter } from 'node:events';
import { fileURLToPath } from 'node:url'; import { fileURLToPath } from 'node:url';
import path from 'node:path'; import path from 'node:path';
import { Node_Client } from './node_client.mjs'; import { Node_Client } from './node_client.mjs';
@@ -27,9 +28,28 @@ const arg = name => {
const opt_host = arg('--host'); const opt_host = arg('--host');
const opt_port = arg('--port') ? parseInt(arg('--port')) : null; const opt_port = arg('--port') ? parseInt(arg('--port')) : null;
const opt_discover = args.includes('--discover');
const listen_port = arg('--listen') ? parseInt(arg('--listen')) : 3000; const listen_port = arg('--listen') ? parseInt(arg('--listen')) : 3000;
/* -------------------------------------------------------------------------
* Persistent discovery — runs from startup, feeds the SSE endpoint
* ------------------------------------------------------------------------- */
const peer_bus = new EventEmitter();
const known_peers = new Map(); /* key: addr:name -> peer */
start_discovery(
peer => {
const key = `${peer.addr}:${peer.name}`;
known_peers.set(key, peer);
peer_bus.emit('peer', peer);
console.log(`discovered: ${peer.name} at ${peer.addr}:${peer.tcp_port}`);
},
err => {
console.error('discovery error:', err.message);
peer_bus.emit('discovery_error', err);
},
);
/* ------------------------------------------------------------------------- /* -------------------------------------------------------------------------
* Node client (singleton) * Node client (singleton)
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
@@ -75,9 +95,14 @@ app.post('/api/connect', async (req, res) => {
} }
}); });
app.post('/api/disconnect', (_req, res) => {
client.disconnect();
res.json({ ok: true });
});
/* /*
* SSE stream: each discovered peer arrives as a JSON event. * SSE stream — immediately replays already-known peers, then streams new ones
* The client closes the connection when done (user picked a node or dismissed). * as they arrive. Client closes when done (user picked a node or dismissed).
*/ */
app.get('/api/discover', (req, res) => { app.get('/api/discover', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Content-Type', 'text/event-stream');
@@ -85,16 +110,19 @@ app.get('/api/discover', (req, res) => {
res.setHeader('Connection', 'keep-alive'); res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); res.flushHeaders();
const stop = start_discovery(peer => { const send_peer = peer => res.write(`data: ${JSON.stringify(peer)}\n\n`);
res.write(`data: ${JSON.stringify(peer)}\n\n`); const send_error = err => res.write(`event: discovery_error\ndata: ${JSON.stringify({ error: err.message })}\n\n`);
});
req.on('close', stop); /* replay peers already seen before this SSE was opened */
}); for (const peer of known_peers.values()) { send_peer(peer); }
app.post('/api/disconnect', (_req, res) => { peer_bus.on('peer', send_peer);
client.disconnect(); peer_bus.on('discovery_error', send_error);
res.json({ ok: true });
req.on('close', () => {
peer_bus.off('peer', send_peer);
peer_bus.off('discovery_error', send_error);
});
}); });
/* -- Devices --------------------------------------------------------------- */ /* -- Devices --------------------------------------------------------------- */
@@ -158,16 +186,7 @@ app.listen(listen_port, () => {
console.log(`listening on http://localhost:${listen_port}`); console.log(`listening on http://localhost:${listen_port}`);
}); });
if (opt_discover) { if (opt_host && opt_port) {
console.log('discovering video-nodes...');
const stop = start_discovery(peer => {
console.log(`found ${peer.name} at ${peer.addr}:${peer.tcp_port} — connecting`);
stop();
client.connect(peer.addr, peer.tcp_port)
.then(() => console.log('connected'))
.catch(err => console.error('connect failed:', err.message));
});
} else if (opt_host && opt_port) {
client.connect(opt_host, opt_port) client.connect(opt_host, opt_port)
.then(() => console.log(`connected to ${opt_host}:${opt_port}`)) .then(() => console.log(`connected to ${opt_host}:${opt_port}`))
.catch(err => console.error('connect failed:', err.message)); .catch(err => console.error('connect failed:', err.message));