Add display sink: START_DISPLAY/STOP_DISPLAY, multi-window xorg, random port

Protocol:
- Add PROTO_CMD_START_DISPLAY (0x000A) and PROTO_CMD_STOP_DISPLAY (0x000B)
  with write/read functions; Proto_Start_Display carries stream_id, window
  position/size, scale and anchor; PROTO_DISPLAY_SCALE_*/ANCHOR_* constants

Node display sink:
- Display_Slot struct with wanted_state/current_state (DISP_CLOSED/DISP_OPEN);
  handlers set wanted state, display_loop_tick on main thread reconciles
- Up to MAX_DISPLAYS (4) simultaneous viewer windows
- on_frame routes incoming VIDEO_FRAME messages to matching display slot;
  transport thread deposits payload, main thread consumes without holding lock
  during JPEG decode/upload
- Main thread runs GL event loop when xorg is available; headless fallback
  joins reconciler timer thread as before

Xorg multi-window:
- Ref-count glfwInit/glfwTerminate via glfw_acquire/glfw_release so closing
  one viewer does not terminate GLFW for remaining windows
- Add glfwMakeContextCurrent before GL calls in push_yuv420, push_bgra,
  push_mjpeg and poll so each viewer uses its own GL context correctly

Transport random port:
- Bind port 0 lets the OS assign a free port; getsockname reads it back
  into server->bound_port after bind
- Add transport_server_get_port() accessor
- Default tcp_port changed from 8000 to 0 (random); node prints actual
  port after server start so it is always visible in output
- Add --port PORT CLI override (before config-file argument)

controller_cli:
- Add start-display and stop-display commands

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-29 08:03:21 +00:00
parent 28216999e0
commit 32d31cbd1e
6 changed files with 199 additions and 9 deletions

View File

@@ -302,6 +302,39 @@ static void cmd_stop_ingest(struct Transport_Conn *conn,
proto_write_stop_ingest(conn, next_req_id(req), stream_id)); proto_write_stop_ingest(conn, next_req_id(req), stream_id));
} }
static void cmd_start_display(struct Transport_Conn *conn,
struct Ctrl_State *cs, uint16_t *req,
int ntok, char *tokens[])
{
/* Required: stream_id
* Optional: win_x win_y win_w win_h */
if (ntok < 2) {
printf("usage: start-display <stream_id> [win_x] [win_y] [win_w] [win_h]\n");
return;
}
uint16_t stream_id = (uint16_t)atoi(tokens[1]);
int16_t win_x = ntok > 2 ? (int16_t)atoi(tokens[2]) : 0;
int16_t win_y = ntok > 3 ? (int16_t)atoi(tokens[3]) : 0;
uint16_t win_w = ntok > 4 ? (uint16_t)atoi(tokens[4]) : 0;
uint16_t win_h = ntok > 5 ? (uint16_t)atoi(tokens[5]) : 0;
printf("start-display: stream=%u pos=%d,%d size=%ux%u\n",
stream_id, win_x, win_y, win_w, win_h);
SEND_AND_WAIT(cs, PROTO_CMD_START_DISPLAY,
proto_write_start_display(conn, next_req_id(req),
stream_id, win_x, win_y, win_w, win_h,
PROTO_DISPLAY_SCALE_FIT, PROTO_DISPLAY_ANCHOR_CENTER));
}
static void cmd_stop_display(struct Transport_Conn *conn,
struct Ctrl_State *cs, uint16_t *req,
const char *sid_str)
{
uint16_t stream_id = (uint16_t)atoi(sid_str);
printf("stop-display: stream=%u\n", stream_id);
SEND_AND_WAIT(cs, PROTO_CMD_STOP_DISPLAY,
proto_write_stop_display(conn, next_req_id(req), stream_id));
}
static void cmd_help(void) static void cmd_help(void)
{ {
printf("commands:\n" printf("commands:\n"
@@ -312,6 +345,8 @@ static void cmd_help(void)
" start-ingest <stream_id> <device> <dest_host> <dest_port>" " start-ingest <stream_id> <device> <dest_host> <dest_port>"
" [format] [width] [height] [fps_n] [fps_d]\n" " [format] [width] [height] [fps_n] [fps_d]\n"
" stop-ingest <stream_id>\n" " stop-ingest <stream_id>\n"
" start-display <stream_id> [win_x] [win_y] [win_w] [win_h]\n"
" stop-display <stream_id>\n"
" help\n" " help\n"
" quit / exit\n"); " quit / exit\n");
} }
@@ -412,6 +447,11 @@ int main(int argc, char **argv)
} else if (strcmp(cmd, "stop-ingest") == 0) { } else if (strcmp(cmd, "stop-ingest") == 0) {
if (ntok < 2) { printf("usage: stop-ingest <stream_id>\n"); } if (ntok < 2) { printf("usage: stop-ingest <stream_id>\n"); }
else { cmd_stop_ingest(conn, &cs, &req_id, tokens[1]); } else { cmd_stop_ingest(conn, &cs, &req_id, tokens[1]); }
} else if (strcmp(cmd, "start-display") == 0) {
cmd_start_display(conn, &cs, &req_id, ntok, tokens);
} else if (strcmp(cmd, "stop-display") == 0) {
if (ntok < 2) { printf("usage: stop-display <stream_id>\n"); }
else { cmd_stop_display(conn, &cs, &req_id, tokens[1]); }
} else { } else {
printf("unknown command: %s (type 'help' for commands)\n", cmd); printf("unknown command: %s (type 'help' for commands)\n", cmd);
} }

View File

@@ -26,6 +26,8 @@
#define PROTO_CMD_ENUM_MONITORS 0x0007u #define PROTO_CMD_ENUM_MONITORS 0x0007u
#define PROTO_CMD_START_INGEST 0x0008u #define PROTO_CMD_START_INGEST 0x0008u
#define PROTO_CMD_STOP_INGEST 0x0009u #define PROTO_CMD_STOP_INGEST 0x0009u
#define PROTO_CMD_START_DISPLAY 0x000Au
#define PROTO_CMD_STOP_DISPLAY 0x000Bu
/* ------------------------------------------------------------------------- /* -------------------------------------------------------------------------
* Response status codes (carried in CONTROL_RESPONSE payload offset 2) * Response status codes (carried in CONTROL_RESPONSE payload offset 2)
@@ -232,6 +234,39 @@ struct Proto_Stop_Ingest {
uint16_t stream_id; uint16_t stream_id;
}; };
/*
* START_DISPLAY: controller tells a sink node to open a viewer window and
* display incoming VIDEO_FRAME messages for the given stream_id.
* win_x/win_y are screen-space window position (signed: multi-monitor).
* win_w/win_h of 0 mean use a default size.
* scale: 0=stretch 1=fit 2=fill 3=1:1 (PROTO_DISPLAY_SCALE_*)
* anchor: 0=center 1=topleft (PROTO_DISPLAY_ANCHOR_*)
*/
struct Proto_Start_Display {
uint16_t request_id;
uint16_t stream_id;
int16_t win_x;
int16_t win_y;
uint16_t win_w;
uint16_t win_h;
uint8_t scale;
uint8_t anchor;
};
struct Proto_Stop_Display {
uint16_t request_id;
uint16_t stream_id;
};
/* Scale/anchor constants for Proto_Start_Display */
#define PROTO_DISPLAY_SCALE_STRETCH 0u
#define PROTO_DISPLAY_SCALE_FIT 1u
#define PROTO_DISPLAY_SCALE_FILL 2u
#define PROTO_DISPLAY_SCALE_1_1 3u
#define PROTO_DISPLAY_ANCHOR_CENTER 0u
#define PROTO_DISPLAY_ANCHOR_TOPLEFT 1u
struct Proto_Response_Header { struct Proto_Response_Header {
uint16_t request_id; uint16_t request_id;
uint16_t status; uint16_t status;
@@ -301,6 +336,16 @@ struct App_Error proto_write_start_ingest(struct Transport_Conn *conn,
struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn, struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id); uint16_t request_id, uint16_t stream_id);
/* CONTROL_REQUEST: START_DISPLAY */
struct App_Error proto_write_start_display(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id,
int16_t win_x, int16_t win_y, uint16_t win_w, uint16_t win_h,
uint8_t scale, uint8_t anchor);
/* CONTROL_REQUEST: STOP_DISPLAY */
struct App_Error proto_write_stop_display(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id);
/* /*
* CONTROL_RESPONSE: generic. * CONTROL_RESPONSE: generic.
* payload/payload_len are the command-specific bytes after request_id+status. * payload/payload_len are the command-specific bytes after request_id+status.
@@ -381,6 +426,14 @@ struct App_Error proto_read_stop_ingest(
const uint8_t *payload, uint32_t length, const uint8_t *payload, uint32_t length,
struct Proto_Stop_Ingest *out); struct Proto_Stop_Ingest *out);
struct App_Error proto_read_start_display(
const uint8_t *payload, uint32_t length,
struct Proto_Start_Display *out);
struct App_Error proto_read_stop_display(
const uint8_t *payload, uint32_t length,
struct Proto_Stop_Display *out);
/* /*
* Read the common 4-byte response header (request_id + status). * Read the common 4-byte response header (request_id + status).
* For responses with no extra fields (STREAM_OPEN, STREAM_CLOSE, SET_CONTROL), * For responses with no extra fields (STREAM_OPEN, STREAM_CLOSE, SET_CONTROL),

View File

@@ -51,9 +51,15 @@ struct Transport_Server_Config {
struct App_Error transport_server_create(struct Transport_Server **out, struct App_Error transport_server_create(struct Transport_Server **out,
struct Transport_Server_Config *config); struct Transport_Server_Config *config);
/* Bind, listen, and spawn the accept thread. */ /* Bind, listen, and spawn the accept thread.
* If config.port is 0, the OS assigns a free port; use
* transport_server_get_port() afterwards to retrieve it. */
struct App_Error transport_server_start(struct Transport_Server *server); struct App_Error transport_server_start(struct Transport_Server *server);
/* Return the port the server is actually listening on.
* Valid after a successful transport_server_start(). */
uint16_t transport_server_get_port(const struct Transport_Server *server);
/* /*
* Stop accepting new connections and free the server. * Stop accepting new connections and free the server.
* Active connections continue until they disconnect naturally. * Active connections continue until they disconnect naturally.

View File

@@ -601,6 +601,65 @@ struct App_Error proto_read_stop_ingest(
return APP_OK; return APP_OK;
} }
/* START_DISPLAY: request_id(2) cmd(2) stream_id(2) win_x(2) win_y(2)
* win_w(2) win_h(2) scale(1) anchor(1) = 16 bytes */
struct App_Error proto_write_start_display(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id,
int16_t win_x, int16_t win_y, uint16_t win_w, uint16_t win_h,
uint8_t scale, uint8_t anchor)
{
uint8_t buf[16];
uint32_t o = 0;
put_u16(buf, o, request_id); o += 2;
put_u16(buf, o, PROTO_CMD_START_DISPLAY); o += 2;
put_u16(buf, o, stream_id); o += 2;
put_i16(buf, o, win_x); o += 2;
put_i16(buf, o, win_y); o += 2;
put_u16(buf, o, win_w); o += 2;
put_u16(buf, o, win_h); o += 2;
put_u8 (buf, o, scale); o += 1;
put_u8 (buf, o, anchor); o += 1;
(void)o;
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 16);
}
struct App_Error proto_write_stop_display(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id)
{
uint8_t buf[6];
put_u16(buf, 0, request_id);
put_u16(buf, 2, PROTO_CMD_STOP_DISPLAY);
put_u16(buf, 4, stream_id);
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 6);
}
struct App_Error proto_read_start_display(
const uint8_t *payload, uint32_t length,
struct Proto_Start_Display *out)
{
if (length < 16) { return APP_INVALID_ERROR_MSG(0, "START_DISPLAY payload too short"); }
out->request_id = get_u16(payload, 0);
/* skip command word at [2..3] */
out->stream_id = get_u16(payload, 4);
out->win_x = get_i16(payload, 6);
out->win_y = get_i16(payload, 8);
out->win_w = get_u16(payload, 10);
out->win_h = get_u16(payload, 12);
out->scale = get_u8 (payload, 14);
out->anchor = get_u8 (payload, 15);
return APP_OK;
}
struct App_Error proto_read_stop_display(
const uint8_t *payload, uint32_t length,
struct Proto_Stop_Display *out)
{
if (length < 6) { return APP_INVALID_ERROR_MSG(0, "STOP_DISPLAY payload too short"); }
out->request_id = get_u16(payload, 0);
out->stream_id = get_u16(payload, 4);
return APP_OK;
}
struct App_Error proto_read_response_header( struct App_Error proto_read_response_header(
const uint8_t *payload, uint32_t length, const uint8_t *payload, uint32_t length,
struct Proto_Response_Header *out) struct Proto_Response_Header *out)

View File

@@ -23,6 +23,7 @@ struct Transport_Conn {
struct Transport_Server { struct Transport_Server {
int listen_fd; int listen_fd;
uint16_t bound_port; /* actual port after bind */
struct Transport_Server_Config config; struct Transport_Server_Config config;
pthread_t accept_thread; pthread_t accept_thread;
pthread_mutex_t count_mutex; pthread_mutex_t count_mutex;
@@ -209,6 +210,15 @@ struct App_Error transport_server_start(struct Transport_Server *server) {
return APP_SYSCALL_ERROR(); return APP_SYSCALL_ERROR();
} }
/* Read back the actual port (matters when config.port == 0) */
struct sockaddr_in bound = {0};
socklen_t bound_len = sizeof(bound);
if (getsockname(fd, (struct sockaddr *)&bound, &bound_len) == 0) {
server->bound_port = ntohs(bound.sin_port);
} else {
server->bound_port = server->config.port;
}
if (listen(fd, SOMAXCONN) < 0) { if (listen(fd, SOMAXCONN) < 0) {
close(fd); close(fd);
return APP_SYSCALL_ERROR(); return APP_SYSCALL_ERROR();
@@ -235,6 +245,10 @@ void transport_server_destroy(struct Transport_Server *server) {
free(server); free(server);
} }
uint16_t transport_server_get_port(const struct Transport_Server *server) {
return server->bound_port;
}
struct App_Error transport_connect(struct Transport_Conn **out, struct App_Error transport_connect(struct Transport_Conn **out,
const char *host, uint16_t port, const char *host, uint16_t port,
uint32_t max_payload, uint32_t max_payload,

View File

@@ -12,6 +12,23 @@
#include "xorg.h" #include "xorg.h"
#include "font_atlas.h" /* generated: font_glyphs[], font_atlas_pixels[], FONT_ATLAS_W/H */ #include "font_atlas.h" /* generated: font_glyphs[], font_atlas_pixels[], FONT_ATLAS_W/H */
/* Reference count for glfwInit/glfwTerminate.
* All xorg calls happen on the main thread, so no locking needed. */
static int glfw_ref_count = 0;
static void glfw_acquire(void)
{
if (glfw_ref_count == 0) { glfwInit(); }
glfw_ref_count++;
}
static void glfw_release(void)
{
if (glfw_ref_count <= 0) { return; }
glfw_ref_count--;
if (glfw_ref_count == 0) { glfwTerminate(); }
}
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
/* Shader sources — video */ /* Shader sources — video */
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
@@ -326,10 +343,7 @@ static bool init_text_rendering(Xorg_Viewer *v)
Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height, Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height,
const char *title) const char *title)
{ {
if (!glfwInit()) { glfw_acquire();
fprintf(stderr, "xorg: glfwInit failed\n");
return NULL;
}
glfwWindowHint(GLFW_CONTEXT_VERSION_MAJOR, 3); glfwWindowHint(GLFW_CONTEXT_VERSION_MAJOR, 3);
glfwWindowHint(GLFW_CONTEXT_VERSION_MINOR, 3); glfwWindowHint(GLFW_CONTEXT_VERSION_MINOR, 3);
@@ -338,7 +352,7 @@ Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height,
GLFWwindow *win = glfwCreateWindow(width, height, title, NULL, NULL); GLFWwindow *win = glfwCreateWindow(width, height, title, NULL, NULL);
if (!win) { if (!win) {
fprintf(stderr, "xorg: glfwCreateWindow failed\n"); fprintf(stderr, "xorg: glfwCreateWindow failed\n");
glfwTerminate(); glfw_release();
return NULL; return NULL;
} }
glfwSetWindowPos(win, x, y); glfwSetWindowPos(win, x, y);
@@ -350,14 +364,14 @@ Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height,
if (glewInit() != GLEW_OK) { if (glewInit() != GLEW_OK) {
fprintf(stderr, "xorg: glewInit failed\n"); fprintf(stderr, "xorg: glewInit failed\n");
glfwDestroyWindow(win); glfwDestroyWindow(win);
glfwTerminate(); glfw_release();
return NULL; return NULL;
} }
Xorg_Viewer *v = calloc(1, sizeof(*v)); Xorg_Viewer *v = calloc(1, sizeof(*v));
if (!v) { if (!v) {
glfwDestroyWindow(win); glfwDestroyWindow(win);
glfwTerminate(); glfw_release();
return NULL; return NULL;
} }
v->window = win; v->window = win;
@@ -727,6 +741,7 @@ bool xorg_viewer_push_yuv420(Xorg_Viewer *v,
int width, int height) int width, int height)
{ {
if (!v) { return false; } if (!v) { return false; }
glfwMakeContextCurrent(v->window);
v->frame_w = width; v->frame_w = width;
v->frame_h = height; v->frame_h = height;
upload_yuv(v, y, width, height, cb, width / 2, height / 2, cr); upload_yuv(v, y, width, height, cb, width / 2, height / 2, cr);
@@ -737,6 +752,7 @@ bool xorg_viewer_push_bgra(Xorg_Viewer *v,
const uint8_t *data, int width, int height) const uint8_t *data, int width, int height)
{ {
if (!v) { return false; } if (!v) { return false; }
glfwMakeContextCurrent(v->window);
v->frame_w = width; v->frame_w = width;
v->frame_h = height; v->frame_h = height;
@@ -759,6 +775,7 @@ bool xorg_viewer_push_mjpeg(Xorg_Viewer *v,
return false; return false;
#else #else
if (!v) { return false; } if (!v) { return false; }
glfwMakeContextCurrent(v->window);
int w, h, subsamp, colorspace; int w, h, subsamp, colorspace;
if (tjDecompressHeader3(v->tj, data, (unsigned long)size, if (tjDecompressHeader3(v->tj, data, (unsigned long)size,
@@ -809,6 +826,7 @@ bool xorg_viewer_poll(Xorg_Viewer *v)
if (!v || glfwWindowShouldClose(v->window)) { return false; } if (!v || glfwWindowShouldClose(v->window)) { return false; }
glfwPollEvents(); glfwPollEvents();
if (glfwWindowShouldClose(v->window)) { return false; } if (glfwWindowShouldClose(v->window)) { return false; }
glfwMakeContextCurrent(v->window);
render(v); render(v);
return true; return true;
} }
@@ -838,7 +856,7 @@ void xorg_viewer_close(Xorg_Viewer *v)
if (v->prog_rgb) { glDeleteProgram(v->prog_rgb); } if (v->prog_rgb) { glDeleteProgram(v->prog_rgb); }
if (v->window) { if (v->window) {
glfwDestroyWindow(v->window); glfwDestroyWindow(v->window);
glfwTerminate(); glfw_release();
} }
free(v); free(v);
} }