initial
This commit is contained in:
116
relay.mjs
Normal file
116
relay.mjs
Normal file
@@ -0,0 +1,116 @@
|
||||
import readline from 'node:readline';
|
||||
import net from 'node:net';
|
||||
import fs from 'node:fs';
|
||||
|
||||
|
||||
export class Relay {
|
||||
|
||||
#target_stream = null;
|
||||
#socket_server = null;
|
||||
#line_buffer_tally = 0;
|
||||
|
||||
constructor(listening_socket, target_socket, line_buffer=[], line_buffer_max_size=1024**2) {
|
||||
Object.assign(this, { listening_socket, target_socket, line_buffer, line_buffer_max_size });
|
||||
}
|
||||
serve() {
|
||||
|
||||
const server = net.createServer((socket) => {
|
||||
|
||||
console.log("socket connected");
|
||||
|
||||
const rli = readline.createInterface({
|
||||
input: socket,
|
||||
crlfDelay: Infinity
|
||||
});
|
||||
|
||||
rli.on('line', (line) => this.log_line(line));
|
||||
|
||||
socket.on('close', () => {
|
||||
console.log("socket closed");
|
||||
});
|
||||
|
||||
socket.on('error', (err) => {
|
||||
console.log("socket error", { err });
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
|
||||
server.on('error', (err) => {
|
||||
console.log("server error", { err });
|
||||
});
|
||||
|
||||
this.listening_socket.listen(server, () => {
|
||||
this.#socket_server = server;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
buffer_line(line) {
|
||||
|
||||
this.#line_buffer_tally += line.length + 1;
|
||||
this.line_buffer.push(line); // NOTE: We may of course briefly overshoot here but we are not expecting or caring about huge oneliners for now
|
||||
|
||||
while (this.line_buffer.length && (this.#line_buffer_tally > this.line_buffer_max_size)) {
|
||||
const discarded = this.line_buffer.shift();
|
||||
this.#line_buffer_tally -= discarded.length + 1;
|
||||
console.log({ discarded });
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
log_line(line) {
|
||||
this.buffer_line(line);
|
||||
|
||||
if (this.#target_stream) {
|
||||
this.#flush();
|
||||
} else {
|
||||
this.#connect_target();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#connect_target() {
|
||||
|
||||
if (this.#target_stream) {
|
||||
return;
|
||||
}
|
||||
|
||||
const socket = net.createConnection(this.target_socket.connect_options);
|
||||
|
||||
socket.on('connect', () => {
|
||||
this.#target_stream = socket;
|
||||
this.#flush();
|
||||
});
|
||||
|
||||
socket.on('error', (err) => {
|
||||
console.log("target error", { err });
|
||||
socket.destroy();
|
||||
this.#target_stream = null;
|
||||
});
|
||||
|
||||
socket.on('close', () => {
|
||||
this.#target_stream = null;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
|
||||
#flush() {
|
||||
|
||||
while (this.line_buffer.length && this.#target_stream) {
|
||||
|
||||
const line = this.line_buffer[0];
|
||||
const ok = this.#target_stream.write(line + '\n');
|
||||
|
||||
if (ok === false) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.line_buffer.shift();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user