From 6747c9e00d303c6793506baf184639e7c5cc8b42 Mon Sep 17 00:00:00 2001 From: mikael-lovqvists-claude-agent Date: Sun, 29 Mar 2026 02:17:16 +0000 Subject: [PATCH] Wire reconciler and ingest into video node MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each ingest stream gets two reconciler resources (device, transport) with dependencies: transport waits for device OPEN (needs format for STREAM_OPEN), device waits for transport CONNECTED before starting capture. START_INGEST sets wanted state and triggers a tick; the reconciler drives device CLOSED→OPEN→STREAMING and transport DISCONNECTED→CONNECTED over subsequent ticks. STOP_INGEST reverses both. External events (transport drop, ingest thread error) use reconciler_force_current to push state backward; the periodic 500ms timer thread re-drives toward wanted state automatically. All 8 stream slots are pre-allocated at startup. on_ingest_frame sends VIDEO_FRAME messages over the outbound transport connection, protected by a per-stream conn_mutex. Co-Authored-By: Claude Sonnet 4.6 --- include/reconciler.h | 9 + src/modules/reconciler/reconciler.c | 4 + src/node/Makefile | 14 +- src/node/main.c | 733 +++++++++++++++++++++------- 4 files changed, 582 insertions(+), 178 deletions(-) diff --git a/include/reconciler.h b/include/reconciler.h index e55f10b..cf09654 100644 --- a/include/reconciler.h +++ b/include/reconciler.h @@ -85,6 +85,15 @@ void reconciler_add_dep( int dep_min_state); void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state); + +/* + * Force current state without executing a transition. + * Use when an external event pushes a resource into a new state — + * e.g. a transport connection drops unexpectedly, or a device error + * causes the capture thread to exit. The reconciler will drive back + * toward wanted state on the next tick. + */ +void reconciler_force_current(struct Rec_Resource *r, int state); int reconciler_get_current(const struct Rec_Resource *r); int reconciler_get_wanted(const struct Rec_Resource *r); const char *reconciler_get_name(const struct Rec_Resource *r); diff --git a/src/modules/reconciler/reconciler.c b/src/modules/reconciler/reconciler.c index 89f3a7a..324b728 100644 --- a/src/modules/reconciler/reconciler.c +++ b/src/modules/reconciler/reconciler.c @@ -91,6 +91,10 @@ void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state) { r->wanted_state = wanted_state; } +void reconciler_force_current(struct Rec_Resource *r, int state) { + r->current_state = state; +} + int reconciler_get_current(const struct Rec_Resource *r) { return r->current_state; } diff --git a/src/node/Makefile b/src/node/Makefile index edea59a..b241f29 100644 --- a/src/node/Makefile +++ b/src/node/Makefile @@ -10,8 +10,10 @@ SERIAL_OBJ = $(BUILD)/serial/serial.o TRANSPORT_OBJ = $(BUILD)/transport/transport.o DISCOVERY_OBJ = $(BUILD)/discovery/discovery.o CONFIG_OBJ = $(BUILD)/config/config.o -PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o -XORG_OBJ = $(BUILD)/xorg/xorg.o +PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o +RECONCILER_OBJ = $(BUILD)/reconciler/reconciler.o +INGEST_OBJ = $(BUILD)/ingest/ingest.o +XORG_OBJ = $(BUILD)/xorg/xorg.o .PHONY: all clean @@ -20,7 +22,7 @@ all: $(NODE_BUILD)/video-node $(NODE_BUILD)/video-node: $(MAIN_OBJ) \ $(COMMON_OBJ) $(MEDIA_OBJ) $(V4L2_OBJ) $(SERIAL_OBJ) \ $(TRANSPORT_OBJ) $(DISCOVERY_OBJ) $(CONFIG_OBJ) $(PROTOCOL_OBJ) \ - $(XORG_OBJ) + $(RECONCILER_OBJ) $(INGEST_OBJ) $(XORG_OBJ) $(CC) $(CFLAGS) -o $@ $^ -lpthread $(PKG_LDFLAGS) $(MAIN_OBJ): main.c | $(NODE_BUILD) @@ -33,8 +35,10 @@ $(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial $(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport $(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery $(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config -$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol -$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg +$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol +$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler +$(INGEST_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/ingest +$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg $(NODE_BUILD): mkdir -p $@ diff --git a/src/node/main.c b/src/node/main.c index 83a40dd..3813edc 100644 --- a/src/node/main.c +++ b/src/node/main.c @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -15,30 +16,42 @@ #include "protocol.h" #include "media_ctrl.h" #include "v4l2_ctrl.h" +#include "reconciler.h" +#include "ingest.h" +#include "stream_stats.h" #include "error.h" /* ------------------------------------------------------------------------- - * Device enumeration + * Stream slot constants + * ------------------------------------------------------------------------- */ + +#define MAX_STREAMS 8 + +#define DEV_CLOSED 0 +#define DEV_OPEN 1 +#define DEV_STREAMING 2 + +#define CONN_DISCONNECTED 0 +#define CONN_CONNECTED 1 + +/* ------------------------------------------------------------------------- + * Device enumeration (unchanged from original) * ------------------------------------------------------------------------- */ -/* Entity type flag for a V4L2 I/O interface entity */ #define MEDIA_ENT_F_IO_V4L 0x10001u - -#define MAX_VIDEO_NODES 32 -#define MAX_MEDIA_DEVICES 8 -#define MAX_VNODES_PER_MD 8 -#define MAX_CONTROLS 256 - -/* 5 ("/dev/") + NAME_MAX (255) + 1 (NUL) = 261; round up */ -#define DEV_PATH_MAX 264 +#define MAX_VIDEO_NODES 32 +#define MAX_MEDIA_DEVICES 8 +#define MAX_VNODES_PER_MD 8 +#define MAX_CONTROLS 256 +#define DEV_PATH_MAX 264 struct VNode { char path[DEV_PATH_MAX]; uint32_t dev_major; uint32_t dev_minor; - uint32_t device_caps; /* V4L2_CAP_* from VIDIOC_QUERYCAP cap.device_caps */ + uint32_t device_caps; int claimed; - char card[32]; /* VIDIOC_QUERYCAP card name, empty if unavailable */ + char card[32]; }; struct MNode { @@ -48,7 +61,7 @@ struct MNode { uint32_t entity_flags; uint32_t device_caps; uint8_t is_capture; - int vnode_index; /* index into VNode array */ + int vnode_index; }; struct MediaDev { @@ -67,7 +80,347 @@ struct Device_List { int media_count; }; -static int scan_video_nodes(struct Device_List *dl) { +/* ------------------------------------------------------------------------- + * Forward declarations + * ------------------------------------------------------------------------- */ + +struct Node; +struct Ingest_Stream; + +static int trans_device_open(void *ud); +static int trans_device_close(void *ud); +static int trans_device_start(void *ud); +static int trans_device_stop(void *ud); +static int trans_transport_connect(void *ud); +static int trans_transport_disconnect(void *ud); +static void on_ingest_frame(const uint8_t *data, uint32_t len, + int width, int height, uint32_t pixfmt, void *userdata); +static void on_ingest_error(const char *msg, void *userdata); + +/* ------------------------------------------------------------------------- + * Transition tables — shared across all stream slots + * ------------------------------------------------------------------------- */ + +static const struct Rec_Transition device_transitions[] = { + { DEV_CLOSED, DEV_OPEN, trans_device_open }, + { DEV_OPEN, DEV_CLOSED, trans_device_close }, + { DEV_OPEN, DEV_STREAMING, trans_device_start }, + { DEV_STREAMING, DEV_OPEN, trans_device_stop }, + { -1, -1, NULL } +}; +static const char *device_state_names[] = { "CLOSED", "OPEN", "STREAMING" }; + +static const struct Rec_Transition transport_transitions[] = { + { CONN_DISCONNECTED, CONN_CONNECTED, trans_transport_connect }, + { CONN_CONNECTED, CONN_DISCONNECTED, trans_transport_disconnect }, + { -1, -1, NULL } +}; +static const char *transport_state_names[] = { "DISCONNECTED", "CONNECTED" }; + +/* ------------------------------------------------------------------------- + * Per-stream state + * ------------------------------------------------------------------------- */ + +struct Ingest_Stream { + int in_use; + uint16_t stream_id; + + /* Wanted config — written by START_INGEST handler */ + uint16_t format; + uint16_t width, height; + uint16_t fps_n, fps_d; + uint16_t transport_mode; + char device_path[256]; + char dest_host[256]; + uint16_t dest_port; + + /* Reconciler resources */ + struct Rec_Resource *res_device; + struct Rec_Resource *res_transport; + + /* Runtime */ + struct Ingest_Handle *ingest_handle; + pthread_mutex_t conn_mutex; + struct Transport_Conn *conn; + Stream_Stats stats; + + struct Node *node; +}; + +/* ------------------------------------------------------------------------- + * Node + * ------------------------------------------------------------------------- */ + +struct Node { + struct Config *config; + struct Transport_Server *server; + struct Discovery *discovery; + struct Device_List devices; + + struct Reconciler *reconciler; + pthread_mutex_t reconciler_mutex; + + struct Ingest_Stream streams[MAX_STREAMS]; +}; + +/* ------------------------------------------------------------------------- + * Transition implementations + * ------------------------------------------------------------------------- */ + +static int trans_device_open(void *ud) +{ + struct Ingest_Stream *s = ud; + + /* Map PROTO_FORMAT_* to V4L2 pixfmt; 0 = auto-select best MJPEG */ + uint32_t pixfmt = 0; + switch (s->format) { + case PROTO_FORMAT_MJPEG: pixfmt = V4L2_PIX_FMT_MJPEG; break; + default: pixfmt = 0; break; + } + + struct Ingest_Config cfg = { + .device = s->device_path, + .pixfmt = pixfmt, + .width = s->width, + .height = s->height, + .on_frame = on_ingest_frame, + .on_error = on_ingest_error, + .userdata = s, + }; + + struct App_Error e = ingest_open(&cfg, &s->ingest_handle); + if (!APP_IS_OK(e)) { + app_error_print(&e); + return 0; + } + return 1; +} + +static int trans_device_close(void *ud) +{ + struct Ingest_Stream *s = ud; + ingest_close(s->ingest_handle); + s->ingest_handle = NULL; + return 1; +} + +static void on_ingest_frame(const uint8_t *data, uint32_t len, + int width, int height, uint32_t pixfmt, void *userdata) +{ + struct Ingest_Stream *s = userdata; + (void)width; (void)height; (void)pixfmt; + + pthread_mutex_lock(&s->conn_mutex); + if (s->conn) { + struct App_Error e = proto_write_video_frame(s->conn, s->stream_id, data, len); + if (!APP_IS_OK(e)) { + /* transport will fire on_disconnect; nothing to do here */ + } + } + pthread_mutex_unlock(&s->conn_mutex); + + stream_stats_record_frame(&s->stats, len); + if (stream_stats_tick(&s->stats)) { + fprintf(stderr, "stream %u %.1f fps %.2f Mbps\n", + s->stream_id, s->stats.fps, s->stats.mbps); + } +} + +static void on_ingest_error(const char *msg, void *userdata) +{ + struct Ingest_Stream *s = userdata; + fprintf(stderr, "ingest error [stream %u]: %s\n", s->stream_id, msg); + + /* Capture thread has exited; push device back to OPEN so the + * reconciler will restart it on the next tick. */ + pthread_mutex_lock(&s->node->reconciler_mutex); + reconciler_force_current(s->res_device, DEV_OPEN); + pthread_mutex_unlock(&s->node->reconciler_mutex); +} + +static int trans_device_start(void *ud) +{ + struct Ingest_Stream *s = ud; + struct App_Error e = ingest_start(s->ingest_handle); + if (!APP_IS_OK(e)) { + app_error_print(&e); + return 0; + } + return 1; +} + +static int trans_device_stop(void *ud) +{ + struct Ingest_Stream *s = ud; + struct App_Error e = ingest_stop(s->ingest_handle); + if (!APP_IS_OK(e)) { + app_error_print(&e); + return 0; + } + return 1; +} + +static void outbound_on_frame(struct Transport_Conn *conn, + struct Transport_Frame *frame, void *userdata) +{ + (void)conn; (void)userdata; + /* Discard control responses from the sink */ + free(frame->payload); +} + +static void outbound_on_disconnect(struct Transport_Conn *conn, + void *userdata) +{ + struct Ingest_Stream *s = userdata; + (void)conn; + + fprintf(stderr, "stream %u: transport disconnected\n", s->stream_id); + + pthread_mutex_lock(&s->conn_mutex); + s->conn = NULL; + pthread_mutex_unlock(&s->conn_mutex); + + /* Force transport resource back so the reconciler re-drives it */ + pthread_mutex_lock(&s->node->reconciler_mutex); + reconciler_force_current(s->res_transport, CONN_DISCONNECTED); + reconciler_tick(s->node->reconciler); + pthread_mutex_unlock(&s->node->reconciler_mutex); +} + +static int trans_transport_connect(void *ud) +{ + struct Ingest_Stream *s = ud; + + struct Transport_Conn *conn = NULL; + struct App_Error e = transport_connect(&conn, + s->dest_host, s->dest_port, + TRANSPORT_DEFAULT_MAX_PAYLOAD, + outbound_on_frame, outbound_on_disconnect, s); + if (!APP_IS_OK(e)) { + app_error_print(&e); + return 0; + } + + /* Send STREAM_OPEN so the sink knows what's coming */ + uint16_t proto_fmt = s->format ? s->format : PROTO_FORMAT_MJPEG; + e = proto_write_stream_open(conn, 1 /* request_id */, s->stream_id, + proto_fmt, 0 /* pixel_format: compressed */, PROTO_ORIGIN_DEVICE_NATIVE); + if (!APP_IS_OK(e)) { + app_error_print(&e); + transport_conn_close(conn); + return 0; + } + + pthread_mutex_lock(&s->conn_mutex); + s->conn = conn; + pthread_mutex_unlock(&s->conn_mutex); + + stream_stats_init(&s->stats, s->stream_id); + return 1; +} + +static int trans_transport_disconnect(void *ud) +{ + struct Ingest_Stream *s = ud; + + pthread_mutex_lock(&s->conn_mutex); + struct Transport_Conn *conn = s->conn; + s->conn = NULL; + pthread_mutex_unlock(&s->conn_mutex); + + if (conn) { + transport_conn_close(conn); + } + return 1; +} + +/* ------------------------------------------------------------------------- + * Stream slot management + * ------------------------------------------------------------------------- */ + +static void stream_slot_init(struct Node *node, int idx) +{ + struct Ingest_Stream *s = &node->streams[idx]; + memset(s, 0, sizeof(*s)); + pthread_mutex_init(&s->conn_mutex, NULL); + s->node = node; + + char dev_name[32], conn_name[32]; + snprintf(dev_name, sizeof(dev_name), "device_%d", idx); + snprintf(conn_name, sizeof(conn_name), "transport_%d", idx); + + s->res_device = reconciler_add_resource(node->reconciler, + dev_name, device_transitions, 3, device_state_names, + DEV_CLOSED, s); + s->res_transport = reconciler_add_resource(node->reconciler, + conn_name, transport_transitions, 2, transport_state_names, + CONN_DISCONNECTED, s); + + /* Transport needs device open (to know the format for STREAM_OPEN) */ + reconciler_add_dep(s->res_transport, CONN_CONNECTED, s->res_device, DEV_OPEN); + + /* Don't start capture until transport is connected */ + reconciler_add_dep(s->res_device, DEV_STREAMING, s->res_transport, CONN_CONNECTED); +} + +/* Find slot by stream_id, or find a free slot if stream_id is 0 */ +static struct Ingest_Stream *stream_find(struct Node *node, uint16_t stream_id) +{ + for (int i = 0; i < MAX_STREAMS; i++) { + if (node->streams[i].in_use && node->streams[i].stream_id == stream_id) { + return &node->streams[i]; + } + } + return NULL; +} + +static struct Ingest_Stream *stream_alloc(struct Node *node) +{ + for (int i = 0; i < MAX_STREAMS; i++) { + if (!node->streams[i].in_use) { + return &node->streams[i]; + } + } + return NULL; +} + +/* ------------------------------------------------------------------------- + * Reconciler log + * ------------------------------------------------------------------------- */ + +static void on_reconciler_log(const struct Rec_Resource *res, + int from, int to, int success, void *userdata) +{ + (void)userdata; + fprintf(stderr, "reconciler: [%s] %s -> %s ... %s\n", + reconciler_get_name(res), + reconciler_state_name(res, from), + reconciler_state_name(res, to), + success ? "ok" : "FAILED"); +} + +/* ------------------------------------------------------------------------- + * Periodic reconciler tick thread + * ------------------------------------------------------------------------- */ + +static void *reconciler_timer(void *arg) +{ + struct Node *node = arg; + while (1) { + usleep(500000); /* 500 ms */ + pthread_mutex_lock(&node->reconciler_mutex); + reconciler_tick(node->reconciler); + pthread_mutex_unlock(&node->reconciler_mutex); + } + return NULL; +} + +/* ------------------------------------------------------------------------- + * Device enumeration (unchanged) + * ------------------------------------------------------------------------- */ + +static int scan_video_nodes(struct Device_List *dl) +{ DIR *d = opendir("/dev"); if (!d) { return -1; } @@ -91,32 +444,27 @@ static int scan_video_nodes(struct Device_List *dl) { v->claimed = 0; v->card[0] = '\0'; - /* Try to get card name */ int fd = open(v->path, O_RDONLY | O_NONBLOCK); if (fd >= 0) { struct v4l2_capability cap; if (ioctl(fd, VIDIOC_QUERYCAP, &cap) == 0) { strncpy(v->card, (const char *)cap.card, sizeof(v->card) - 1); v->card[sizeof(v->card) - 1] = '\0'; - /* use per-node caps when available, fall back to physical caps */ v->device_caps = (cap.capabilities & V4L2_CAP_DEVICE_CAPS) ? cap.device_caps : cap.capabilities; } close(fd); } - dl->vnode_count++; } closedir(d); return 0; } -struct Entity_Cb_State { - struct Device_List *dl; - struct MediaDev *md; -}; +struct Entity_Cb_State { struct Device_List *dl; struct MediaDev *md; }; -static void entity_callback(const struct Media_Entity *entity, void *userdata) { +static void entity_callback(const struct Media_Entity *entity, void *userdata) +{ struct Entity_Cb_State *state = userdata; struct Device_List *dl = state->dl; struct MediaDev *md = state->md; @@ -124,11 +472,9 @@ static void entity_callback(const struct Media_Entity *entity, void *userdata) { if (entity->dev_major == 0) { return; } if (md->vnode_count >= MAX_VNODES_PER_MD) { return; } - /* Find matching video node by device number */ for (int i = 0; i < dl->vnode_count; i++) { if (dl->vnodes[i].dev_major != entity->dev_major) { continue; } if (dl->vnodes[i].dev_minor != entity->dev_minor) { continue; } - dl->vnodes[i].claimed = 1; struct MNode *mn = &md->vnodes[md->vnode_count++]; @@ -145,7 +491,8 @@ static void entity_callback(const struct Media_Entity *entity, void *userdata) { } } -static void scan_media_devices(struct Device_List *dl) { +static void scan_media_devices(struct Device_List *dl) +{ DIR *d = opendir("/dev"); if (!d) { return; } @@ -179,23 +526,23 @@ static void scan_media_devices(struct Device_List *dl) { struct Entity_Cb_State state = { .dl = dl, .md = md }; media_ctrl_enum_entities(ctrl, entity_callback, &state); media_ctrl_close(ctrl); - dl->media_count++; } closedir(d); } -static void build_device_list(struct Device_List *dl) { +static void build_device_list(struct Device_List *dl) +{ memset(dl, 0, sizeof(*dl)); scan_video_nodes(dl); scan_media_devices(dl); } /* ------------------------------------------------------------------------- - * Control enumeration helpers + * Control enumeration helpers (unchanged) * ------------------------------------------------------------------------- */ -#define MAX_MENU_POOL 128 /* total menu items across all controls */ +#define MAX_MENU_POOL 128 struct Ctrl_Build { struct Proto_Control_Info items[MAX_CONTROLS]; @@ -250,15 +597,23 @@ static void ctrl_enum_cb( } /* ------------------------------------------------------------------------- - * Node state + * Device path resolver (shared by enum/get/set control handlers) * ------------------------------------------------------------------------- */ -struct Node { - struct Config *config; - struct Transport_Server *server; - struct Discovery *discovery; - struct Device_List devices; -}; +static const char *resolve_device_path(struct Node *node, int idx) +{ + for (int i = 0; i < node->devices.media_count; i++) { + struct MediaDev *md = &node->devices.media[i]; + if (idx < md->vnode_count) { return md->vnodes[idx].path; } + idx -= md->vnode_count; + } + for (int i = 0; i < node->devices.vnode_count; i++) { + if (node->devices.vnodes[i].claimed) { continue; } + if (idx == 0) { return node->devices.vnodes[i].path; } + idx--; + } + return NULL; +} /* ------------------------------------------------------------------------- * Request handlers @@ -267,7 +622,6 @@ struct Node { static void handle_enum_devices(struct Node *node, struct Transport_Conn *conn, uint16_t request_id) { - /* Build Proto_Media_Device_Info array */ struct Proto_Video_Node_Info vnodes[MAX_MEDIA_DEVICES * MAX_VNODES_PER_MD]; struct Proto_Media_Device_Info mdevs[MAX_MEDIA_DEVICES]; int vnode_offset = 0; @@ -294,7 +648,6 @@ static void handle_enum_devices(struct Node *node, vnode_offset += md->vnode_count; } - /* Build standalone list */ struct Proto_Standalone_Device_Info standalone[MAX_VIDEO_NODES]; int standalone_count = 0; for (int i = 0; i < node->devices.vnode_count; i++) { @@ -318,52 +671,25 @@ static void handle_enum_controls(struct Node *node, struct Proto_Enum_Controls_Req req; struct App_Error e = proto_read_enum_controls_req(payload, length, &req); if (!APP_IS_OK(e)) { - proto_write_control_response(conn, 0, - PROTO_STATUS_INVALID_PARAMS, NULL, 0); + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } - - /* Resolve device_index to a path across media-owned + standalone nodes */ - const char *path = NULL; - int idx = (int)req.device_index; - for (int i = 0; i < node->devices.media_count && path == NULL; i++) { - struct MediaDev *md = &node->devices.media[i]; - if (idx < md->vnode_count) { - path = md->vnodes[idx].path; - } else { - idx -= md->vnode_count; - } - } - if (path == NULL) { - /* Check standalone */ - for (int i = 0; i < node->devices.vnode_count; i++) { - if (node->devices.vnodes[i].claimed) { continue; } - if (idx == 0) { path = node->devices.vnodes[i].path; break; } - idx--; - } - } - - if (path == NULL) { - proto_write_control_response(conn, req.request_id, - PROTO_STATUS_NOT_FOUND, NULL, 0); + const char *path = resolve_device_path(node, (int)req.device_index); + if (!path) { + proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } - struct V4l2_Ctrl_Handle *handle; e = v4l2_ctrl_open(path, &handle); if (!APP_IS_OK(e)) { - proto_write_control_response(conn, req.request_id, - PROTO_STATUS_ERROR, NULL, 0); + proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); return; } - struct Ctrl_Build build = { .count = 0 }; v4l2_ctrl_enumerate(handle, ctrl_enum_cb, &build); v4l2_ctrl_close(handle); - e = proto_write_enum_controls_response(conn, - req.request_id, PROTO_STATUS_OK, - build.items, (uint16_t)build.count); + req.request_id, PROTO_STATUS_OK, build.items, (uint16_t)build.count); if (!APP_IS_OK(e)) { app_error_print(&e); } } @@ -374,56 +700,28 @@ static void handle_get_control(struct Node *node, struct Proto_Get_Control_Req req; struct App_Error e = proto_read_get_control_req(payload, length, &req); if (!APP_IS_OK(e)) { - proto_write_control_response(conn, 0, - PROTO_STATUS_INVALID_PARAMS, NULL, 0); + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } - - /* Same device resolution as enum_controls */ - const char *path = NULL; - int idx = (int)req.device_index; - for (int i = 0; i < node->devices.media_count && path == NULL; i++) { - struct MediaDev *md = &node->devices.media[i]; - if (idx < md->vnode_count) { - path = md->vnodes[idx].path; - } else { - idx -= md->vnode_count; - } - } - if (path == NULL) { - for (int i = 0; i < node->devices.vnode_count; i++) { - if (node->devices.vnodes[i].claimed) { continue; } - if (idx == 0) { path = node->devices.vnodes[i].path; break; } - idx--; - } - } - - if (path == NULL) { - proto_write_control_response(conn, req.request_id, - PROTO_STATUS_NOT_FOUND, NULL, 0); + const char *path = resolve_device_path(node, (int)req.device_index); + if (!path) { + proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } - struct V4l2_Ctrl_Handle *handle; e = v4l2_ctrl_open(path, &handle); if (!APP_IS_OK(e)) { - proto_write_control_response(conn, req.request_id, - PROTO_STATUS_ERROR, NULL, 0); + proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); return; } - int32_t value; e = v4l2_ctrl_get(handle, req.control_id, &value); v4l2_ctrl_close(handle); - if (!APP_IS_OK(e)) { - proto_write_control_response(conn, req.request_id, - PROTO_STATUS_ERROR, NULL, 0); + proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); return; } - - e = proto_write_get_control_response(conn, - req.request_id, PROTO_STATUS_OK, value); + e = proto_write_get_control_response(conn, req.request_id, PROTO_STATUS_OK, value); if (!APP_IS_OK(e)) { app_error_print(&e); } } @@ -434,52 +732,114 @@ static void handle_set_control(struct Node *node, struct Proto_Set_Control_Req req; struct App_Error e = proto_read_set_control_req(payload, length, &req); if (!APP_IS_OK(e)) { - proto_write_control_response(conn, 0, - PROTO_STATUS_INVALID_PARAMS, NULL, 0); + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); + return; + } + const char *path = resolve_device_path(node, (int)req.device_index); + if (!path) { + proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); + return; + } + struct V4l2_Ctrl_Handle *handle; + e = v4l2_ctrl_open(path, &handle); + if (!APP_IS_OK(e)) { + proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); + return; + } + e = v4l2_ctrl_set(handle, req.control_id, req.value); + v4l2_ctrl_close(handle); + uint16_t status = APP_IS_OK(e) ? PROTO_STATUS_OK : PROTO_STATUS_ERROR; + proto_write_control_response(conn, req.request_id, status, NULL, 0); +} + +static void handle_start_ingest(struct Node *node, + struct Transport_Conn *conn, + const uint8_t *payload, uint32_t length) +{ + struct Proto_Start_Ingest req; + struct App_Error e = proto_read_start_ingest(payload, length, &req); + if (!APP_IS_OK(e)) { + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } - const char *path = NULL; - int idx = (int)req.device_index; - for (int i = 0; i < node->devices.media_count && path == NULL; i++) { - struct MediaDev *md = &node->devices.media[i]; - if (idx < md->vnode_count) { - path = md->vnodes[idx].path; - } else { - idx -= md->vnode_count; - } - } - if (path == NULL) { - for (int i = 0; i < node->devices.vnode_count; i++) { - if (node->devices.vnodes[i].claimed) { continue; } - if (idx == 0) { path = node->devices.vnodes[i].path; break; } - idx--; + pthread_mutex_lock(&node->reconciler_mutex); + + /* Reuse existing slot for this stream_id, or allocate a fresh one */ + struct Ingest_Stream *s = stream_find(node, req.stream_id); + if (!s) { + s = stream_alloc(node); + if (!s) { + pthread_mutex_unlock(&node->reconciler_mutex); + proto_write_control_response(conn, req.request_id, + PROTO_STATUS_ERROR, NULL, 0); + fprintf(stderr, "START_INGEST: no free stream slots\n"); + return; } + s->in_use = 1; + s->stream_id = req.stream_id; } - if (path == NULL) { + /* Copy wanted config (NUL-terminate the wire strings) */ + s->format = req.format; + s->width = req.width; + s->height = req.height; + s->fps_n = req.fps_n; + s->fps_d = req.fps_d; + s->transport_mode = req.transport_mode; + s->dest_port = req.dest_port; + + size_t dp_len = req.device_path_len < sizeof(s->device_path) - 1 + ? req.device_path_len : sizeof(s->device_path) - 1; + memcpy(s->device_path, req.device_path, dp_len); + s->device_path[dp_len] = '\0'; + + size_t dh_len = req.dest_host_len < sizeof(s->dest_host) - 1 + ? req.dest_host_len : sizeof(s->dest_host) - 1; + memcpy(s->dest_host, req.dest_host, dh_len); + s->dest_host[dh_len] = '\0'; + + reconciler_set_wanted(s->res_device, DEV_STREAMING); + reconciler_set_wanted(s->res_transport, CONN_CONNECTED); + reconciler_tick(node->reconciler); + + pthread_mutex_unlock(&node->reconciler_mutex); + + proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); +} + +static void handle_stop_ingest(struct Node *node, + struct Transport_Conn *conn, + const uint8_t *payload, uint32_t length) +{ + struct Proto_Stop_Ingest req; + struct App_Error e = proto_read_stop_ingest(payload, length, &req); + if (!APP_IS_OK(e)) { + proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); + return; + } + + pthread_mutex_lock(&node->reconciler_mutex); + + struct Ingest_Stream *s = stream_find(node, req.stream_id); + if (!s) { + pthread_mutex_unlock(&node->reconciler_mutex); proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } - struct V4l2_Ctrl_Handle *handle; - e = v4l2_ctrl_open(path, &handle); - if (!APP_IS_OK(e)) { - proto_write_control_response(conn, req.request_id, - PROTO_STATUS_ERROR, NULL, 0); - return; - } + reconciler_set_wanted(s->res_device, DEV_CLOSED); + reconciler_set_wanted(s->res_transport, CONN_DISCONNECTED); + reconciler_tick(node->reconciler); - e = v4l2_ctrl_set(handle, req.control_id, req.value); - v4l2_ctrl_close(handle); + pthread_mutex_unlock(&node->reconciler_mutex); - uint16_t status = APP_IS_OK(e) ? PROTO_STATUS_OK : PROTO_STATUS_ERROR; - proto_write_control_response(conn, req.request_id, status, NULL, 0); + proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); } /* ------------------------------------------------------------------------- - * Transport callbacks + * Transport callbacks (inbound server) * ------------------------------------------------------------------------- */ static void on_frame(struct Transport_Conn *conn, @@ -491,10 +851,7 @@ static void on_frame(struct Transport_Conn *conn, struct Proto_Request_Header hdr; struct App_Error e = proto_read_request_header( frame->payload, frame->payload_length, &hdr); - if (!APP_IS_OK(e)) { - free(frame->payload); - return; - } + if (!APP_IS_OK(e)) { free(frame->payload); return; } switch (hdr.command) { case PROTO_CMD_ENUM_DEVICES: @@ -512,6 +869,19 @@ static void on_frame(struct Transport_Conn *conn, handle_set_control(node, conn, frame->payload, frame->payload_length); break; + case PROTO_CMD_START_INGEST: + handle_start_ingest(node, conn, + frame->payload, frame->payload_length); + break; + case PROTO_CMD_STOP_INGEST: + handle_stop_ingest(node, conn, + frame->payload, frame->payload_length); + break; + case PROTO_CMD_STREAM_OPEN: + /* A sender is opening a stream on this node (sink role) */ + proto_write_control_response(conn, hdr.request_id, + PROTO_STATUS_OK, NULL, 0); + break; default: proto_write_control_response(conn, hdr.request_id, PROTO_STATUS_UNKNOWN_CMD, NULL, 0); @@ -522,12 +892,14 @@ static void on_frame(struct Transport_Conn *conn, free(frame->payload); } -static void on_connect(struct Transport_Conn *conn, void *userdata) { +static void on_connect(struct Transport_Conn *conn, void *userdata) +{ (void)conn; (void)userdata; printf("peer connected\n"); } -static void on_disconnect(struct Transport_Conn *conn, void *userdata) { +static void on_disconnect(struct Transport_Conn *conn, void *userdata) +{ (void)conn; (void)userdata; printf("peer disconnected\n"); } @@ -545,14 +917,14 @@ static const struct Config_Flag_Def function_flag_defs[] = { }; static const struct Config_Def schema[] = { - { "node", "name", CONFIG_STRING, "unnamed:0", NULL }, - { "node", "site_id", CONFIG_UINT16, "0", NULL }, - { "node", "tcp_port", CONFIG_UINT16, "8000", NULL }, + { "node", "name", CONFIG_STRING, "unnamed:0", NULL }, + { "node", "site_id", CONFIG_UINT16, "0", NULL }, + { "node", "tcp_port", CONFIG_UINT16, "8000", NULL }, { "node", "function", CONFIG_FLAGS, "source", function_flag_defs }, - { "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL }, - { "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL }, - { "transport", "max_connections", CONFIG_UINT32, "16", NULL }, - { "transport", "max_payload", CONFIG_UINT32, "16777216", NULL }, + { "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL }, + { "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL }, + { "transport", "max_connections", CONFIG_UINT32, "16", NULL }, + { "transport", "max_payload", CONFIG_UINT32, "16777216", NULL }, { NULL } }; @@ -566,7 +938,8 @@ static void usage(void) { " video-node --defaults\n"); } -int main(int argc, char **argv) { +int main(int argc, char **argv) +{ if (argc < 2) { usage(); return 1; } struct Node node; @@ -581,14 +954,14 @@ int main(int argc, char **argv) { } if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } - uint16_t tcp_port = config_get_u16(node.config, "node", "tcp_port"); - uint16_t site_id = config_get_u16(node.config, "node", "site_id"); - uint32_t func = config_get_flags(node.config, "node", "function"); - const char *name = config_get_str(node.config, "node", "name"); - uint32_t interval = config_get_u32(node.config, "discovery", "interval_ms"); - uint32_t timeout_i = config_get_u32(node.config, "discovery", "timeout_intervals"); - uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections"); - uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload"); + uint16_t tcp_port = config_get_u16(node.config, "node", "tcp_port"); + uint16_t site_id = config_get_u16(node.config, "node", "site_id"); + uint32_t func = config_get_flags(node.config, "node", "function"); + const char *name = config_get_str(node.config, "node", "name"); + uint32_t interval = config_get_u32(node.config, "discovery", "interval_ms"); + uint32_t timeout_i = config_get_u32(node.config, "discovery", "timeout_intervals"); + uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections"); + uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload"); printf("node: %s port=%u site=%u\n", name, tcp_port, site_id); @@ -598,7 +971,21 @@ int main(int argc, char **argv) { printf("found %d media device(s), %d video node(s)\n", node.devices.media_count, node.devices.vnode_count); - /* Start transport server */ + /* Reconciler */ + node.reconciler = reconciler_create(); + if (!node.reconciler) { + fprintf(stderr, "failed to create reconciler\n"); + return 1; + } + reconciler_set_log(node.reconciler, on_reconciler_log, NULL); + pthread_mutex_init(&node.reconciler_mutex, NULL); + + /* Pre-allocate stream slots */ + for (int i = 0; i < MAX_STREAMS; i++) { + stream_slot_init(&node, i); + } + + /* Transport server */ struct Transport_Server_Config srv_cfg = { .port = tcp_port, .max_connections = (int)max_conn, @@ -608,32 +995,32 @@ int main(int argc, char **argv) { .on_disconnect = on_disconnect, .userdata = &node, }; - e = transport_server_create(&node.server, &srv_cfg); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } - e = transport_server_start(node.server); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } - /* Start discovery */ + /* Discovery */ struct Discovery_Config disc_cfg = { - .site_id = site_id, - .tcp_port = tcp_port, - .function_flags = (uint16_t)func, - .name = name, - .interval_ms = interval, - .timeout_intervals= timeout_i, - .on_peer_found = NULL, - .on_peer_lost = NULL, + .site_id = site_id, + .tcp_port = tcp_port, + .function_flags = (uint16_t)func, + .name = name, + .interval_ms = interval, + .timeout_intervals = timeout_i, + .on_peer_found = NULL, + .on_peer_lost = NULL, }; - e = discovery_create(&node.discovery, &disc_cfg); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } - e = discovery_start(node.discovery); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } + /* Periodic reconciler tick */ + pthread_t timer_thread; + pthread_create(&timer_thread, NULL, reconciler_timer, &node); + printf("ready\n"); - pause(); + pthread_join(timer_thread, NULL); return 0; }