Initial commit
This commit is contained in:
137
server.mjs
Normal file
137
server.mjs
Normal file
@@ -0,0 +1,137 @@
|
||||
import express from 'express';
|
||||
import { WebSocketServer } from 'ws';
|
||||
import http from 'node:http';
|
||||
import serveIndex from 'serve-index';
|
||||
import Busboy from 'busboy';
|
||||
|
||||
|
||||
const app = express();
|
||||
const server = http.createServer(app);
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
|
||||
|
||||
// resource -> Set<WebSocket>
|
||||
const subscriptions = new Map();
|
||||
|
||||
function subscribe(ws, resource) {
|
||||
if (!subscriptions.has(resource)) {
|
||||
subscriptions.set(resource, new Set());
|
||||
}
|
||||
subscriptions.get(resource).add(ws);
|
||||
ws._subscriptions ??= new Set();
|
||||
ws._subscriptions.add(resource);
|
||||
}
|
||||
|
||||
function unsubscribeAll(ws) {
|
||||
if (!ws._subscriptions) return;
|
||||
for (const resource of ws._subscriptions) {
|
||||
subscriptions.get(resource)?.delete(ws);
|
||||
}
|
||||
}
|
||||
|
||||
// WebSocket upgrade
|
||||
server.on('upgrade', (req, socket, head) => {
|
||||
if (req.url !== '/subscription') {
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
wss.handleUpgrade(req, socket, head, ws => {
|
||||
wss.emit('connection', ws, req);
|
||||
});
|
||||
});
|
||||
|
||||
wss.on('connection', ws => {
|
||||
ws.on('message', msg => {
|
||||
let data;
|
||||
try {
|
||||
data = JSON.parse(msg.toString());
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.cmd === 'subscribe' && typeof data.resource === 'string') {
|
||||
subscribe(ws, data.resource);
|
||||
}
|
||||
});
|
||||
|
||||
ws.on('close', () => {
|
||||
unsubscribeAll(ws);
|
||||
});
|
||||
});
|
||||
|
||||
// Emit endpoint
|
||||
|
||||
app.post('/emit/:resource', (req, res) => {
|
||||
const { resource } = req.params;
|
||||
const chunks = [];
|
||||
|
||||
req.on('data', chunk => chunks.push(chunk));
|
||||
req.on('end', () => {
|
||||
const payload = Buffer.concat(chunks);
|
||||
res.status(204).end();
|
||||
|
||||
const subs = subscriptions.get(resource);
|
||||
if (subs) {
|
||||
for (const ws of subs) {
|
||||
try {
|
||||
ws.send(payload);
|
||||
console.log('[emit] sent to subscriber');
|
||||
} catch (err) {
|
||||
console.log('[emit] send failed, removing subscriber:', err?.message);
|
||||
subs.delete(ws);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
app.post('/emit-multipart', (req, res) => {
|
||||
const bb = Busboy({ headers: req.headers });
|
||||
|
||||
let resource = null;
|
||||
const chunks = [];
|
||||
|
||||
bb.on('field', (name, value) => {
|
||||
if (name === 'resource') {
|
||||
resource = value;
|
||||
}
|
||||
});
|
||||
|
||||
bb.on('file', (name, file) => {
|
||||
file.on('data', chunk => chunks.push(chunk));
|
||||
});
|
||||
|
||||
bb.on('close', () => {
|
||||
if (!resource) {
|
||||
res.status(400).end('missing resource');
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = Buffer.concat(chunks);
|
||||
res.status(204).end();
|
||||
|
||||
const subs = subscriptions.get(resource);
|
||||
if (subs) {
|
||||
for (const ws of subs) {
|
||||
try {
|
||||
ws.send(payload);
|
||||
console.log('[emit-multipart] sent to subscriber');
|
||||
} catch (err) {
|
||||
console.log('[emit-multipart] send failed, removing subscriber:', err?.message);
|
||||
subs.delete(ws);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
req.pipe(bb);
|
||||
});
|
||||
|
||||
|
||||
app.use('/', express.static('static'), serveIndex('static', { icons: true }));
|
||||
|
||||
|
||||
server.listen(3888);
|
||||
Reference in New Issue
Block a user