import readline from 'node:readline'; import net from 'node:net'; import fs from 'node:fs'; export class Relay { #target_stream = null; #socket_server = null; #line_buffer_tally = 0; constructor(listening_socket, target_socket, line_buffer=[], line_buffer_max_size=1024**2) { Object.assign(this, { listening_socket, target_socket, line_buffer, line_buffer_max_size }); } serve() { const server = net.createServer((socket) => { console.log("socket connected"); const rli = readline.createInterface({ input: socket, crlfDelay: Infinity }); rli.on('line', (line) => this.log_line(line)); socket.on('close', () => { console.log("socket closed"); }); socket.on('error', (err) => { console.log("socket error", { err }); }); }); server.on('error', (err) => { console.log("server error", { err }); }); this.listening_socket.listen(server, () => { this.#socket_server = server; }); } buffer_line(line) { this.#line_buffer_tally += line.length + 1; this.line_buffer.push(line); // NOTE: We may of course briefly overshoot here but we are not expecting or caring about huge oneliners for now while (this.line_buffer.length && (this.#line_buffer_tally > this.line_buffer_max_size)) { const discarded = this.line_buffer.shift(); this.#line_buffer_tally -= discarded.length + 1; console.log({ discarded }); } } log_line(line) { this.buffer_line(line); if (this.#target_stream) { this.#flush(); } else { this.#connect_target(); } } #connect_target() { if (this.#target_stream) { return; } const socket = net.createConnection(this.target_socket.connect_options); socket.on('connect', () => { this.#target_stream = socket; this.#flush(); }); socket.on('error', (err) => { console.log("target error", { err }); socket.destroy(); this.#target_stream = null; }); socket.on('close', () => { this.#target_stream = null; }); } #flush() { while (this.line_buffer.length && this.#target_stream) { const line = this.line_buffer[0]; const ok = this.#target_stream.write(line + '\n'); if (ok === false) { return; } this.line_buffer.shift(); } } }