/* * TCP client for video-node. * Handles frame reassembly, request/response correlation by request_id, * and reconnection state. */ import net from 'node:net'; import { EventEmitter } from 'node:events'; import { FRAME_HEADER_SIZE, MSG_CONTROL_RESPONSE, encode_enum_devices, encode_enum_controls, encode_get_control, encode_set_control, decode_enum_devices_response, decode_enum_controls_response, decode_get_control_response, decode_set_control_response, } from './protocol.mjs'; export class Node_Client extends EventEmitter { constructor() { super(); this._socket = null; this._buf = Buffer.alloc(0); this._pending = new Map(); /* request_id -> { resolve, reject, decode } */ this._next_id = 1; this.connected = false; this.host = null; this.port = null; } connect(host, port) { if (this._socket) { this._socket.destroy(); } this.host = host; this.port = port; return new Promise((resolve, reject) => { let connected = false; const sock = net.createConnection({ host, port }); sock.once('connect', () => { connected = true; this._socket = sock; this.connected = true; this._buf = Buffer.alloc(0); this.emit('connect'); resolve(); }); /* Single persistent error handler — errors after connect are * non-fatal here; the 'close' event fires next and handles cleanup. */ sock.on('error', err => { if (!connected) { reject(err); } else { console.error('video-node socket error:', err.message); } }); sock.on('data', chunk => this._on_data(chunk)); sock.on('close', () => { this.connected = false; this._socket = null; for (const { reject: rej } of this._pending.values()) { rej(new Error('disconnected')); } this._pending.clear(); this.emit('disconnect'); }); }); } disconnect() { if (this._socket) { this._socket.destroy(); } } /* ---------------------------------------------------------------------- */ _on_data(chunk) { this._buf = Buffer.concat([this._buf, chunk]); while (true) { if (this._buf.length < FRAME_HEADER_SIZE) { break; } const msg_type = this._buf.readUInt16LE(0); const payload_len = this._buf.readUInt32LE(2); if (this._buf.length < FRAME_HEADER_SIZE + payload_len) { break; } const payload = this._buf.slice(FRAME_HEADER_SIZE, FRAME_HEADER_SIZE + payload_len); this._buf = this._buf.slice(FRAME_HEADER_SIZE + payload_len); this._on_frame(msg_type, payload); } } _on_frame(msg_type, payload) { if (msg_type !== MSG_CONTROL_RESPONSE) { return; } if (payload.length < 4) { return; } const request_id = payload.readUInt16LE(0); const entry = this._pending.get(request_id); if (!entry) { return; } this._pending.delete(request_id); try { entry.resolve(entry.decode(payload)); } catch (err) { entry.reject(err); } } _request(frame, decode_fn) { if (!this.connected) { return Promise.reject(new Error('not connected')); } const id = this._alloc_id(); /* patch request_id into the frame payload (bytes 6-7) */ frame.writeUInt16LE(id, FRAME_HEADER_SIZE); return new Promise((resolve, reject) => { this._pending.set(id, { resolve, reject, decode: decode_fn }); this._socket.write(frame, err => { if (err) { this._pending.delete(id); reject(err); } }); }); } _alloc_id() { const id = this._next_id; this._next_id = (this._next_id % 0xFFFF) + 1; return id; } /* ---------------------------------------------------------------------- */ enum_devices() { return this._request(encode_enum_devices(0), decode_enum_devices_response); } enum_controls(device_index) { return this._request(encode_enum_controls(0, device_index), decode_enum_controls_response); } get_control(device_index, control_id) { return this._request(encode_get_control(0, device_index, control_id), decode_get_control_response); } set_control(device_index, control_id, value) { return this._request(encode_set_control(0, device_index, control_id, value), decode_set_control_response); } }