import express from 'express'; import { WebSocketServer } from 'ws'; import http from 'node:http'; import serveIndex from 'serve-index'; import Busboy from 'busboy'; const app = express(); const server = http.createServer(app); const wss = new WebSocketServer({ noServer: true }); // resource -> Set const subscriptions = new Map(); function subscribe(ws, resource) { if (!subscriptions.has(resource)) { subscriptions.set(resource, new Set()); } subscriptions.get(resource).add(ws); ws._subscriptions ??= new Set(); ws._subscriptions.add(resource); } function unsubscribeAll(ws) { if (!ws._subscriptions) return; for (const resource of ws._subscriptions) { subscriptions.get(resource)?.delete(ws); } } // WebSocket upgrade server.on('upgrade', (req, socket, head) => { if (req.url !== '/subscription') { socket.destroy(); return; } wss.handleUpgrade(req, socket, head, ws => { wss.emit('connection', ws, req); }); }); wss.on('connection', ws => { ws.on('message', msg => { let data; try { data = JSON.parse(msg.toString()); } catch { return; } if (data.cmd === 'subscribe' && typeof data.resource === 'string') { subscribe(ws, data.resource); } }); ws.on('close', () => { unsubscribeAll(ws); }); }); // Emit endpoint app.post('/emit/:resource', (req, res) => { const { resource } = req.params; const chunks = []; req.on('data', chunk => chunks.push(chunk)); req.on('end', () => { const payload = Buffer.concat(chunks); res.status(204).end(); const subs = subscriptions.get(resource); if (subs) { for (const ws of subs) { try { ws.send(payload); console.log('[emit] sent to subscriber'); } catch (err) { console.log('[emit] send failed, removing subscriber:', err?.message); subs.delete(ws); } } } }); }); app.post('/emit-multipart', (req, res) => { const bb = Busboy({ headers: req.headers }); let resource = null; const chunks = []; bb.on('field', (name, value) => { if (name === 'resource') { resource = value; } }); bb.on('file', (name, file) => { file.on('data', chunk => chunks.push(chunk)); }); bb.on('close', () => { if (!resource) { res.status(400).end('missing resource'); return; } const payload = Buffer.concat(chunks); res.status(204).end(); const subs = subscriptions.get(resource); if (subs) { for (const ws of subs) { try { ws.send(payload); console.log('[emit-multipart] sent to subscriber'); } catch (err) { console.log('[emit-multipart] send failed, removing subscriber:', err?.message); subs.delete(ws); } } } }); req.pipe(bb); }); app.use('/', express.static('static'), serveIndex('static', { icons: true })); server.listen(3888);