import readline from 'node:readline'; import fs from 'node:fs'; import net from 'node:net'; export class Stream_Relay { #target_stream = null; #line_buffer_tally = 0; #connecting = false; constructor(source_stream, target_socket, line_buffer=[], line_buffer_max_size=1024**2) { Object.assign(this, { source_stream, target_socket, line_buffer, line_buffer_max_size }); } serve() { const rli = readline.createInterface({ input: this.source_stream, crlfDelay: Infinity }); rli.on('line', (line) => this.log_line(line)); } buffer_line(line) { this.#line_buffer_tally += line.length + 1; this.line_buffer.push(line); // NOTE: We may of course briefly overshoot here but we are not expecting or caring about huge oneliners for now while (this.line_buffer.length && (this.#line_buffer_tally > this.line_buffer_max_size)) { const discarded = this.line_buffer.shift(); this.#line_buffer_tally -= discarded.length + 1; console.log({ discarded }); } } log_line(line) { this.buffer_line(line); if (this.#target_stream) { this.#flush(); } else { this.#connect_target(); } } #connect_target() { if (this.#target_stream || this.#connecting) { return; } this.#connecting = true; const socket = net.createConnection(this.target_socket.connect_options); socket.on('connect', () => { this.#connecting = false; this.#target_stream = socket; this.#flush(); }); socket.on('error', (err) => { console.log("target error", { err }); socket.destroy(); this.#connecting = false; }); socket.on('close', () => { this.#target_stream = null; }); } #flush() { while (this.line_buffer.length && this.#target_stream) { const line = this.line_buffer[0]; const ok = this.#target_stream.write(line + '\n'); if (ok === false) { return; } this.line_buffer.shift(); } } }