/* NOTE(review): the header names of the system includes on the next line are
 * missing in this copy of the file; restore them from version control. */
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "config.h"
#include "discovery.h"
#include "transport.h"
#include "protocol.h"
#include "media_ctrl.h"
#include "v4l2_ctrl.h"
#include "reconciler.h"
#include "ingest.h"
#include "stream_stats.h"
#include "xorg.h"
#include "error.h"

/* -------------------------------------------------------------------------
 * Stream slot constants
 * ------------------------------------------------------------------------- */

#define MAX_STREAMS 8   /* size of Node.streams[] */
#define MAX_DISPLAYS 4  /* size of Node.displays[] */

/* States of the per-stream "device" reconciler resource */
#define DEV_CLOSED 0
#define DEV_OPEN 1
#define DEV_STREAMING 2

/* States of the per-stream "transport" reconciler resource */
#define CONN_DISCONNECTED 0
#define CONN_CONNECTED 1

/* -------------------------------------------------------------------------
 * Device enumeration (unchanged from original)
 * ------------------------------------------------------------------------- */

#define MEDIA_ENT_F_IO_V4L 0x10001u
#define MAX_VIDEO_NODES 32    /* capacity of Device_List.vnodes[] */
#define MAX_MEDIA_DEVICES 8   /* capacity of Device_List.media[] */
#define MAX_VNODES_PER_MD 8   /* capacity of MediaDev.vnodes[] */
#define MAX_CONTROLS 256      /* capacity of Ctrl_Build.items[] */
#define DEV_PATH_MAX 264

/* One /dev/videoN character device found by scan_video_nodes(). */
struct VNode {
    char path[DEV_PATH_MAX];  /* "/dev/<name>" */
    uint32_t dev_major;       /* major(st_rdev) of the node */
    uint32_t dev_minor;       /* minor(st_rdev) of the node */
    uint32_t device_caps;     /* from VIDIOC_QUERYCAP; stays 0 if the query failed */
    int claimed;              /* set to 1 once matched to a media-controller entity */
    char card[32];            /* card name from VIDIOC_QUERYCAP, "" if unavailable */
};

/* A video node as seen through a media controller entity (see entity_callback). */
struct MNode {
    char path[DEV_PATH_MAX];  /* copied from the matching VNode */
    char entity_name[32];
    uint32_t entity_type;
    uint32_t entity_flags;
    uint32_t device_caps;     /* copied from the matching VNode */
    uint8_t is_capture;       /* 1 when device_caps has V4L2_CAP_VIDEO_CAPTURE */
    int vnode_index;          /* index of the matching entry in Device_List.vnodes[] */
};

/* One /dev/mediaN device and the video nodes its entities map to. */
struct MediaDev {
    char path[DEV_PATH_MAX];
    char driver[16];
    char model[32];
    char bus_info[32];
    struct MNode vnodes[MAX_VNODES_PER_MD];
    int vnode_count;          /* number of valid entries in vnodes[] */
};

/* Result of build_device_list(): all video nodes plus all media devices. */
struct Device_List {
    struct VNode vnodes[MAX_VIDEO_NODES];
    int vnode_count;
    struct MediaDev media[MAX_MEDIA_DEVICES];
    int media_count;
};

/* -------------------------------------------------------------------------
 * Forward declarations
 * ------------------------------------------------------------------------- */

struct Node;
struct Ingest_Stream;

/* Reconciler transition callbacks; each returns 1 on success and 0 on failure. */
static int trans_device_open(void *ud);
static int trans_device_close(void *ud);
static int
trans_device_start(void *ud);
static int trans_device_stop(void *ud);
static int trans_transport_connect(void *ud);
static int trans_transport_disconnect(void *ud);

/* Callbacks handed to ingest_open() through struct Ingest_Config */
static void on_ingest_frame(const uint8_t *data, uint32_t len, int width, int height, uint32_t pixfmt, void *userdata);
static void on_ingest_error(const char *msg, void *userdata);

/* Control-request handlers for the sink (display) role */
static void handle_start_display(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length);
static void handle_stop_display(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length);

/* -------------------------------------------------------------------------
 * Transition tables — shared across all stream slots
 * ------------------------------------------------------------------------- */

/* { from, to, callback } triples; the { -1, -1, NULL } entry terminates the table. */
static const struct Rec_Transition device_transitions[] = {
    { DEV_CLOSED, DEV_OPEN, trans_device_open },
    { DEV_OPEN, DEV_CLOSED, trans_device_close },
    { DEV_OPEN, DEV_STREAMING, trans_device_start },
    { DEV_STREAMING, DEV_OPEN, trans_device_stop },
    { -1, -1, NULL }
};

/* Indexed by DEV_* state value */
static const char *device_state_names[] = { "CLOSED", "OPEN", "STREAMING" };

static const struct Rec_Transition transport_transitions[] = {
    { CONN_DISCONNECTED, CONN_CONNECTED, trans_transport_connect },
    { CONN_CONNECTED, CONN_DISCONNECTED, trans_transport_disconnect },
    { -1, -1, NULL }
};

/* Indexed by CONN_* state value */
static const char *transport_state_names[] = { "DISCONNECTED", "CONNECTED" };

/* -------------------------------------------------------------------------
 * Per-stream state
 * ------------------------------------------------------------------------- */

/* One outbound (source role) stream: a capture device plus the transport
 * connection its frames are written to. */
struct Ingest_Stream {
    int in_use;          /* non-zero while the slot is bound to stream_id */
    uint16_t stream_id;

    /* Wanted config — written by START_INGEST handler */
    uint16_t format;     /* PROTO_FORMAT_*; 0 is treated as MJPEG / auto */
    uint16_t width, height;
    uint16_t fps_n, fps_d;
    uint16_t transport_mode;
    char device_path[256];
    char dest_host[256];
    uint16_t dest_port;

    /* Reconciler resources */
    struct Rec_Resource *res_device;
    struct Rec_Resource *res_transport;

    /* Runtime */
    struct
Ingest_Handle *ingest_handle;  /* set by trans_device_open, cleared by trans_device_close */
    pthread_mutex_t conn_mutex;    /* guards conn */
    struct Transport_Conn *conn;   /* NULL while disconnected */
    Stream_Stats stats;
    struct Node *node;             /* back-pointer to the owning node */
};

/* -------------------------------------------------------------------------
 * Display slot — sink role; one per displayed stream
 * ------------------------------------------------------------------------- */

/* Display states (wanted/current) */
#define DISP_CLOSED 0
#define DISP_OPEN 1

struct Display_Slot {
    pthread_mutex_t mutex;  /* protects wanted_state + frame fields */
    int allocated;          /* slot is in use */
    uint16_t stream_id;

    /* Declarative state — handlers write wanted, main loop reconciles */
    int wanted_state;       /* DISP_CLOSED / DISP_OPEN */
    int current_state;      /* driven by display_loop_tick */

    /* Config — written by handle_start_display before setting wanted */
    int win_x, win_y;
    int win_w, win_h;
    Xorg_Scale scale;
    Xorg_Anchor anchor;

    /* Pending frame — deposited by transport thread, consumed by main */
    uint8_t *frame_data;    /* malloc'd transport payload; owned */
    uint32_t frame_len;
    int frame_ready;

    /* No-signal animation */
    int no_signal_fps;          /* 0 → default 15 */
    uint64_t last_frame_ms;     /* monotonic ms of last successfully displayed frame */
    uint64_t last_no_signal_ms; /* monotonic ms of last no-signal render */

    /* Viewer — created and used only on the main thread */
    Xorg_Viewer *viewer;
};

/* -------------------------------------------------------------------------
 * Node
 * ------------------------------------------------------------------------- */

/* Top-level process state shared by all handlers and threads. */
struct Node {
    struct Config *config;
    struct Transport_Server *server;
    struct Discovery *discovery;
    struct Device_List devices;
    struct Reconciler *reconciler;
    pthread_mutex_t reconciler_mutex;  /* held around every reconciler_* call in this file */
    struct Ingest_Stream streams[MAX_STREAMS];
    struct Display_Slot displays[MAX_DISPLAYS];
};

/* -------------------------------------------------------------------------
 * Transition implementations
 * ------------------------------------------------------------------------- */

static int
trans_device_open(void *ud) { struct Ingest_Stream *s = ud; /* Map PROTO_FORMAT_* to V4L2 pixfmt; 0 = auto-select best MJPEG */ uint32_t pixfmt = 0; switch (s->format) { case PROTO_FORMAT_MJPEG: pixfmt = V4L2_PIX_FMT_MJPEG; break; default: pixfmt = 0; break; } struct Ingest_Config cfg = { .device = s->device_path, .pixfmt = pixfmt, .width = s->width, .height = s->height, .on_frame = on_ingest_frame, .on_error = on_ingest_error, .userdata = s, }; struct App_Error e = ingest_open(&cfg, &s->ingest_handle); if (!APP_IS_OK(e)) { app_error_print(&e); return 0; } return 1; } static int trans_device_close(void *ud) { struct Ingest_Stream *s = ud; ingest_close(s->ingest_handle); s->ingest_handle = NULL; return 1; } static void on_ingest_frame(const uint8_t *data, uint32_t len, int width, int height, uint32_t pixfmt, void *userdata) { struct Ingest_Stream *s = userdata; (void)width; (void)height; (void)pixfmt; pthread_mutex_lock(&s->conn_mutex); if (s->conn) { struct App_Error e = proto_write_video_frame(s->conn, s->stream_id, data, len); if (!APP_IS_OK(e)) { /* transport will fire on_disconnect; nothing to do here */ } else { stream_stats_record_frame(&s->stats, len); } } pthread_mutex_unlock(&s->conn_mutex); if (stream_stats_tick(&s->stats)) { fprintf(stderr, "stream %u %.1f fps %.2f Mbps\n", s->stream_id, s->stats.fps, s->stats.mbps); } } static void on_ingest_error(const char *msg, void *userdata) { struct Ingest_Stream *s = userdata; fprintf(stderr, "ingest error [stream %u]: %s\n", s->stream_id, msg); /* Capture thread has exited; push device back to OPEN so the * reconciler will restart it on the next tick. 
*/ pthread_mutex_lock(&s->node->reconciler_mutex); reconciler_force_current(s->res_device, DEV_OPEN); pthread_mutex_unlock(&s->node->reconciler_mutex); } static int trans_device_start(void *ud) { struct Ingest_Stream *s = ud; struct App_Error e = ingest_start(s->ingest_handle); if (!APP_IS_OK(e)) { app_error_print(&e); return 0; } return 1; } static int trans_device_stop(void *ud) { struct Ingest_Stream *s = ud; struct App_Error e = ingest_stop(s->ingest_handle); if (!APP_IS_OK(e)) { app_error_print(&e); return 0; } return 1; } static void outbound_on_frame(struct Transport_Conn *conn, struct Transport_Frame *frame, void *userdata) { (void)conn; (void)userdata; /* Discard control responses from the sink */ free(frame->payload); } static void outbound_on_disconnect(struct Transport_Conn *conn, void *userdata) { struct Ingest_Stream *s = userdata; (void)conn; fprintf(stderr, "stream %u: transport disconnected\n", s->stream_id); pthread_mutex_lock(&s->conn_mutex); s->conn = NULL; pthread_mutex_unlock(&s->conn_mutex); /* Force transport resource back so the reconciler re-drives it */ pthread_mutex_lock(&s->node->reconciler_mutex); reconciler_force_current(s->res_transport, CONN_DISCONNECTED); reconciler_tick(s->node->reconciler); pthread_mutex_unlock(&s->node->reconciler_mutex); } static int trans_transport_connect(void *ud) { struct Ingest_Stream *s = ud; struct Transport_Conn *conn = NULL; struct App_Error e = transport_connect(&conn, s->dest_host, s->dest_port, TRANSPORT_DEFAULT_MAX_PAYLOAD, outbound_on_frame, outbound_on_disconnect, s); if (!APP_IS_OK(e)) { app_error_print(&e); return 0; } /* Send STREAM_OPEN so the sink knows what's coming */ uint16_t proto_fmt = s->format ? 
s->format : PROTO_FORMAT_MJPEG; e = proto_write_stream_open(conn, 1 /* request_id */, s->stream_id, proto_fmt, 0 /* pixel_format: compressed */, PROTO_ORIGIN_DEVICE_NATIVE); if (!APP_IS_OK(e)) { app_error_print(&e); transport_conn_close(conn); return 0; } pthread_mutex_lock(&s->conn_mutex); s->conn = conn; pthread_mutex_unlock(&s->conn_mutex); stream_stats_init(&s->stats, s->stream_id); return 1; } static int trans_transport_disconnect(void *ud) { struct Ingest_Stream *s = ud; pthread_mutex_lock(&s->conn_mutex); struct Transport_Conn *conn = s->conn; s->conn = NULL; pthread_mutex_unlock(&s->conn_mutex); if (conn) { transport_conn_close(conn); } return 1; } /* ------------------------------------------------------------------------- * Stream slot management * ------------------------------------------------------------------------- */ static void stream_slot_init(struct Node *node, int idx) { struct Ingest_Stream *s = &node->streams[idx]; memset(s, 0, sizeof(*s)); pthread_mutex_init(&s->conn_mutex, NULL); s->node = node; char dev_name[32], conn_name[32]; snprintf(dev_name, sizeof(dev_name), "device_%d", idx); snprintf(conn_name, sizeof(conn_name), "transport_%d", idx); s->res_device = reconciler_add_resource(node->reconciler, dev_name, device_transitions, 3, device_state_names, DEV_CLOSED, s); s->res_transport = reconciler_add_resource(node->reconciler, conn_name, transport_transitions, 2, transport_state_names, CONN_DISCONNECTED, s); /* Transport needs device open (to know the format for STREAM_OPEN) */ reconciler_add_dep(s->res_transport, CONN_CONNECTED, s->res_device, DEV_OPEN); /* Don't start capture until transport is connected */ reconciler_add_dep(s->res_device, DEV_STREAMING, s->res_transport, CONN_CONNECTED); } /* Find slot by stream_id, or find a free slot if stream_id is 0 */ static struct Ingest_Stream *stream_find(struct Node *node, uint16_t stream_id) { for (int i = 0; i < MAX_STREAMS; i++) { if (node->streams[i].in_use && node->streams[i].stream_id == 
stream_id) { return &node->streams[i]; } } return NULL; } static struct Ingest_Stream *stream_alloc(struct Node *node) { for (int i = 0; i < MAX_STREAMS; i++) { if (!node->streams[i].in_use) { return &node->streams[i]; } } return NULL; } /* ------------------------------------------------------------------------- * Reconciler log * ------------------------------------------------------------------------- */ static void on_reconciler_log(const struct Rec_Resource *res, int from, int to, int success, void *userdata) { (void)userdata; fprintf(stderr, "reconciler: [%s] %s -> %s ... %s\n", reconciler_get_name(res), reconciler_state_name(res, from), reconciler_state_name(res, to), success ? "ok" : "FAILED"); } /* ------------------------------------------------------------------------- * Main-thread display loop * * Called from the main thread only. Opens/closes Xorg_Viewer windows as * display slots are created or destroyed, delivers pending frames, and polls * GLFW events for all active viewers. * ------------------------------------------------------------------------- */ /* * display_loop_tick — main-thread reconciler for display slots. * * Runs on the main thread only (GLFW requirement for xorg_viewer_open/close). * Reads wanted_state set by handlers; drives current_state through transitions. * For slots that are OPEN: delivers pending frames and polls window events. 
*/
static void display_loop_tick(struct Node *node)
{
    for (int i = 0; i < MAX_DISPLAYS; i++) {
        struct Display_Slot *d = &node->displays[i];

        /* Snapshot the fields handlers may change concurrently */
        pthread_mutex_lock(&d->mutex);
        int allocated = d->allocated;
        int wanted = d->wanted_state;
        int current = d->current_state;
        pthread_mutex_unlock(&d->mutex);

        if (!allocated) {
            continue;
        }

        /* Reconcile: drive current toward wanted.
         * NOTE(review): a slot with wanted == current == DISP_CLOSED (for
         * example STOP_DISPLAY arriving before the window was ever opened)
         * matches neither branch and stays allocated — confirm whether
         * that is intended. */
        if (current != wanted) {
            if (wanted == DISP_OPEN && current == DISP_CLOSED) {
                char title[64];
                snprintf(title, sizeof(title), "stream %u", (unsigned)d->stream_id);
                Xorg_Viewer *v = xorg_viewer_open( d->win_x, d->win_y, d->win_w, d->win_h, title);
                if (v) {
                    xorg_viewer_set_scale(v, d->scale);
                    xorg_viewer_set_anchor(v, d->anchor);
                    d->viewer = v;
                    pthread_mutex_lock(&d->mutex);
                    d->current_state = DISP_OPEN;
                    pthread_mutex_unlock(&d->mutex);
                }
                /* If open failed, current stays CLOSED; will retry next tick */
            } else if (wanted == DISP_CLOSED && current == DISP_OPEN) {
                if (d->viewer) {
                    xorg_viewer_close(d->viewer);
                    d->viewer = NULL;
                }
                /* Drop any undelivered frame and hand the slot back */
                pthread_mutex_lock(&d->mutex);
                d->current_state = DISP_CLOSED;
                if (d->frame_data) {
                    free(d->frame_data);
                    d->frame_data = NULL;
                }
                d->frame_ready = 0;
                d->allocated = 0; /* release slot */
                pthread_mutex_unlock(&d->mutex);
                continue;
            }
        }

        if (d->current_state != DISP_OPEN || !d->viewer) {
            continue;
        }

        /* Deliver pending frame (no lock held during decode/upload):
         * ownership of the buffer moves from the slot to fdata. */
        pthread_mutex_lock(&d->mutex);
        uint8_t *fdata = NULL;
        uint32_t flen = 0;
        if (d->frame_ready) {
            fdata = d->frame_data;
            flen = d->frame_len;
            d->frame_data = NULL;
            d->frame_len = 0;
            d->frame_ready = 0;
        }
        pthread_mutex_unlock(&d->mutex);

        /* Current monotonic time in milliseconds */
        struct timespec _ts;
        clock_gettime(CLOCK_MONOTONIC, &_ts);
        uint64_t now_ms = (uint64_t)_ts.tv_sec * 1000u + (uint64_t)_ts.tv_nsec / 1000000u;

        if (fdata) {
            struct Proto_Video_Frame vf;
            if (APP_IS_OK(proto_read_video_frame(fdata, flen, &vf))) {
                xorg_viewer_push_mjpeg(d->viewer, vf.data, vf.data_len);
                d->last_frame_ms = now_ms;
            }
            free(fdata);
        }

        /* Render no-signal only when stream has been silent for
≥1 s */ if (now_ms - d->last_frame_ms >= 1000u) { uint64_t interval_ms = 1000u / (uint64_t)(d->no_signal_fps > 0 ? d->no_signal_fps : 15); if (now_ms - d->last_no_signal_ms >= interval_ms) { /* shader time: wrap ms to [0, 1000000) → [0, 1000) s, preserving float32 precision */ float shader_t = (float)(now_ms % 1000000u) / 1000.0f; xorg_viewer_render_no_signal(d->viewer, shader_t, 80.0f); d->last_no_signal_ms = now_ms; } } /* Poll GLFW events; if user closes the window, treat as STOP_DISPLAY */ if (!xorg_viewer_handle_events(d->viewer)) { xorg_viewer_close(d->viewer); d->viewer = NULL; pthread_mutex_lock(&d->mutex); d->current_state = DISP_CLOSED; d->wanted_state = DISP_CLOSED; if (d->frame_data) { free(d->frame_data); d->frame_data = NULL; } d->frame_ready = 0; d->allocated = 0; pthread_mutex_unlock(&d->mutex); } } } static int any_display_active(struct Node *node) { for (int i = 0; i < MAX_DISPLAYS; i++) { if (node->displays[i].allocated) { return 1; } } return 0; } /* ------------------------------------------------------------------------- * Periodic reconciler tick thread * ------------------------------------------------------------------------- */ static void *reconciler_timer(void *arg) { struct Node *node = arg; while (1) { usleep(500000); /* 500 ms */ pthread_mutex_lock(&node->reconciler_mutex); reconciler_tick(node->reconciler); pthread_mutex_unlock(&node->reconciler_mutex); } return NULL; } /* ------------------------------------------------------------------------- * Device enumeration (unchanged) * ------------------------------------------------------------------------- */ static int scan_video_nodes(struct Device_List *dl) { DIR *d = opendir("/dev"); if (!d) { return -1; } struct dirent *ent; while ((ent = readdir(d)) != NULL && dl->vnode_count < MAX_VIDEO_NODES) { if (strncmp(ent->d_name, "video", 5) != 0) { continue; } const char *suffix = ent->d_name + 5; int numeric = (*suffix != '\0'); for (const char *p = suffix; *p; p++) { if (*p < '0' || *p > 
'9') {
                numeric = 0;
                break;
            }
        }
        if (!numeric) {
            continue;
        }

        /* Fill the next free slot; it is only committed by the
         * vnode_count++ at the end of the iteration. */
        struct VNode *v = &dl->vnodes[dl->vnode_count];
        snprintf(v->path, sizeof(v->path), "/dev/%s", ent->d_name);

        struct stat st;
        if (stat(v->path, &st) != 0 || !S_ISCHR(st.st_mode)) {
            continue;
        }

        v->dev_major = (uint32_t)major(st.st_rdev);
        v->dev_minor = (uint32_t)minor(st.st_rdev);
        v->claimed = 0;
        v->card[0] = '\0';

        /* Best effort: a node that cannot be opened or queried is still
         * listed, just without card name and capabilities. */
        int fd = open(v->path, O_RDONLY | O_NONBLOCK);
        if (fd >= 0) {
            struct v4l2_capability cap;
            if (ioctl(fd, VIDIOC_QUERYCAP, &cap) == 0) {
                strncpy(v->card, (const char *)cap.card, sizeof(v->card) - 1);
                v->card[sizeof(v->card) - 1] = '\0';
                /* Prefer per-node caps when the driver reports them */
                v->device_caps = (cap.capabilities & V4L2_CAP_DEVICE_CAPS) ? cap.device_caps : cap.capabilities;
            }
            close(fd);
        }

        dl->vnode_count++;
    }

    closedir(d);
    return 0;
}

/* Context passed to entity_callback() while enumerating one media device. */
struct Entity_Cb_State {
    struct Device_List *dl;  /* all scanned video nodes */
    struct MediaDev *md;     /* media device currently being filled */
};

/* Media entity enumeration callback: when the entity's device number matches
 * a scanned video node, mark that node claimed and append it to the media
 * device's node list. Entities with dev_major == 0 are ignored, as is any
 * entity arriving once the media device already holds MAX_VNODES_PER_MD nodes. */
static void entity_callback(const struct Media_Entity *entity, void *userdata)
{
    struct Entity_Cb_State *state = userdata;
    struct Device_List *dl = state->dl;
    struct MediaDev *md = state->md;

    if (entity->dev_major == 0) {
        return;
    }
    if (md->vnode_count >= MAX_VNODES_PER_MD) {
        return;
    }

    for (int i = 0; i < dl->vnode_count; i++) {
        if (dl->vnodes[i].dev_major != entity->dev_major) {
            continue;
        }
        if (dl->vnodes[i].dev_minor != entity->dev_minor) {
            continue;
        }

        dl->vnodes[i].claimed = 1;

        struct MNode *mn = &md->vnodes[md->vnode_count++];
        strncpy(mn->path, dl->vnodes[i].path, sizeof(mn->path) - 1);
        mn->path[sizeof(mn->path) - 1] = '\0';
        strncpy(mn->entity_name, entity->name, sizeof(mn->entity_name) - 1);
        mn->entity_name[sizeof(mn->entity_name) - 1] = '\0';
        mn->entity_type = entity->type;
        mn->entity_flags = entity->flags;
        mn->device_caps = dl->vnodes[i].device_caps;
        mn->is_capture = (mn->device_caps & V4L2_CAP_VIDEO_CAPTURE) ?
1 : 0; mn->vnode_index = i; break; } } static void scan_media_devices(struct Device_List *dl) { DIR *d = opendir("/dev"); if (!d) { return; } struct dirent *ent; while ((ent = readdir(d)) != NULL && dl->media_count < MAX_MEDIA_DEVICES) { if (strncmp(ent->d_name, "media", 5) != 0) { continue; } const char *suffix = ent->d_name + 5; int numeric = (*suffix != '\0'); for (const char *p = suffix; *p; p++) { if (*p < '0' || *p > '9') { numeric = 0; break; } } if (!numeric) { continue; } struct MediaDev *md = &dl->media[dl->media_count]; snprintf(md->path, sizeof(md->path), "/dev/%s", ent->d_name); md->vnode_count = 0; struct Media_Ctrl *ctrl; if (!APP_IS_OK(media_ctrl_open(md->path, &ctrl))) { continue; } struct Media_Device_Info info; if (APP_IS_OK(media_ctrl_get_info(ctrl, &info))) { strncpy(md->driver, info.driver, sizeof(md->driver) - 1); strncpy(md->model, info.model, sizeof(md->model) - 1); strncpy(md->bus_info, info.bus_info, sizeof(md->bus_info) - 1); md->driver[sizeof(md->driver) - 1] = '\0'; md->model[sizeof(md->model) - 1] = '\0'; md->bus_info[sizeof(md->bus_info) - 1] = '\0'; } struct Entity_Cb_State state = { .dl = dl, .md = md }; media_ctrl_enum_entities(ctrl, entity_callback, &state); media_ctrl_close(ctrl); dl->media_count++; } closedir(d); } static void build_device_list(struct Device_List *dl) { memset(dl, 0, sizeof(*dl)); scan_video_nodes(dl); scan_media_devices(dl); } /* ------------------------------------------------------------------------- * Control enumeration helpers (unchanged) * ------------------------------------------------------------------------- */ #define MAX_MENU_POOL 128 struct Ctrl_Build { struct Proto_Control_Info items[MAX_CONTROLS]; char names[MAX_CONTROLS][32]; struct Proto_Menu_Item menu_pool[MAX_MENU_POOL]; char menu_names[MAX_MENU_POOL][32]; int menu_pool_used; int count; }; static void ctrl_enum_cb( const struct V4l2_Ctrl_Desc *desc, uint32_t menu_count, const struct V4l2_Menu_Item *menu_items, void *userdata) { struct 
Ctrl_Build *b = userdata; if (b->count >= MAX_CONTROLS) { return; } int i = b->count++; strncpy(b->names[i], desc->name, 31); b->names[i][31] = '\0'; b->items[i].id = desc->id; b->items[i].type = (uint8_t)desc->type; b->items[i].flags = desc->flags; b->items[i].name = b->names[i]; b->items[i].min = desc->min; b->items[i].max = desc->max; b->items[i].step = desc->step; b->items[i].default_val = desc->default_value; b->items[i].current_val = desc->current_value; b->items[i].menu_count = 0; b->items[i].menu_items = NULL; if (menu_count > 0 && menu_items) { int avail = MAX_MENU_POOL - b->menu_pool_used; uint8_t mc = (menu_count > (uint32_t)avail) ? (uint8_t)avail : (uint8_t)menu_count; if (mc > 0) { b->items[i].menu_items = &b->menu_pool[b->menu_pool_used]; b->items[i].menu_count = mc; for (uint8_t j = 0; j < mc; j++) { int slot = b->menu_pool_used + j; strncpy(b->menu_names[slot], menu_items[j].name, 31); b->menu_names[slot][31] = '\0'; b->menu_pool[slot].index = menu_items[j].index; b->menu_pool[slot].name = b->menu_names[slot]; b->menu_pool[slot].int_value = menu_items[j].value; } b->menu_pool_used += mc; } } } /* ------------------------------------------------------------------------- * Device path resolver (shared by enum/get/set control handlers) * ------------------------------------------------------------------------- */ static const char *resolve_device_path(struct Node *node, int idx) { for (int i = 0; i < node->devices.media_count; i++) { struct MediaDev *md = &node->devices.media[i]; if (idx < md->vnode_count) { return md->vnodes[idx].path; } idx -= md->vnode_count; } for (int i = 0; i < node->devices.vnode_count; i++) { if (node->devices.vnodes[i].claimed) { continue; } if (idx == 0) { return node->devices.vnodes[i].path; } idx--; } return NULL; } /* ------------------------------------------------------------------------- * Request handlers * ------------------------------------------------------------------------- */ static void 
handle_enum_devices(struct Node *node, struct Transport_Conn *conn, uint16_t request_id)
{
    /* ENUM_DEVICES: reply with media devices (and their video nodes),
     * unclaimed standalone video nodes, and the currently wanted displays.
     * All response structs point into node->devices or the local arrays. */
    struct Proto_Video_Node_Info vnodes[MAX_MEDIA_DEVICES * MAX_VNODES_PER_MD];
    struct Proto_Media_Device_Info mdevs[MAX_MEDIA_DEVICES];
    int vnode_offset = 0;  /* next free entry in the flat vnodes[] array */

    for (int i = 0; i < node->devices.media_count; i++) {
        struct MediaDev *md = &node->devices.media[i];
        mdevs[i].path = md->path;
        mdevs[i].driver = md->driver;
        mdevs[i].model = md->model;
        mdevs[i].bus_info = md->bus_info;
        mdevs[i].video_node_count = (uint8_t)md->vnode_count;
        mdevs[i].video_nodes = &vnodes[vnode_offset];

        for (int j = 0; j < md->vnode_count; j++) {
            struct MNode *mn = &md->vnodes[j];
            vnodes[vnode_offset + j].path = mn->path;
            vnodes[vnode_offset + j].entity_name = mn->entity_name;
            vnodes[vnode_offset + j].entity_type = mn->entity_type;
            vnodes[vnode_offset + j].entity_flags= mn->entity_flags;
            vnodes[vnode_offset + j].device_caps = mn->device_caps;
            vnodes[vnode_offset + j].pad_flags = 0;
            vnodes[vnode_offset + j].is_capture = mn->is_capture;
        }
        vnode_offset += md->vnode_count;
    }

    /* Video nodes not claimed by any media device */
    struct Proto_Standalone_Device_Info standalone[MAX_VIDEO_NODES];
    int standalone_count = 0;
    for (int i = 0; i < node->devices.vnode_count; i++) {
        if (node->devices.vnodes[i].claimed) {
            continue;
        }
        standalone[standalone_count].path = node->devices.vnodes[i].path;
        standalone[standalone_count].name = node->devices.vnodes[i].card;
        standalone_count++;
    }

    /* Display slots: copy each slot under its mutex, then keep only the
     * ones that are allocated and wanted open. */
    struct Proto_Display_Device_Info disp_infos[MAX_DISPLAYS];
    int disp_count = 0;
    for (int i = 0; i < MAX_DISPLAYS; i++) {
        struct Display_Slot *d = &node->displays[i];
        pthread_mutex_lock(&d->mutex);
        int snap = d->allocated && d->wanted_state == DISP_OPEN;
        struct Proto_Display_Device_Info info = {
            .stream_id = d->stream_id,
            .win_x = (int16_t)d->win_x,
            .win_y = (int16_t)d->win_y,
            .win_w = (uint16_t)d->win_w,
            .win_h = (uint16_t)d->win_h,
            .scale = (uint8_t)d->scale,
            .anchor = (uint8_t)d->anchor,
        };
        pthread_mutex_unlock(&d->mutex);
        if (!snap) {
            continue;
        }
        disp_infos[disp_count++] = info;
    }

    struct App_Error
e = proto_write_enum_devices_response(conn, request_id, PROTO_STATUS_OK, mdevs, (uint16_t)node->devices.media_count, standalone, (uint16_t)standalone_count, disp_infos, (uint16_t)disp_count); if (!APP_IS_OK(e)) { app_error_print(&e); } } static void handle_enum_controls(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length) { struct Proto_Enum_Controls_Req req; struct App_Error e = proto_read_enum_controls_req(payload, length, &req); if (!APP_IS_OK(e)) { proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } const char *path = resolve_device_path(node, (int)req.device_index); if (!path) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } struct V4l2_Ctrl_Handle *handle; e = v4l2_ctrl_open(path, &handle); if (!APP_IS_OK(e)) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); return; } struct Ctrl_Build build = { .count = 0 }; v4l2_ctrl_enumerate(handle, ctrl_enum_cb, &build); v4l2_ctrl_close(handle); e = proto_write_enum_controls_response(conn, req.request_id, PROTO_STATUS_OK, build.items, (uint16_t)build.count); if (!APP_IS_OK(e)) { app_error_print(&e); } } static void handle_get_control(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length) { struct Proto_Get_Control_Req req; struct App_Error e = proto_read_get_control_req(payload, length, &req); if (!APP_IS_OK(e)) { proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } const char *path = resolve_device_path(node, (int)req.device_index); if (!path) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } struct V4l2_Ctrl_Handle *handle; e = v4l2_ctrl_open(path, &handle); if (!APP_IS_OK(e)) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); return; } int32_t value; e = v4l2_ctrl_get(handle, req.control_id, &value); v4l2_ctrl_close(handle); 
if (!APP_IS_OK(e)) {
        /* e still holds the v4l2_ctrl_get result at this point */
        proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
        return;
    }

    e = proto_write_get_control_response(conn, req.request_id, PROTO_STATUS_OK, value);
    if (!APP_IS_OK(e)) {
        app_error_print(&e);
    }
}

/* SET_CONTROL: write one V4L2 control value on the device selected by
 * device_index and report OK or ERROR. Decode and lookup failures are
 * answered with INVALID_PARAMS / NOT_FOUND. */
static void handle_set_control(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length)
{
    struct Proto_Set_Control_Req req;
    struct App_Error e = proto_read_set_control_req(payload, length, &req);
    if (!APP_IS_OK(e)) {
        /* The request could not be decoded, so request id 0 is used */
        proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
        return;
    }

    const char *path = resolve_device_path(node, (int)req.device_index);
    if (!path) {
        proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0);
        return;
    }

    struct V4l2_Ctrl_Handle *handle;
    e = v4l2_ctrl_open(path, &handle);
    if (!APP_IS_OK(e)) {
        proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
        return;
    }

    e = v4l2_ctrl_set(handle, req.control_id, req.value);
    v4l2_ctrl_close(handle);

    uint16_t status = APP_IS_OK(e) ?
PROTO_STATUS_OK : PROTO_STATUS_ERROR;
    proto_write_control_response(conn, req.request_id, status, NULL, 0);
}

/* START_INGEST: bind the request to a stream slot, store the wanted
 * configuration and ask the reconciler to bring the device to STREAMING and
 * the transport to CONNECTED. The OK response is sent once the wanted state
 * has been recorded and one reconciler pass has run. */
static void handle_start_ingest(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length)
{
    struct Proto_Start_Ingest req;
    struct App_Error e = proto_read_start_ingest(payload, length, &req);
    if (!APP_IS_OK(e)) {
        proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
        return;
    }

    /* reconciler_mutex is held across slot lookup, config update and tick */
    pthread_mutex_lock(&node->reconciler_mutex);

    /* Reuse existing slot for this stream_id, or allocate a fresh one */
    struct Ingest_Stream *s = stream_find(node, req.stream_id);
    if (!s) {
        s = stream_alloc(node);
        if (!s) {
            pthread_mutex_unlock(&node->reconciler_mutex);
            proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
            fprintf(stderr, "START_INGEST: no free stream slots\n");
            return;
        }
        s->in_use = 1;
        s->stream_id = req.stream_id;
    }

    /* Copy wanted config (NUL-terminate the wire strings) */
    s->format = req.format;
    s->width = req.width;
    s->height = req.height;
    s->fps_n = req.fps_n;
    s->fps_d = req.fps_d;
    s->transport_mode = req.transport_mode;
    s->dest_port = req.dest_port;

    /* Wire strings carry an explicit length; clamp to the buffer size
     * minus one so the terminator always fits. */
    size_t dp_len = req.device_path_len < sizeof(s->device_path) - 1 ? req.device_path_len : sizeof(s->device_path) - 1;
    memcpy(s->device_path, req.device_path, dp_len);
    s->device_path[dp_len] = '\0';

    size_t dh_len = req.dest_host_len < sizeof(s->dest_host) - 1 ?
req.dest_host_len : sizeof(s->dest_host) - 1; memcpy(s->dest_host, req.dest_host, dh_len); s->dest_host[dh_len] = '\0'; reconciler_set_wanted(s->res_device, DEV_STREAMING); reconciler_set_wanted(s->res_transport, CONN_CONNECTED); reconciler_tick(node->reconciler); pthread_mutex_unlock(&node->reconciler_mutex); proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); } static void handle_stop_ingest(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length) { struct Proto_Stop_Ingest req; struct App_Error e = proto_read_stop_ingest(payload, length, &req); if (!APP_IS_OK(e)) { proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } pthread_mutex_lock(&node->reconciler_mutex); struct Ingest_Stream *s = stream_find(node, req.stream_id); if (!s) { pthread_mutex_unlock(&node->reconciler_mutex); proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } reconciler_set_wanted(s->res_device, DEV_CLOSED); reconciler_set_wanted(s->res_transport, CONN_DISCONNECTED); reconciler_tick(node->reconciler); pthread_mutex_unlock(&node->reconciler_mutex); proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); } /* ------------------------------------------------------------------------- * Display slot helpers * ------------------------------------------------------------------------- */ static void display_slots_init(struct Node *node) { for (int i = 0; i < MAX_DISPLAYS; i++) { struct Display_Slot *d = &node->displays[i]; memset(d, 0, sizeof(*d)); pthread_mutex_init(&d->mutex, NULL); } } static struct Display_Slot *display_find(struct Node *node, uint16_t stream_id) { for (int i = 0; i < MAX_DISPLAYS; i++) { struct Display_Slot *d = &node->displays[i]; pthread_mutex_lock(&d->mutex); /* Match any allocated slot — including ones being closed, so * START_DISPLAY can reuse them and STOP_DISPLAY can find them */ int match = d->allocated && d->stream_id 
== stream_id; pthread_mutex_unlock(&d->mutex); if (match) { return d; } } return NULL; } static struct Display_Slot *display_alloc(struct Node *node) { for (int i = 0; i < MAX_DISPLAYS; i++) { struct Display_Slot *d = &node->displays[i]; pthread_mutex_lock(&d->mutex); int is_free = !d->allocated; if (is_free) { d->allocated = 1; pthread_mutex_unlock(&d->mutex); return d; } pthread_mutex_unlock(&d->mutex); } return NULL; } /* Map PROTO_DISPLAY_SCALE_* to Xorg_Scale */ static Xorg_Scale proto_scale_to_xorg(uint8_t s) { switch (s) { case PROTO_DISPLAY_SCALE_FIT: return XORG_SCALE_FIT; case PROTO_DISPLAY_SCALE_FILL: return XORG_SCALE_FILL; case PROTO_DISPLAY_SCALE_1_1: return XORG_SCALE_1_1; default: return XORG_SCALE_STRETCH; } } /* Map PROTO_DISPLAY_ANCHOR_* to Xorg_Anchor */ static Xorg_Anchor proto_anchor_to_xorg(uint8_t a) { switch (a) { case PROTO_DISPLAY_ANCHOR_TOPLEFT: return XORG_ANCHOR_TOP_LEFT; default: return XORG_ANCHOR_CENTER; } } static void handle_start_display(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length) { struct Proto_Start_Display req; struct App_Error e = proto_read_start_display(payload, length, &req); if (!APP_IS_OK(e)) { proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } if (!xorg_available()) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); fprintf(stderr, "START_DISPLAY: xorg not available (headless build)\n"); return; } /* Reuse existing slot for this stream_id, or allocate a fresh one */ struct Display_Slot *d = display_find(node, req.stream_id); if (!d) { d = display_alloc(node); if (!d) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0); fprintf(stderr, "START_DISPLAY: no free display slots\n"); return; } } pthread_mutex_lock(&d->mutex); d->stream_id = req.stream_id; d->win_x = (int)req.win_x; d->win_y = (int)req.win_y; d->win_w = req.win_w > 0 ? (int)req.win_w : 1280; d->win_h = req.win_h > 0 ? 
(int)req.win_h : 720; d->scale = proto_scale_to_xorg(req.scale); d->anchor = proto_anchor_to_xorg(req.anchor); d->no_signal_fps = req.no_signal_fps > 0 ? (int)req.no_signal_fps : 15; d->wanted_state = DISP_OPEN; /* reconciled by display_loop_tick */ pthread_mutex_unlock(&d->mutex); proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); } static void handle_stop_display(struct Node *node, struct Transport_Conn *conn, const uint8_t *payload, uint32_t length) { struct Proto_Stop_Display req; struct App_Error e = proto_read_stop_display(payload, length, &req); if (!APP_IS_OK(e)) { proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0); return; } struct Display_Slot *d = display_find(node, req.stream_id); if (!d) { proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0); return; } pthread_mutex_lock(&d->mutex); d->wanted_state = DISP_CLOSED; /* reconciled by display_loop_tick */ pthread_mutex_unlock(&d->mutex); proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0); } /* ------------------------------------------------------------------------- * Transport callbacks (inbound server) * ------------------------------------------------------------------------- */ static void on_frame(struct Transport_Conn *conn, struct Transport_Frame *frame, void *userdata) { struct Node *node = userdata; if (frame->message_type == PROTO_MSG_VIDEO_FRAME) { /* Sink role: route frame to matching display slot */ struct Proto_Video_Frame vf; struct App_Error e = proto_read_video_frame( frame->payload, frame->payload_length, &vf); if (APP_IS_OK(e)) { struct Display_Slot *d = display_find(node, vf.stream_id); if (d) { pthread_mutex_lock(&d->mutex); if (d->frame_ready) { free(d->frame_data); /* drop stale frame */ } /* Hand ownership of the payload to the display slot. * Set frame->payload = NULL so the free() below is a no-op. 
*/ d->frame_data = frame->payload; d->frame_len = frame->payload_length; d->frame_ready = 1; frame->payload = NULL; pthread_mutex_unlock(&d->mutex); } } free(frame->payload); return; } if (frame->message_type == PROTO_MSG_CONTROL_REQUEST) { struct Proto_Request_Header hdr; struct App_Error e = proto_read_request_header( frame->payload, frame->payload_length, &hdr); if (!APP_IS_OK(e)) { free(frame->payload); return; } switch (hdr.command) { case PROTO_CMD_ENUM_DEVICES: handle_enum_devices(node, conn, hdr.request_id); break; case PROTO_CMD_ENUM_CONTROLS: handle_enum_controls(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_GET_CONTROL: handle_get_control(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_SET_CONTROL: handle_set_control(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_START_INGEST: handle_start_ingest(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_STOP_INGEST: handle_stop_ingest(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_START_DISPLAY: handle_start_display(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_STOP_DISPLAY: handle_stop_display(node, conn, frame->payload, frame->payload_length); break; case PROTO_CMD_STREAM_OPEN: /* A sender is opening a stream on this node (sink role) */ proto_write_control_response(conn, hdr.request_id, PROTO_STATUS_OK, NULL, 0); break; default: proto_write_control_response(conn, hdr.request_id, PROTO_STATUS_UNKNOWN_CMD, NULL, 0); break; } } free(frame->payload); } static void on_connect(struct Transport_Conn *conn, void *userdata) { (void)conn; (void)userdata; printf("peer connected\n"); } static void on_disconnect(struct Transport_Conn *conn, void *userdata) { (void)conn; (void)userdata; printf("peer disconnected\n"); } /* ------------------------------------------------------------------------- * Config schema * 
------------------------------------------------------------------------- */ static const struct Config_Flag_Def function_flag_defs[] = { { "source", DISCOVERY_FLAG_SOURCE }, { "relay", DISCOVERY_FLAG_RELAY }, { "sink", DISCOVERY_FLAG_SINK }, { "controller", DISCOVERY_FLAG_CONTROLLER }, { NULL, 0 } }; static const struct Config_Def schema[] = { { "node", "name", CONFIG_STRING, "unnamed:0", NULL }, { "node", "site_id", CONFIG_UINT16, "0", NULL }, { "node", "tcp_port", CONFIG_UINT16, "0", NULL }, { "node", "function", CONFIG_FLAGS, "source", function_flag_defs }, { "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL }, { "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL }, { "transport", "max_connections", CONFIG_UINT32, "16", NULL }, { "transport", "max_payload", CONFIG_UINT32, "16777216", NULL }, { NULL } }; /* ------------------------------------------------------------------------- * Entry point * ------------------------------------------------------------------------- */ static void usage(void) { fprintf(stderr, "usage: video-node [--port PORT] \n" " video-node [--port PORT] --defaults\n" "\n" " --port PORT override tcp_port from config (0 = OS-assigned random port)\n"); } int main(int argc, char **argv) { int port_override = -1; /* -1 = not set */ int argi = 1; while (argi < argc && argv[argi][0] == '-') { if (strcmp(argv[argi], "--port") == 0 && argi + 1 < argc) { port_override = atoi(argv[++argi]); argi++; } else { break; } } if (argi >= argc) { usage(); return 1; } struct Node node; memset(&node, 0, sizeof(node)); /* Load config */ struct App_Error e; if (strcmp(argv[argi], "--defaults") == 0) { e = config_defaults(&node.config, schema); } else { e = config_load(&node.config, argv[argi], schema); } if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } uint16_t tcp_port = port_override >= 0 ? 
(uint16_t)port_override : config_get_u16(node.config, "node", "tcp_port"); uint16_t site_id = config_get_u16(node.config, "node", "site_id"); uint32_t func = config_get_flags(node.config, "node", "function"); const char *name = config_get_str(node.config, "node", "name"); uint32_t interval = config_get_u32(node.config, "discovery", "interval_ms"); uint32_t timeout_i = config_get_u32(node.config, "discovery", "timeout_intervals"); uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections"); uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload"); /* Enumerate devices */ printf("scanning devices...\n"); build_device_list(&node.devices); printf("found %d media device(s), %d video node(s)\n", node.devices.media_count, node.devices.vnode_count); /* Reconciler */ node.reconciler = reconciler_create(); if (!node.reconciler) { fprintf(stderr, "failed to create reconciler\n"); return 1; } reconciler_set_log(node.reconciler, on_reconciler_log, NULL); pthread_mutex_init(&node.reconciler_mutex, NULL); /* Pre-allocate stream slots */ for (int i = 0; i < MAX_STREAMS; i++) { stream_slot_init(&node, i); } /* Initialise display slots */ display_slots_init(&node); /* Transport server */ struct Transport_Server_Config srv_cfg = { .port = tcp_port, .max_connections = (int)max_conn, .max_payload = max_pay, .on_frame = on_frame, .on_connect = on_connect, .on_disconnect = on_disconnect, .userdata = &node, }; e = transport_server_create(&node.server, &srv_cfg); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } e = transport_server_start(node.server); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } /* Read back actual port (relevant when tcp_port was 0) */ tcp_port = transport_server_get_port(node.server); printf("node: %s port=%u site=%u\n", name, tcp_port, site_id); /* Discovery */ struct Discovery_Config disc_cfg = { .site_id = site_id, .tcp_port = tcp_port, .function_flags = (uint16_t)func, .name = name, .interval_ms = interval, 
.timeout_intervals = timeout_i, .on_peer_found = NULL, .on_peer_lost = NULL, }; e = discovery_create(&node.discovery, &disc_cfg); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } e = discovery_start(node.discovery); if (!APP_IS_OK(e)) { app_error_print(&e); return 1; } /* Periodic reconciler tick */ pthread_t timer_thread; pthread_create(&timer_thread, NULL, reconciler_timer, &node); printf("ready\n"); /* Main thread: GL event loop (required by GLFW) when xorg is available, * or a simple idle wait when running headless. */ if (xorg_available()) { while (1) { display_loop_tick(&node); /* Throttle to ~200 Hz when displays are active, 10 Hz otherwise */ usleep(any_display_active(&node) ? 5000 : 100000); } } else { pthread_join(timer_thread, NULL); } return 0; }