Files
video-setup/dev/web/node_client.mjs
mikael-lovqvists-claude-agent e1151410ad Fix web inspector: endianness (LE), ECONNRESET, persistent discovery
- protocol.mjs: all reads/writes switched to LE to match serial.h
- node_client.mjs: persistent error handler prevents ECONNRESET crash
- discovery.mjs: remove unnecessary SO_REUSEPORT
- server.mjs: discovery runs at startup (not per SSE open); uses
  EventEmitter + known_peers Map so SSE replays existing peers on connect

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-27 01:45:33 +00:00

139 lines
4.0 KiB
JavaScript

/*
* TCP client for video-node.
* Handles frame reassembly, request/response correlation by request_id,
* and reconnection state.
*/
import net from 'node:net';
import { EventEmitter } from 'node:events';
import {
FRAME_HEADER_SIZE, MSG_CONTROL_RESPONSE,
encode_enum_devices, encode_enum_controls,
encode_get_control, encode_set_control,
decode_enum_devices_response, decode_enum_controls_response,
decode_get_control_response, decode_set_control_response,
} from './protocol.mjs';
export class Node_Client extends EventEmitter {
constructor() {
super();
this._socket = null;
this._buf = Buffer.alloc(0);
this._pending = new Map(); /* request_id -> { resolve, reject, decode } */
this._next_id = 1;
this.connected = false;
this.host = null;
this.port = null;
}
connect(host, port) {
if (this._socket) { this._socket.destroy(); }
this.host = host;
this.port = port;
return new Promise((resolve, reject) => {
let connected = false;
const sock = net.createConnection({ host, port });
sock.once('connect', () => {
connected = true;
this._socket = sock;
this.connected = true;
this._buf = Buffer.alloc(0);
this.emit('connect');
resolve();
});
/* Single persistent error handler — errors after connect are
* non-fatal here; the 'close' event fires next and handles cleanup. */
sock.on('error', err => {
if (!connected) { reject(err); }
else { console.error('video-node socket error:', err.message); }
});
sock.on('data', chunk => this._on_data(chunk));
sock.on('close', () => {
this.connected = false;
this._socket = null;
for (const { reject: rej } of this._pending.values()) {
rej(new Error('disconnected'));
}
this._pending.clear();
this.emit('disconnect');
});
});
}
disconnect() {
if (this._socket) { this._socket.destroy(); }
}
/* ---------------------------------------------------------------------- */
_on_data(chunk) {
this._buf = Buffer.concat([this._buf, chunk]);
while (true) {
if (this._buf.length < FRAME_HEADER_SIZE) { break; }
const msg_type = this._buf.readUInt16LE(0);
const payload_len = this._buf.readUInt32LE(2);
if (this._buf.length < FRAME_HEADER_SIZE + payload_len) { break; }
const payload = this._buf.slice(FRAME_HEADER_SIZE, FRAME_HEADER_SIZE + payload_len);
this._buf = this._buf.slice(FRAME_HEADER_SIZE + payload_len);
this._on_frame(msg_type, payload);
}
}
_on_frame(msg_type, payload) {
if (msg_type !== MSG_CONTROL_RESPONSE) { return; }
if (payload.length < 4) { return; }
const request_id = payload.readUInt16LE(0);
const entry = this._pending.get(request_id);
if (!entry) { return; }
this._pending.delete(request_id);
try {
entry.resolve(entry.decode(payload));
} catch (err) {
entry.reject(err);
}
}
_request(frame, decode_fn) {
if (!this.connected) { return Promise.reject(new Error('not connected')); }
const id = this._alloc_id();
/* patch request_id into the frame payload (bytes 6-7) */
frame.writeUInt16LE(id, FRAME_HEADER_SIZE);
return new Promise((resolve, reject) => {
this._pending.set(id, { resolve, reject, decode: decode_fn });
this._socket.write(frame, err => {
if (err) { this._pending.delete(id); reject(err); }
});
});
}
_alloc_id() {
const id = this._next_id;
this._next_id = (this._next_id % 0xFFFF) + 1;
return id;
}
/* ---------------------------------------------------------------------- */
enum_devices() {
return this._request(encode_enum_devices(0), decode_enum_devices_response);
}
enum_controls(device_index) {
return this._request(encode_enum_controls(0, device_index), decode_enum_controls_response);
}
get_control(device_index, control_id) {
return this._request(encode_get_control(0, device_index, control_id), decode_get_control_response);
}
set_control(device_index, control_id, value) {
return this._request(encode_set_control(0, device_index, control_id, value), decode_set_control_response);
}
}