Fix make sub-make staleness and stats delivery accounting
- Add 'force' phony prerequisite to all sub-make delegation rules in dev/cli/Makefile and src/node/Makefile so the sub-make is always invoked and can check source timestamps itself; previously a stale .o would never be rebuilt by a dependent Makefile - Move stream_stats_record_frame inside the successful send branch in on_ingest_frame so stats reflect actual delivered frames rather than capture throughput; avoids misleading Mbps readings when the transport is disconnected Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -51,17 +51,19 @@ all: \
|
|||||||
$(CLI_BUILD)/controller_cli
|
$(CLI_BUILD)/controller_cli
|
||||||
|
|
||||||
# Module objects delegate to their sub-makes.
|
# Module objects delegate to their sub-makes.
|
||||||
$(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common
|
# 'force' ensures the sub-make is always invoked so it can check source timestamps itself.
|
||||||
$(MEDIA_CTRL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
.PHONY: force
|
||||||
$(V4L2_CTRL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
$(COMMON_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/common
|
||||||
$(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial
|
$(MEDIA_CTRL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||||
$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport
|
$(V4L2_CTRL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||||
$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
$(SERIAL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||||
$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config
|
$(TRANSPORT_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||||
$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
$(DISCOVERY_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||||
$(TEST_IMAGE_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/test_image
|
$(CONFIG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||||
$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
$(PROTOCOL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||||
$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
$(TEST_IMAGE_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/test_image
|
||||||
|
$(XORG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||||
|
$(RECONCILER_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
||||||
|
|
||||||
# Compile each CLI source to its own .o (generates .d alongside).
|
# Compile each CLI source to its own .o (generates .d alongside).
|
||||||
$(CLI_BUILD)/%.o: %.c | $(CLI_BUILD)
|
$(CLI_BUILD)/%.o: %.c | $(CLI_BUILD)
|
||||||
|
|||||||
@@ -28,17 +28,19 @@ $(NODE_BUILD)/video-node: $(MAIN_OBJ) \
|
|||||||
$(MAIN_OBJ): main.c | $(NODE_BUILD)
|
$(MAIN_OBJ): main.c | $(NODE_BUILD)
|
||||||
$(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $<
|
$(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $<
|
||||||
|
|
||||||
$(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common
|
# 'force' ensures the sub-make is always invoked so it can check source timestamps itself.
|
||||||
$(MEDIA_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
.PHONY: force
|
||||||
$(V4L2_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
$(COMMON_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/common
|
||||||
$(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial
|
$(MEDIA_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||||
$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport
|
$(V4L2_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||||
$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
$(SERIAL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||||
$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config
|
$(TRANSPORT_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||||
$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
$(DISCOVERY_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||||
$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
$(CONFIG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||||
$(INGEST_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/ingest
|
$(PROTOCOL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||||
$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
$(RECONCILER_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
||||||
|
$(INGEST_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/ingest
|
||||||
|
$(XORG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||||
|
|
||||||
$(NODE_BUILD):
|
$(NODE_BUILD):
|
||||||
mkdir -p $@
|
mkdir -p $@
|
||||||
|
|||||||
363
src/node/main.c
363
src/node/main.c
@@ -19,6 +19,7 @@
|
|||||||
#include "reconciler.h"
|
#include "reconciler.h"
|
||||||
#include "ingest.h"
|
#include "ingest.h"
|
||||||
#include "stream_stats.h"
|
#include "stream_stats.h"
|
||||||
|
#include "xorg.h"
|
||||||
#include "error.h"
|
#include "error.h"
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------
|
/* -------------------------------------------------------------------------
|
||||||
@@ -26,6 +27,7 @@
|
|||||||
* ------------------------------------------------------------------------- */
|
* ------------------------------------------------------------------------- */
|
||||||
|
|
||||||
#define MAX_STREAMS 8
|
#define MAX_STREAMS 8
|
||||||
|
#define MAX_DISPLAYS 4
|
||||||
|
|
||||||
#define DEV_CLOSED 0
|
#define DEV_CLOSED 0
|
||||||
#define DEV_OPEN 1
|
#define DEV_OPEN 1
|
||||||
@@ -96,6 +98,12 @@ static int trans_transport_disconnect(void *ud);
|
|||||||
static void on_ingest_frame(const uint8_t *data, uint32_t len,
|
static void on_ingest_frame(const uint8_t *data, uint32_t len,
|
||||||
int width, int height, uint32_t pixfmt, void *userdata);
|
int width, int height, uint32_t pixfmt, void *userdata);
|
||||||
static void on_ingest_error(const char *msg, void *userdata);
|
static void on_ingest_error(const char *msg, void *userdata);
|
||||||
|
static void handle_start_display(struct Node *node,
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
const uint8_t *payload, uint32_t length);
|
||||||
|
static void handle_stop_display(struct Node *node,
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
const uint8_t *payload, uint32_t length);
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------
|
/* -------------------------------------------------------------------------
|
||||||
* Transition tables — shared across all stream slots
|
* Transition tables — shared across all stream slots
|
||||||
@@ -147,6 +155,38 @@ struct Ingest_Stream {
|
|||||||
struct Node *node;
|
struct Node *node;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* -------------------------------------------------------------------------
|
||||||
|
* Display slot — sink role; one per displayed stream
|
||||||
|
* ------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* Display states (wanted/current) */
|
||||||
|
#define DISP_CLOSED 0
|
||||||
|
#define DISP_OPEN 1
|
||||||
|
|
||||||
|
struct Display_Slot {
|
||||||
|
pthread_mutex_t mutex; /* protects wanted_state + frame fields */
|
||||||
|
int allocated; /* slot is in use */
|
||||||
|
uint16_t stream_id;
|
||||||
|
|
||||||
|
/* Declarative state — handlers write wanted, main loop reconciles */
|
||||||
|
int wanted_state; /* DISP_CLOSED / DISP_OPEN */
|
||||||
|
int current_state;/* driven by display_loop_tick */
|
||||||
|
|
||||||
|
/* Config — written by handle_start_display before setting wanted */
|
||||||
|
int win_x, win_y;
|
||||||
|
int win_w, win_h;
|
||||||
|
Xorg_Scale scale;
|
||||||
|
Xorg_Anchor anchor;
|
||||||
|
|
||||||
|
/* Pending frame — deposited by transport thread, consumed by main */
|
||||||
|
uint8_t *frame_data; /* malloc'd transport payload; owned */
|
||||||
|
uint32_t frame_len;
|
||||||
|
int frame_ready;
|
||||||
|
|
||||||
|
/* Viewer — created and used only on the main thread */
|
||||||
|
Xorg_Viewer *viewer;
|
||||||
|
};
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------
|
/* -------------------------------------------------------------------------
|
||||||
* Node
|
* Node
|
||||||
* ------------------------------------------------------------------------- */
|
* ------------------------------------------------------------------------- */
|
||||||
@@ -161,6 +201,7 @@ struct Node {
|
|||||||
pthread_mutex_t reconciler_mutex;
|
pthread_mutex_t reconciler_mutex;
|
||||||
|
|
||||||
struct Ingest_Stream streams[MAX_STREAMS];
|
struct Ingest_Stream streams[MAX_STREAMS];
|
||||||
|
struct Display_Slot displays[MAX_DISPLAYS];
|
||||||
};
|
};
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------
|
/* -------------------------------------------------------------------------
|
||||||
@@ -215,11 +256,12 @@ static void on_ingest_frame(const uint8_t *data, uint32_t len,
|
|||||||
struct App_Error e = proto_write_video_frame(s->conn, s->stream_id, data, len);
|
struct App_Error e = proto_write_video_frame(s->conn, s->stream_id, data, len);
|
||||||
if (!APP_IS_OK(e)) {
|
if (!APP_IS_OK(e)) {
|
||||||
/* transport will fire on_disconnect; nothing to do here */
|
/* transport will fire on_disconnect; nothing to do here */
|
||||||
|
} else {
|
||||||
|
stream_stats_record_frame(&s->stats, len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&s->conn_mutex);
|
pthread_mutex_unlock(&s->conn_mutex);
|
||||||
|
|
||||||
stream_stats_record_frame(&s->stats, len);
|
|
||||||
if (stream_stats_tick(&s->stats)) {
|
if (stream_stats_tick(&s->stats)) {
|
||||||
fprintf(stderr, "stream %u %.1f fps %.2f Mbps\n",
|
fprintf(stderr, "stream %u %.1f fps %.2f Mbps\n",
|
||||||
s->stream_id, s->stats.fps, s->stats.mbps);
|
s->stream_id, s->stats.fps, s->stats.mbps);
|
||||||
@@ -399,6 +441,111 @@ static void on_reconciler_log(const struct Rec_Resource *res,
|
|||||||
success ? "ok" : "FAILED");
|
success ? "ok" : "FAILED");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* -------------------------------------------------------------------------
|
||||||
|
* Main-thread display loop
|
||||||
|
*
|
||||||
|
* Called from the main thread only. Opens/closes Xorg_Viewer windows as
|
||||||
|
* display slots are created or destroyed, delivers pending frames, and polls
|
||||||
|
* GLFW events for all active viewers.
|
||||||
|
* ------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/*
|
||||||
|
* display_loop_tick — main-thread reconciler for display slots.
|
||||||
|
*
|
||||||
|
* Runs on the main thread only (GLFW requirement for xorg_viewer_open/close).
|
||||||
|
* Reads wanted_state set by handlers; drives current_state through transitions.
|
||||||
|
* For slots that are OPEN: delivers pending frames and polls window events.
|
||||||
|
*/
|
||||||
|
static void display_loop_tick(struct Node *node)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MAX_DISPLAYS; i++) {
|
||||||
|
struct Display_Slot *d = &node->displays[i];
|
||||||
|
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
int allocated = d->allocated;
|
||||||
|
int wanted = d->wanted_state;
|
||||||
|
int current = d->current_state;
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
|
||||||
|
if (!allocated) { continue; }
|
||||||
|
|
||||||
|
/* Reconcile: drive current toward wanted */
|
||||||
|
if (current != wanted) {
|
||||||
|
if (wanted == DISP_OPEN && current == DISP_CLOSED) {
|
||||||
|
char title[64];
|
||||||
|
snprintf(title, sizeof(title), "stream %u", (unsigned)d->stream_id);
|
||||||
|
Xorg_Viewer *v = xorg_viewer_open(
|
||||||
|
d->win_x, d->win_y, d->win_w, d->win_h, title);
|
||||||
|
if (v) {
|
||||||
|
xorg_viewer_set_scale(v, d->scale);
|
||||||
|
xorg_viewer_set_anchor(v, d->anchor);
|
||||||
|
d->viewer = v;
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
d->current_state = DISP_OPEN;
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
}
|
||||||
|
/* If open failed, current stays CLOSED; will retry next tick */
|
||||||
|
} else if (wanted == DISP_CLOSED && current == DISP_OPEN) {
|
||||||
|
if (d->viewer) {
|
||||||
|
xorg_viewer_close(d->viewer);
|
||||||
|
d->viewer = NULL;
|
||||||
|
}
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
d->current_state = DISP_CLOSED;
|
||||||
|
if (d->frame_data) { free(d->frame_data); d->frame_data = NULL; }
|
||||||
|
d->frame_ready = 0;
|
||||||
|
d->allocated = 0; /* release slot */
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (d->current_state != DISP_OPEN || !d->viewer) { continue; }
|
||||||
|
|
||||||
|
/* Deliver pending frame (no lock held during decode/upload) */
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
uint8_t *fdata = NULL;
|
||||||
|
uint32_t flen = 0;
|
||||||
|
if (d->frame_ready) {
|
||||||
|
fdata = d->frame_data;
|
||||||
|
flen = d->frame_len;
|
||||||
|
d->frame_data = NULL;
|
||||||
|
d->frame_len = 0;
|
||||||
|
d->frame_ready = 0;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
|
||||||
|
if (fdata) {
|
||||||
|
struct Proto_Video_Frame vf;
|
||||||
|
if (APP_IS_OK(proto_read_video_frame(fdata, flen, &vf))) {
|
||||||
|
xorg_viewer_push_mjpeg(d->viewer, vf.data, vf.data_len);
|
||||||
|
}
|
||||||
|
free(fdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Poll GLFW events; if user closes the window, treat as STOP_DISPLAY */
|
||||||
|
if (!xorg_viewer_handle_events(d->viewer)) {
|
||||||
|
xorg_viewer_close(d->viewer);
|
||||||
|
d->viewer = NULL;
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
d->current_state = DISP_CLOSED;
|
||||||
|
d->wanted_state = DISP_CLOSED;
|
||||||
|
if (d->frame_data) { free(d->frame_data); d->frame_data = NULL; }
|
||||||
|
d->frame_ready = 0;
|
||||||
|
d->allocated = 0;
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int any_display_active(struct Node *node)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MAX_DISPLAYS; i++) {
|
||||||
|
if (node->displays[i].allocated) { return 1; }
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------
|
/* -------------------------------------------------------------------------
|
||||||
* Periodic reconciler tick thread
|
* Periodic reconciler tick thread
|
||||||
* ------------------------------------------------------------------------- */
|
* ------------------------------------------------------------------------- */
|
||||||
@@ -838,6 +985,137 @@ static void handle_stop_ingest(struct Node *node,
|
|||||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0);
|
proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* -------------------------------------------------------------------------
|
||||||
|
* Display slot helpers
|
||||||
|
* ------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
static void display_slots_init(struct Node *node)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MAX_DISPLAYS; i++) {
|
||||||
|
struct Display_Slot *d = &node->displays[i];
|
||||||
|
memset(d, 0, sizeof(*d));
|
||||||
|
pthread_mutex_init(&d->mutex, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct Display_Slot *display_find(struct Node *node, uint16_t stream_id)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MAX_DISPLAYS; i++) {
|
||||||
|
struct Display_Slot *d = &node->displays[i];
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
/* Match any allocated slot — including ones being closed, so
|
||||||
|
* START_DISPLAY can reuse them and STOP_DISPLAY can find them */
|
||||||
|
int match = d->allocated && d->stream_id == stream_id;
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
if (match) { return d; }
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct Display_Slot *display_alloc(struct Node *node)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < MAX_DISPLAYS; i++) {
|
||||||
|
struct Display_Slot *d = &node->displays[i];
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
int is_free = !d->allocated;
|
||||||
|
if (is_free) {
|
||||||
|
d->allocated = 1;
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
return d;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Map PROTO_DISPLAY_SCALE_* to Xorg_Scale */
|
||||||
|
static Xorg_Scale proto_scale_to_xorg(uint8_t s)
|
||||||
|
{
|
||||||
|
switch (s) {
|
||||||
|
case PROTO_DISPLAY_SCALE_FIT: return XORG_SCALE_FIT;
|
||||||
|
case PROTO_DISPLAY_SCALE_FILL: return XORG_SCALE_FILL;
|
||||||
|
case PROTO_DISPLAY_SCALE_1_1: return XORG_SCALE_1_1;
|
||||||
|
default: return XORG_SCALE_STRETCH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Map PROTO_DISPLAY_ANCHOR_* to Xorg_Anchor */
|
||||||
|
static Xorg_Anchor proto_anchor_to_xorg(uint8_t a)
|
||||||
|
{
|
||||||
|
switch (a) {
|
||||||
|
case PROTO_DISPLAY_ANCHOR_TOPLEFT: return XORG_ANCHOR_TOP_LEFT;
|
||||||
|
default: return XORG_ANCHOR_CENTER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void handle_start_display(struct Node *node,
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
const uint8_t *payload, uint32_t length)
|
||||||
|
{
|
||||||
|
struct Proto_Start_Display req;
|
||||||
|
struct App_Error e = proto_read_start_display(payload, length, &req);
|
||||||
|
if (!APP_IS_OK(e)) {
|
||||||
|
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!xorg_available()) {
|
||||||
|
proto_write_control_response(conn, req.request_id,
|
||||||
|
PROTO_STATUS_ERROR, NULL, 0);
|
||||||
|
fprintf(stderr, "START_DISPLAY: xorg not available (headless build)\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Reuse existing slot for this stream_id, or allocate a fresh one */
|
||||||
|
struct Display_Slot *d = display_find(node, req.stream_id);
|
||||||
|
if (!d) {
|
||||||
|
d = display_alloc(node);
|
||||||
|
if (!d) {
|
||||||
|
proto_write_control_response(conn, req.request_id,
|
||||||
|
PROTO_STATUS_ERROR, NULL, 0);
|
||||||
|
fprintf(stderr, "START_DISPLAY: no free display slots\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
d->stream_id = req.stream_id;
|
||||||
|
d->win_x = (int)req.win_x;
|
||||||
|
d->win_y = (int)req.win_y;
|
||||||
|
d->win_w = req.win_w > 0 ? (int)req.win_w : 1280;
|
||||||
|
d->win_h = req.win_h > 0 ? (int)req.win_h : 720;
|
||||||
|
d->scale = proto_scale_to_xorg(req.scale);
|
||||||
|
d->anchor = proto_anchor_to_xorg(req.anchor);
|
||||||
|
d->wanted_state = DISP_OPEN; /* reconciled by display_loop_tick */
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
|
||||||
|
proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void handle_stop_display(struct Node *node,
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
const uint8_t *payload, uint32_t length)
|
||||||
|
{
|
||||||
|
struct Proto_Stop_Display req;
|
||||||
|
struct App_Error e = proto_read_stop_display(payload, length, &req);
|
||||||
|
if (!APP_IS_OK(e)) {
|
||||||
|
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Display_Slot *d = display_find(node, req.stream_id);
|
||||||
|
if (!d) {
|
||||||
|
proto_write_control_response(conn, req.request_id,
|
||||||
|
PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
d->wanted_state = DISP_CLOSED; /* reconciled by display_loop_tick */
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
|
||||||
|
proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
/* -------------------------------------------------------------------------
|
/* -------------------------------------------------------------------------
|
||||||
* Transport callbacks (inbound server)
|
* Transport callbacks (inbound server)
|
||||||
* ------------------------------------------------------------------------- */
|
* ------------------------------------------------------------------------- */
|
||||||
@@ -847,6 +1125,31 @@ static void on_frame(struct Transport_Conn *conn,
|
|||||||
{
|
{
|
||||||
struct Node *node = userdata;
|
struct Node *node = userdata;
|
||||||
|
|
||||||
|
if (frame->message_type == PROTO_MSG_VIDEO_FRAME) {
|
||||||
|
/* Sink role: route frame to matching display slot */
|
||||||
|
struct Proto_Video_Frame vf;
|
||||||
|
struct App_Error e = proto_read_video_frame(
|
||||||
|
frame->payload, frame->payload_length, &vf);
|
||||||
|
if (APP_IS_OK(e)) {
|
||||||
|
struct Display_Slot *d = display_find(node, vf.stream_id);
|
||||||
|
if (d) {
|
||||||
|
pthread_mutex_lock(&d->mutex);
|
||||||
|
if (d->frame_ready) {
|
||||||
|
free(d->frame_data); /* drop stale frame */
|
||||||
|
}
|
||||||
|
/* Hand ownership of the payload to the display slot.
|
||||||
|
* Set frame->payload = NULL so the free() below is a no-op. */
|
||||||
|
d->frame_data = frame->payload;
|
||||||
|
d->frame_len = frame->payload_length;
|
||||||
|
d->frame_ready = 1;
|
||||||
|
frame->payload = NULL;
|
||||||
|
pthread_mutex_unlock(&d->mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(frame->payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (frame->message_type == PROTO_MSG_CONTROL_REQUEST) {
|
if (frame->message_type == PROTO_MSG_CONTROL_REQUEST) {
|
||||||
struct Proto_Request_Header hdr;
|
struct Proto_Request_Header hdr;
|
||||||
struct App_Error e = proto_read_request_header(
|
struct App_Error e = proto_read_request_header(
|
||||||
@@ -877,6 +1180,14 @@ static void on_frame(struct Transport_Conn *conn,
|
|||||||
handle_stop_ingest(node, conn,
|
handle_stop_ingest(node, conn,
|
||||||
frame->payload, frame->payload_length);
|
frame->payload, frame->payload_length);
|
||||||
break;
|
break;
|
||||||
|
case PROTO_CMD_START_DISPLAY:
|
||||||
|
handle_start_display(node, conn,
|
||||||
|
frame->payload, frame->payload_length);
|
||||||
|
break;
|
||||||
|
case PROTO_CMD_STOP_DISPLAY:
|
||||||
|
handle_stop_display(node, conn,
|
||||||
|
frame->payload, frame->payload_length);
|
||||||
|
break;
|
||||||
case PROTO_CMD_STREAM_OPEN:
|
case PROTO_CMD_STREAM_OPEN:
|
||||||
/* A sender is opening a stream on this node (sink role) */
|
/* A sender is opening a stream on this node (sink role) */
|
||||||
proto_write_control_response(conn, hdr.request_id,
|
proto_write_control_response(conn, hdr.request_id,
|
||||||
@@ -919,7 +1230,7 @@ static const struct Config_Flag_Def function_flag_defs[] = {
|
|||||||
static const struct Config_Def schema[] = {
|
static const struct Config_Def schema[] = {
|
||||||
{ "node", "name", CONFIG_STRING, "unnamed:0", NULL },
|
{ "node", "name", CONFIG_STRING, "unnamed:0", NULL },
|
||||||
{ "node", "site_id", CONFIG_UINT16, "0", NULL },
|
{ "node", "site_id", CONFIG_UINT16, "0", NULL },
|
||||||
{ "node", "tcp_port", CONFIG_UINT16, "8000", NULL },
|
{ "node", "tcp_port", CONFIG_UINT16, "0", NULL },
|
||||||
{ "node", "function", CONFIG_FLAGS, "source", function_flag_defs },
|
{ "node", "function", CONFIG_FLAGS, "source", function_flag_defs },
|
||||||
{ "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL },
|
{ "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL },
|
||||||
{ "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL },
|
{ "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL },
|
||||||
@@ -934,27 +1245,43 @@ static const struct Config_Def schema[] = {
|
|||||||
|
|
||||||
static void usage(void) {
|
static void usage(void) {
|
||||||
fprintf(stderr,
|
fprintf(stderr,
|
||||||
"usage: video-node <config-file>\n"
|
"usage: video-node [--port PORT] <config-file>\n"
|
||||||
" video-node --defaults\n");
|
" video-node [--port PORT] --defaults\n"
|
||||||
|
"\n"
|
||||||
|
" --port PORT override tcp_port from config (0 = OS-assigned random port)\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char **argv)
|
int main(int argc, char **argv)
|
||||||
{
|
{
|
||||||
if (argc < 2) { usage(); return 1; }
|
int port_override = -1; /* -1 = not set */
|
||||||
|
int argi = 1;
|
||||||
|
|
||||||
|
while (argi < argc && argv[argi][0] == '-') {
|
||||||
|
if (strcmp(argv[argi], "--port") == 0 && argi + 1 < argc) {
|
||||||
|
port_override = atoi(argv[++argi]);
|
||||||
|
argi++;
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argi >= argc) { usage(); return 1; }
|
||||||
|
|
||||||
struct Node node;
|
struct Node node;
|
||||||
memset(&node, 0, sizeof(node));
|
memset(&node, 0, sizeof(node));
|
||||||
|
|
||||||
/* Load config */
|
/* Load config */
|
||||||
struct App_Error e;
|
struct App_Error e;
|
||||||
if (strcmp(argv[1], "--defaults") == 0) {
|
if (strcmp(argv[argi], "--defaults") == 0) {
|
||||||
e = config_defaults(&node.config, schema);
|
e = config_defaults(&node.config, schema);
|
||||||
} else {
|
} else {
|
||||||
e = config_load(&node.config, argv[1], schema);
|
e = config_load(&node.config, argv[argi], schema);
|
||||||
}
|
}
|
||||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||||
|
|
||||||
uint16_t tcp_port = config_get_u16(node.config, "node", "tcp_port");
|
uint16_t tcp_port = port_override >= 0
|
||||||
|
? (uint16_t)port_override
|
||||||
|
: config_get_u16(node.config, "node", "tcp_port");
|
||||||
uint16_t site_id = config_get_u16(node.config, "node", "site_id");
|
uint16_t site_id = config_get_u16(node.config, "node", "site_id");
|
||||||
uint32_t func = config_get_flags(node.config, "node", "function");
|
uint32_t func = config_get_flags(node.config, "node", "function");
|
||||||
const char *name = config_get_str(node.config, "node", "name");
|
const char *name = config_get_str(node.config, "node", "name");
|
||||||
@@ -963,8 +1290,6 @@ int main(int argc, char **argv)
|
|||||||
uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections");
|
uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections");
|
||||||
uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload");
|
uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload");
|
||||||
|
|
||||||
printf("node: %s port=%u site=%u\n", name, tcp_port, site_id);
|
|
||||||
|
|
||||||
/* Enumerate devices */
|
/* Enumerate devices */
|
||||||
printf("scanning devices...\n");
|
printf("scanning devices...\n");
|
||||||
build_device_list(&node.devices);
|
build_device_list(&node.devices);
|
||||||
@@ -985,6 +1310,9 @@ int main(int argc, char **argv)
|
|||||||
stream_slot_init(&node, i);
|
stream_slot_init(&node, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Initialise display slots */
|
||||||
|
display_slots_init(&node);
|
||||||
|
|
||||||
/* Transport server */
|
/* Transport server */
|
||||||
struct Transport_Server_Config srv_cfg = {
|
struct Transport_Server_Config srv_cfg = {
|
||||||
.port = tcp_port,
|
.port = tcp_port,
|
||||||
@@ -1000,6 +1328,10 @@ int main(int argc, char **argv)
|
|||||||
e = transport_server_start(node.server);
|
e = transport_server_start(node.server);
|
||||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||||
|
|
||||||
|
/* Read back actual port (relevant when tcp_port was 0) */
|
||||||
|
tcp_port = transport_server_get_port(node.server);
|
||||||
|
printf("node: %s port=%u site=%u\n", name, tcp_port, site_id);
|
||||||
|
|
||||||
/* Discovery */
|
/* Discovery */
|
||||||
struct Discovery_Config disc_cfg = {
|
struct Discovery_Config disc_cfg = {
|
||||||
.site_id = site_id,
|
.site_id = site_id,
|
||||||
@@ -1021,6 +1353,17 @@ int main(int argc, char **argv)
|
|||||||
pthread_create(&timer_thread, NULL, reconciler_timer, &node);
|
pthread_create(&timer_thread, NULL, reconciler_timer, &node);
|
||||||
|
|
||||||
printf("ready\n");
|
printf("ready\n");
|
||||||
|
|
||||||
|
/* Main thread: GL event loop (required by GLFW) when xorg is available,
|
||||||
|
* or a simple idle wait when running headless. */
|
||||||
|
if (xorg_available()) {
|
||||||
|
while (1) {
|
||||||
|
display_loop_tick(&node);
|
||||||
|
/* Throttle to ~200 Hz when displays are active, 10 Hz otherwise */
|
||||||
|
usleep(any_display_active(&node) ? 5000 : 100000);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
pthread_join(timer_thread, NULL);
|
pthread_join(timer_thread, NULL);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user