diff --git a/dev/cli/Makefile b/dev/cli/Makefile index 633d783..650e3e3 100644 --- a/dev/cli/Makefile +++ b/dev/cli/Makefile @@ -51,17 +51,19 @@ all: \ $(CLI_BUILD)/controller_cli # Module objects delegate to their sub-makes. -$(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common -$(MEDIA_CTRL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl -$(V4L2_CTRL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl -$(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial -$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport -$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery -$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config -$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol -$(TEST_IMAGE_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/test_image -$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg -$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler +# 'force' ensures the sub-make is always invoked so it can check source timestamps itself. +.PHONY: force +$(COMMON_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/common +$(MEDIA_CTRL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl +$(V4L2_CTRL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl +$(SERIAL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/serial +$(TRANSPORT_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/transport +$(DISCOVERY_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/discovery +$(CONFIG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/config +$(PROTOCOL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/protocol +$(TEST_IMAGE_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/test_image +$(XORG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/xorg +$(RECONCILER_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/reconciler # Compile each CLI source to its own .o (generates .d alongside). $(CLI_BUILD)/%.o: %.c | $(CLI_BUILD) diff --git a/src/node/Makefile b/src/node/Makefile index b241f29..9e60899 100644 --- a/src/node/Makefile +++ b/src/node/Makefile @@ -28,17 +28,19 @@ $(NODE_BUILD)/video-node: $(MAIN_OBJ) \ $(MAIN_OBJ): main.c | $(NODE_BUILD) $(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $< -$(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common -$(MEDIA_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl -$(V4L2_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl -$(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial -$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport -$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery -$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config -$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol -$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler -$(INGEST_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/ingest -$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg +# 'force' ensures the sub-make is always invoked so it can check source timestamps itself. +.PHONY: force +$(COMMON_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/common +$(MEDIA_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl +$(V4L2_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl +$(SERIAL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/serial +$(TRANSPORT_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/transport +$(DISCOVERY_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/discovery +$(CONFIG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/config +$(PROTOCOL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/protocol +$(RECONCILER_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/reconciler +$(INGEST_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/ingest +$(XORG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/xorg $(NODE_BUILD): mkdir -p $@ diff --git a/src/node/main.c b/src/node/main.c index 3813edc..adb6979 100644 --- a/src/node/main.c +++ b/src/node/main.c @@ -19,13 +19,15 @@ #include "reconciler.h" #include "ingest.h" #include "stream_stats.h" +#include "xorg.h" #include "error.h" /* ------------------------------------------------------------------------- * Stream slot constants * ------------------------------------------------------------------------- */ -#define MAX_STREAMS 8 +#define MAX_STREAMS 8 +#define MAX_DISPLAYS 4 #define DEV_CLOSED 0 #define DEV_OPEN 1 @@ -96,6 +98,12 @@ static int trans_transport_disconnect(void *ud); static void on_ingest_frame(const uint8_t *data, uint32_t len, int width, int height, uint32_t pixfmt, void *userdata); static void on_ingest_error(const char *msg, void *userdata); +static void handle_start_display(struct Node *node, + struct Transport_Conn *conn, + const uint8_t *payload, uint32_t length); +static void handle_stop_display(struct Node *node, + struct Transport_Conn *conn, + const uint8_t *payload, uint32_t length); /* ------------------------------------------------------------------------- * Transition tables — shared across all stream slots @@ -147,6 +155,38 @@ struct Ingest_Stream { struct Node *node; }; +/* ------------------------------------------------------------------------- + * Display slot — sink role; one per displayed stream + * ------------------------------------------------------------------------- */ + +/* Display states (wanted/current) */ +#define DISP_CLOSED 0 +#define DISP_OPEN 1 + +struct Display_Slot { + pthread_mutex_t mutex; /* protects wanted_state + frame fields */ + int allocated; /* slot is in use */ + uint16_t stream_id; + + /* Declarative state — handlers write wanted, main loop reconciles */ + int wanted_state; /* DISP_CLOSED / DISP_OPEN */ + int current_state;/* driven by display_loop_tick */ + + /* Config — written by handle_start_display before setting wanted */ + int win_x, win_y; + int win_w, win_h; + Xorg_Scale scale; + Xorg_Anchor anchor; + + /* Pending frame — deposited by transport thread, consumed by main */ + uint8_t *frame_data; /* malloc'd transport payload; owned */ + uint32_t frame_len; + int frame_ready; + + /* Viewer — created and used only on the main thread */ + Xorg_Viewer *viewer; +}; + /* ------------------------------------------------------------------------- * Node * ------------------------------------------------------------------------- */ @@ -161,6 +201,7 @@ struct Node { pthread_mutex_t reconciler_mutex; struct Ingest_Stream streams[MAX_STREAMS]; + struct Display_Slot displays[MAX_DISPLAYS]; }; /* ------------------------------------------------------------------------- @@ -215,11 +256,12 @@ static void on_ingest_frame(const uint8_t *data, uint32_t len, struct App_Error e = proto_write_video_frame(s->conn, s->stream_id, data, len); if (!APP_IS_OK(e)) { /* transport will fire on_disconnect; nothing to do here */ + } else { + stream_stats_record_frame(&s->stats, len); } } pthread_mutex_unlock(&s->conn_mutex); - stream_stats_record_frame(&s->stats, len); if (stream_stats_tick(&s->stats)) { fprintf(stderr, "stream %u %.1f fps %.2f Mbps\n", s->stream_id, s->stats.fps, s->stats.mbps); @@ -399,6 +441,111 @@ static void on_reconciler_log(const struct Rec_Resource *res, success ? "ok" : "FAILED"); } +/* ------------------------------------------------------------------------- + * Main-thread display loop + * + * Called from the main thread only. Opens/closes Xorg_Viewer windows as + * display slots are created or destroyed, delivers pending frames, and polls + * GLFW events for all active viewers. + * ------------------------------------------------------------------------- */ + +/* + * display_loop_tick — main-thread reconciler for display slots. + * + * Runs on the main thread only (GLFW requirement for xorg_viewer_open/close). + * Reads wanted_state set by handlers; drives current_state through transitions. + * For slots that are OPEN: delivers pending frames and polls window events. + */ +static void display_loop_tick(struct Node *node) +{ + for (int i = 0; i < MAX_DISPLAYS; i++) { + struct Display_Slot *d = &node->displays[i]; + + pthread_mutex_lock(&d->mutex); + int allocated = d->allocated; + int wanted = d->wanted_state; + int current = d->current_state; + pthread_mutex_unlock(&d->mutex); + + if (!allocated) { continue; } + + /* Reconcile: drive current toward wanted */ + if (current != wanted) { + if (wanted == DISP_OPEN && current == DISP_CLOSED) { + char title[64]; + snprintf(title, sizeof(title), "stream %u", (unsigned)d->stream_id); + Xorg_Viewer *v = xorg_viewer_open( + d->win_x, d->win_y, d->win_w, d->win_h, title); + if (v) { + xorg_viewer_set_scale(v, d->scale); + xorg_viewer_set_anchor(v, d->anchor); + d->viewer = v; + pthread_mutex_lock(&d->mutex); + d->current_state = DISP_OPEN; + pthread_mutex_unlock(&d->mutex); + } + /* If open failed, current stays CLOSED; will retry next tick */ + } else if (wanted == DISP_CLOSED && current == DISP_OPEN) { + if (d->viewer) { + xorg_viewer_close(d->viewer); + d->viewer = NULL; + } + pthread_mutex_lock(&d->mutex); + d->current_state = DISP_CLOSED; + if (d->frame_data) { free(d->frame_data); d->frame_data = NULL; } + d->frame_ready = 0; + d->allocated = 0; /* release slot */ + pthread_mutex_unlock(&d->mutex); + continue; + } + } + + if (d->current_state != DISP_OPEN || !d->viewer) { continue; } + + /* Deliver pending frame (no lock held during decode/upload) */ + pthread_mutex_lock(&d->mutex); + uint8_t *fdata = NULL; + uint32_t flen = 0; + if (d->frame_ready) { + fdata = d->frame_data; + flen = d->frame_len; + d->frame_data = NULL; + d->frame_len = 0; + d->frame_ready = 0; + } + pthread_mutex_unlock(&d->mutex); + + if (fdata) { + struct Proto_Video_Frame vf; + if (APP_IS_OK(proto_read_video_frame(fdata, flen, &vf))) { + xorg_viewer_push_mjpeg(d->viewer, vf.data, vf.data_len); + } + free(fdata); + } + + /* Poll GLFW events; if user closes the window, treat as STOP_DISPLAY */ + if (!xorg_viewer_handle_events(d->viewer)) { + xorg_viewer_close(d->viewer); + d->viewer = NULL; + pthread_mutex_lock(&d->mutex); + d->current_state = DISP_CLOSED; + d->wanted_state = DISP_CLOSED; + if (d->frame_data) { free(d->frame_data); d->frame_data = NULL; } + d->frame_ready = 0; + d->allocated = 0; + pthread_mutex_unlock(&d->mutex); + } + } +} + +static int any_display_active(struct Node *node) +{ + for (int i = 0; i < MAX_DISPLAYS; i++) { + if (node->displays[i].allocated) { return 1; } + } + return 0; +} + /* ------------------------------------------------------------------------- * Periodic reconciler tick thread * ------------------------------------------------------------------------- */ @@ -838,6 +985,137 @@ static void handle_stop_ingest(struct Node *node, proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); } +/* ------------------------------------------------------------------------- + * Display slot helpers + * ------------------------------------------------------------------------- */ + +static void display_slots_init(struct Node *node) +{ + for (int i = 0; i < MAX_DISPLAYS; i++) { + struct Display_Slot *d = &node->displays[i]; + memset(d, 0, sizeof(*d)); + pthread_mutex_init(&d->mutex, NULL); + } +} + +static struct Display_Slot *display_find(struct Node *node, uint16_t stream_id) +{ + for (int i = 0; i < MAX_DISPLAYS; i++) { + struct Display_Slot *d = &node->displays[i]; + pthread_mutex_lock(&d->mutex); + /* Match any allocated slot — including ones being closed, so + * START_DISPLAY can reuse them and STOP_DISPLAY can find them */ + int match = d->allocated && d->stream_id == stream_id; + pthread_mutex_unlock(&d->mutex); + if (match) { return d; } + } + return NULL; +} + +static struct Display_Slot *display_alloc(struct Node *node) +{ + for (int i = 0; i < MAX_DISPLAYS; i++) { + struct Display_Slot *d = &node->displays[i]; + pthread_mutex_lock(&d->mutex); + int is_free = !d->allocated; + if (is_free) { + d->allocated = 1; + pthread_mutex_unlock(&d->mutex); + return d; + } + pthread_mutex_unlock(&d->mutex); + } + return NULL; +} + +/* Map PROTO_DISPLAY_SCALE_* to Xorg_Scale */ +static Xorg_Scale proto_scale_to_xorg(uint8_t s) +{ + switch (s) { + case PROTO_DISPLAY_SCALE_FIT: return XORG_SCALE_FIT; + case PROTO_DISPLAY_SCALE_FILL: return XORG_SCALE_FILL; + case PROTO_DISPLAY_SCALE_1_1: return XORG_SCALE_1_1; + default: return XORG_SCALE_STRETCH; + } +} + +/* Map PROTO_DISPLAY_ANCHOR_* to Xorg_Anchor */ +static Xorg_Anchor proto_anchor_to_xorg(uint8_t a) +{ + switch (a) { + case PROTO_DISPLAY_ANCHOR_TOPLEFT: return XORG_ANCHOR_TOP_LEFT; + default: return XORG_ANCHOR_CENTER; + } +} + +static void handle_start_display(struct Node *node, + struct Transport_Conn *conn, + const uint8_t *payload, uint32_t length) +{ + struct Proto_Start_Display req; + struct App_Error e = proto_read_start_display(payload, length, &req); + if (!APP_IS_OK(e)) { + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); + return; + } + if (!xorg_available()) { + proto_write_control_response(conn, req.request_id, + PROTO_STATUS_ERROR, NULL, 0); + fprintf(stderr, "START_DISPLAY: xorg not available (headless build)\n"); + return; + } + + /* Reuse existing slot for this stream_id, or allocate a fresh one */ + struct Display_Slot *d = display_find(node, req.stream_id); + if (!d) { + d = display_alloc(node); + if (!d) { + proto_write_control_response(conn, req.request_id, + PROTO_STATUS_ERROR, NULL, 0); + fprintf(stderr, "START_DISPLAY: no free display slots\n"); + return; + } + } + + pthread_mutex_lock(&d->mutex); + d->stream_id = req.stream_id; + d->win_x = (int)req.win_x; + d->win_y = (int)req.win_y; + d->win_w = req.win_w > 0 ? (int)req.win_w : 1280; + d->win_h = req.win_h > 0 ? (int)req.win_h : 720; + d->scale = proto_scale_to_xorg(req.scale); + d->anchor = proto_anchor_to_xorg(req.anchor); + d->wanted_state = DISP_OPEN; /* reconciled by display_loop_tick */ + pthread_mutex_unlock(&d->mutex); + + proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); +} + +static void handle_stop_display(struct Node *node, + struct Transport_Conn *conn, + const uint8_t *payload, uint32_t length) +{ + struct Proto_Stop_Display req; + struct App_Error e = proto_read_stop_display(payload, length, &req); + if (!APP_IS_OK(e)) { + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); + return; + } + + struct Display_Slot *d = display_find(node, req.stream_id); + if (!d) { + proto_write_control_response(conn, req.request_id, + PROTO_STATUS_NOT_FOUND, NULL, 0); + return; + } + + pthread_mutex_lock(&d->mutex); + d->wanted_state = DISP_CLOSED; /* reconciled by display_loop_tick */ + pthread_mutex_unlock(&d->mutex); + + proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); +} + /* ------------------------------------------------------------------------- * Transport callbacks (inbound server) * ------------------------------------------------------------------------- */ @@ -847,6 +1125,31 @@ static void on_frame(struct Transport_Conn *conn, { struct Node *node = userdata; + if (frame->message_type == PROTO_MSG_VIDEO_FRAME) { + /* Sink role: route frame to matching display slot */ + struct Proto_Video_Frame vf; + struct App_Error e = proto_read_video_frame( + frame->payload, frame->payload_length, &vf); + if (APP_IS_OK(e)) { + struct Display_Slot *d = display_find(node, vf.stream_id); + if (d) { + pthread_mutex_lock(&d->mutex); + if (d->frame_ready) { + free(d->frame_data); /* drop stale frame */ + } + /* Hand ownership of the payload to the display slot. + * Set frame->payload = NULL so the free() below is a no-op. */ + d->frame_data = frame->payload; + d->frame_len = frame->payload_length; + d->frame_ready = 1; + frame->payload = NULL; + pthread_mutex_unlock(&d->mutex); + } + } + free(frame->payload); + return; + } + if (frame->message_type == PROTO_MSG_CONTROL_REQUEST) { struct Proto_Request_Header hdr; struct App_Error e = proto_read_request_header( @@ -877,6 +1180,14 @@ static void on_frame(struct Transport_Conn *conn, handle_stop_ingest(node, conn, frame->payload, frame->payload_length); break; + case PROTO_CMD_START_DISPLAY: + handle_start_display(node, conn, + frame->payload, frame->payload_length); + break; + case PROTO_CMD_STOP_DISPLAY: + handle_stop_display(node, conn, + frame->payload, frame->payload_length); + break; case PROTO_CMD_STREAM_OPEN: /* A sender is opening a stream on this node (sink role) */ proto_write_control_response(conn, hdr.request_id, @@ -919,7 +1230,7 @@ static const struct Config_Flag_Def function_flag_defs[] = { static const struct Config_Def schema[] = { { "node", "name", CONFIG_STRING, "unnamed:0", NULL }, { "node", "site_id", CONFIG_UINT16, "0", NULL }, - { "node", "tcp_port", CONFIG_UINT16, "8000", NULL }, + { "node", "tcp_port", CONFIG_UINT16, "0", NULL }, { "node", "function", CONFIG_FLAGS, "source", function_flag_defs }, { "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL }, { "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL }, @@ -934,27 +1245,43 @@ static const struct Config_Def schema[] = { static void usage(void) { fprintf(stderr, - "usage: video-node \n" - " video-node --defaults\n"); + "usage: video-node [--port PORT] \n" + " video-node [--port PORT] --defaults\n" + "\n" + " --port PORT override tcp_port from config (0 = OS-assigned random port)\n"); } int main(int argc, char **argv) { - if (argc < 2) { usage(); return 1; } + int port_override = -1; /* -1 = not set */ + int argi = 1; + + while (argi < argc && argv[argi][0] == '-') { + if (strcmp(argv[argi], "--port") == 0 && argi + 1 < argc) { + port_override = atoi(argv[++argi]); + argi++; + } else { + break; + } + } + + if (argi >= argc) { usage(); return 1; } struct Node node; memset(&node, 0, sizeof(node)); /* Load config */ struct App_Error e; - if (strcmp(argv[1], "--defaults") == 0) { + if (strcmp(argv[argi], "--defaults") == 0) { e = config_defaults(&node.config, schema); } else { - e = config_load(&node.config, argv[1], schema); + e = config_load(&node.config, argv[argi], schema); } if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } - uint16_t tcp_port = config_get_u16(node.config, "node", "tcp_port"); + uint16_t tcp_port = port_override >= 0 + ? (uint16_t)port_override + : config_get_u16(node.config, "node", "tcp_port"); uint16_t site_id = config_get_u16(node.config, "node", "site_id"); uint32_t func = config_get_flags(node.config, "node", "function"); const char *name = config_get_str(node.config, "node", "name"); @@ -963,8 +1290,6 @@ int main(int argc, char **argv) uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections"); uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload"); - printf("node: %s port=%u site=%u\n", name, tcp_port, site_id); - /* Enumerate devices */ printf("scanning devices...\n"); build_device_list(&node.devices); @@ -985,6 +1310,9 @@ int main(int argc, char **argv) stream_slot_init(&node, i); } + /* Initialise display slots */ + display_slots_init(&node); + /* Transport server */ struct Transport_Server_Config srv_cfg = { .port = tcp_port, @@ -1000,6 +1328,10 @@ int main(int argc, char **argv) e = transport_server_start(node.server); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } + /* Read back actual port (relevant when tcp_port was 0) */ + tcp_port = transport_server_get_port(node.server); + printf("node: %s port=%u site=%u\n", name, tcp_port, site_id); + /* Discovery */ struct Discovery_Config disc_cfg = { .site_id = site_id, @@ -1021,6 +1353,17 @@ int main(int argc, char **argv) pthread_create(&timer_thread, NULL, reconciler_timer, &node); printf("ready\n"); - pthread_join(timer_thread, NULL); + + /* Main thread: GL event loop (required by GLFW) when xorg is available, + * or a simple idle wait when running headless. */ + if (xorg_available()) { + while (1) { + display_loop_tick(&node); + /* Throttle to ~200 Hz when displays are active, 10 Hz otherwise */ + usleep(any_display_active(&node) ? 5000 : 100000); + } + } else { + pthread_join(timer_thread, NULL); + } return 0; }