Compare commits
8 Commits
4e40223478
...
b7e87ceb46
| Author | SHA1 | Date | |
|---|---|---|---|
| b7e87ceb46 | |||
| f5764940e6 | |||
| 32d31cbd1e | |||
| 28216999e0 | |||
| a2f438bbbb | |||
| 6747c9e00d | |||
| 6c9e0ce7dc | |||
| 639a84b1b9 |
@@ -12,6 +12,7 @@ CONFIG_OBJ = $(BUILD)/config/config.o
|
||||
PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o
|
||||
TEST_IMAGE_OBJ = $(BUILD)/test_image/test_image.o
|
||||
XORG_OBJ = $(BUILD)/xorg/xorg.o
|
||||
RECONCILER_OBJ = $(BUILD)/reconciler/reconciler.o
|
||||
|
||||
CLI_SRCS = \
|
||||
media_ctrl_cli.c \
|
||||
@@ -25,7 +26,9 @@ CLI_SRCS = \
|
||||
xorg_cli.c \
|
||||
v4l2_view_cli.c \
|
||||
stream_send_cli.c \
|
||||
stream_recv_cli.c
|
||||
stream_recv_cli.c \
|
||||
reconciler_cli.c \
|
||||
controller_cli.c
|
||||
|
||||
CLI_OBJS = $(CLI_SRCS:%.c=$(CLI_BUILD)/%.o)
|
||||
|
||||
@@ -43,19 +46,24 @@ all: \
|
||||
$(CLI_BUILD)/xorg_cli \
|
||||
$(CLI_BUILD)/v4l2_view_cli \
|
||||
$(CLI_BUILD)/stream_send_cli \
|
||||
$(CLI_BUILD)/stream_recv_cli
|
||||
$(CLI_BUILD)/stream_recv_cli \
|
||||
$(CLI_BUILD)/reconciler_cli \
|
||||
$(CLI_BUILD)/controller_cli
|
||||
|
||||
# Module objects delegate to their sub-makes.
|
||||
$(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common
|
||||
$(MEDIA_CTRL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||
$(V4L2_CTRL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||
$(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||
$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||
$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||
$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||
$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||
$(TEST_IMAGE_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/test_image
|
||||
$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||
# 'force' ensures the sub-make is always invoked so it can check source timestamps itself.
|
||||
.PHONY: force
|
||||
$(COMMON_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/common
|
||||
$(MEDIA_CTRL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||
$(V4L2_CTRL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||
$(SERIAL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||
$(TRANSPORT_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||
$(DISCOVERY_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||
$(CONFIG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||
$(PROTOCOL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||
$(TEST_IMAGE_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/test_image
|
||||
$(XORG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||
$(RECONCILER_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
||||
|
||||
# Compile each CLI source to its own .o (generates .d alongside).
|
||||
$(CLI_BUILD)/%.o: %.c | $(CLI_BUILD)
|
||||
@@ -98,6 +106,12 @@ $(CLI_BUILD)/stream_send_cli: $(CLI_BUILD)/stream_send_cli.o $(COMMON_OBJ) $(SER
|
||||
$(CLI_BUILD)/stream_recv_cli: $(CLI_BUILD)/stream_recv_cli.o $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) $(PROTOCOL_OBJ) $(XORG_OBJ)
|
||||
$(CC) $(CFLAGS) -o $@ $^ -lpthread $(PKG_LDFLAGS)
|
||||
|
||||
$(CLI_BUILD)/reconciler_cli: $(CLI_BUILD)/reconciler_cli.o $(RECONCILER_OBJ)
|
||||
$(CC) $(CFLAGS) -o $@ $^
|
||||
|
||||
$(CLI_BUILD)/controller_cli: $(CLI_BUILD)/controller_cli.o $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) $(PROTOCOL_OBJ)
|
||||
$(CC) $(CFLAGS) -o $@ $^ -lpthread
|
||||
|
||||
$(CLI_BUILD):
|
||||
mkdir -p $@
|
||||
|
||||
@@ -116,6 +130,8 @@ clean:
|
||||
$(CLI_BUILD)/xorg_cli \
|
||||
$(CLI_BUILD)/v4l2_view_cli \
|
||||
$(CLI_BUILD)/stream_send_cli \
|
||||
$(CLI_BUILD)/stream_recv_cli
|
||||
$(CLI_BUILD)/stream_recv_cli \
|
||||
$(CLI_BUILD)/reconciler_cli \
|
||||
$(CLI_BUILD)/controller_cli
|
||||
|
||||
-include $(CLI_OBJS:%.o=%.d)
|
||||
|
||||
463
dev/cli/controller_cli.c
Normal file
463
dev/cli/controller_cli.c
Normal file
@@ -0,0 +1,463 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#include <semaphore.h>
|
||||
|
||||
#include "transport.h"
|
||||
#include "protocol.h"
|
||||
#include "error.h"
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Shared state between REPL and transport read thread
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
struct Ctrl_State {
|
||||
sem_t sem;
|
||||
uint16_t pending_cmd;
|
||||
uint16_t last_status;
|
||||
int32_t last_value; /* GET_CONTROL response */
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Response display helpers — reused across commands
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void caps_str(uint32_t caps, char *buf, size_t len)
|
||||
{
|
||||
static const struct { uint32_t bit; const char *name; } flags[] = {
|
||||
{ 0x00000001u, "video-capture" },
|
||||
{ 0x00000002u, "video-output" },
|
||||
{ 0x00800000u, "meta-capture" },
|
||||
{ 0x04000000u, "streaming" },
|
||||
};
|
||||
buf[0] = '\0';
|
||||
size_t pos = 0;
|
||||
for (size_t i = 0; i < sizeof(flags)/sizeof(flags[0]); i++) {
|
||||
if (!(caps & flags[i].bit)) { continue; }
|
||||
int n = snprintf(buf + pos, len - pos, "%s%s",
|
||||
pos ? "," : "", flags[i].name);
|
||||
if (n < 0 || (size_t)n >= len - pos) { break; }
|
||||
pos += (size_t)n;
|
||||
}
|
||||
}
|
||||
|
||||
static void on_media_device(
|
||||
const char *path, uint8_t path_len,
|
||||
const char *driver, uint8_t driver_len,
|
||||
const char *model, uint8_t model_len,
|
||||
const char *bus_info, uint8_t bus_info_len,
|
||||
uint8_t vcount, void *ud)
|
||||
{
|
||||
(void)ud;
|
||||
printf(" media %.*s driver=%.*s model=%.*s bus=%.*s (%u video node(s))\n",
|
||||
(int)path_len, path,
|
||||
(int)driver_len, driver,
|
||||
(int)model_len, model,
|
||||
(int)bus_info_len, bus_info,
|
||||
(unsigned)vcount);
|
||||
}
|
||||
|
||||
static void on_video_node(
|
||||
const char *path, uint8_t path_len,
|
||||
const char *ename, uint8_t ename_len,
|
||||
uint32_t etype, uint32_t eflags,
|
||||
uint32_t dcaps,
|
||||
uint8_t pflags, uint8_t is_capture,
|
||||
void *ud)
|
||||
{
|
||||
(void)eflags; (void)pflags; (void)ud;
|
||||
char caps[128];
|
||||
caps_str(dcaps, caps, sizeof(caps));
|
||||
printf(" video %.*s entity=%.*s type=0x%08x caps=[%s]%s\n",
|
||||
(int)path_len, path,
|
||||
(int)ename_len, ename,
|
||||
etype, caps,
|
||||
is_capture ? " [capture]" : "");
|
||||
}
|
||||
|
||||
static void on_standalone(
|
||||
const char *path, uint8_t path_len,
|
||||
const char *name, uint8_t name_len,
|
||||
void *ud)
|
||||
{
|
||||
(void)ud;
|
||||
printf(" standalone %.*s card=%.*s\n",
|
||||
(int)path_len, path,
|
||||
(int)name_len, name);
|
||||
}
|
||||
|
||||
static void on_control(
|
||||
uint32_t id, uint8_t type, uint32_t flags,
|
||||
const char *name, uint8_t name_len,
|
||||
int32_t min, int32_t max, int32_t step,
|
||||
int32_t default_val, int32_t current_val,
|
||||
uint8_t menu_count, void *ud)
|
||||
{
|
||||
(void)flags; (void)ud;
|
||||
printf(" ctrl id=0x%08x type=%u %.*s"
|
||||
" min=%d max=%d step=%d default=%d current=%d",
|
||||
id, type,
|
||||
(int)name_len, name,
|
||||
min, max, step, default_val, current_val);
|
||||
if (menu_count) { printf(" (%u menu items)", (unsigned)menu_count); }
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
static void on_menu_item(
|
||||
uint32_t index,
|
||||
const char *name, uint8_t name_len,
|
||||
int64_t int_value,
|
||||
void *ud)
|
||||
{
|
||||
(void)ud;
|
||||
printf(" menu %u %.*s val=%lld\n",
|
||||
index,
|
||||
(int)name_len, name,
|
||||
(long long)int_value);
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Transport callbacks
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void on_frame(struct Transport_Conn *conn,
|
||||
struct Transport_Frame *frame, void *userdata)
|
||||
{
|
||||
(void)conn;
|
||||
struct Ctrl_State *cs = userdata;
|
||||
|
||||
if (frame->message_type != PROTO_MSG_CONTROL_RESPONSE) {
|
||||
free(frame->payload);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (cs->pending_cmd) {
|
||||
case PROTO_CMD_ENUM_DEVICES: {
|
||||
struct Proto_Response_Header hdr;
|
||||
struct App_Error e = proto_read_enum_devices_response(
|
||||
frame->payload, frame->payload_length, &hdr,
|
||||
on_media_device, on_video_node, on_standalone, NULL);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); }
|
||||
else if (hdr.status != PROTO_STATUS_OK) {
|
||||
fprintf(stderr, "ENUM_DEVICES: status=%u\n", hdr.status);
|
||||
}
|
||||
cs->last_status = hdr.status;
|
||||
break;
|
||||
}
|
||||
case PROTO_CMD_ENUM_CONTROLS: {
|
||||
struct Proto_Response_Header hdr;
|
||||
struct App_Error e = proto_read_enum_controls_response(
|
||||
frame->payload, frame->payload_length, &hdr,
|
||||
on_control, on_menu_item, NULL);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); }
|
||||
else if (hdr.status != PROTO_STATUS_OK) {
|
||||
fprintf(stderr, "ENUM_CONTROLS: status=%u\n", hdr.status);
|
||||
}
|
||||
cs->last_status = hdr.status;
|
||||
break;
|
||||
}
|
||||
case PROTO_CMD_GET_CONTROL: {
|
||||
struct Proto_Get_Control_Resp resp;
|
||||
struct App_Error e = proto_read_get_control_response(
|
||||
frame->payload, frame->payload_length, &resp);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); }
|
||||
else if (resp.status == PROTO_STATUS_OK) {
|
||||
printf(" value = %d\n", resp.value);
|
||||
} else {
|
||||
fprintf(stderr, "GET_CONTROL: status=%u\n", resp.status);
|
||||
}
|
||||
cs->last_status = resp.status;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
/* Generic response: just read request_id + status */
|
||||
struct Proto_Response_Header hdr;
|
||||
struct App_Error e = proto_read_response_header(
|
||||
frame->payload, frame->payload_length, &hdr);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); }
|
||||
else if (hdr.status != PROTO_STATUS_OK) {
|
||||
fprintf(stderr, "command 0x%04x: status=%u\n",
|
||||
cs->pending_cmd, hdr.status);
|
||||
} else {
|
||||
printf(" ok\n");
|
||||
}
|
||||
cs->last_status = APP_IS_OK(e) ? hdr.status : PROTO_STATUS_ERROR;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
free(frame->payload);
|
||||
sem_post(&cs->sem);
|
||||
}
|
||||
|
||||
static void on_disconnect(struct Transport_Conn *conn, void *userdata)
|
||||
{
|
||||
(void)conn; (void)userdata;
|
||||
printf("disconnected from node\n");
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Request helpers
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static uint16_t next_req_id(uint16_t *counter)
|
||||
{
|
||||
return ++(*counter);
|
||||
}
|
||||
|
||||
/* Send a request, set pending_cmd, wait for response */
|
||||
#define SEND_AND_WAIT(cs, cmd, send_expr) do { \
|
||||
(cs)->pending_cmd = (cmd); \
|
||||
struct App_Error _e = (send_expr); \
|
||||
if (!APP_IS_OK(_e)) { app_error_print(&_e); break; } \
|
||||
sem_wait(&(cs)->sem); \
|
||||
} while (0)
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* REPL command implementations
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void cmd_enum_devices(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req)
|
||||
{
|
||||
printf("devices:\n");
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_ENUM_DEVICES,
|
||||
proto_write_enum_devices(conn, next_req_id(req)));
|
||||
}
|
||||
|
||||
static void cmd_enum_controls(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
const char *idx_str)
|
||||
{
|
||||
int idx = atoi(idx_str);
|
||||
printf("controls for device %d:\n", idx);
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_ENUM_CONTROLS,
|
||||
proto_write_enum_controls(conn, next_req_id(req), (uint16_t)idx));
|
||||
}
|
||||
|
||||
static void cmd_get_control(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
const char *idx_str, const char *id_str)
|
||||
{
|
||||
int idx = atoi(idx_str);
|
||||
uint32_t id = (uint32_t)strtoul(id_str, NULL, 0);
|
||||
printf("get control 0x%08x on device %d:\n", id, idx);
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_GET_CONTROL,
|
||||
proto_write_get_control(conn, next_req_id(req), (uint16_t)idx, id));
|
||||
}
|
||||
|
||||
static void cmd_set_control(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
const char *idx_str, const char *id_str, const char *val_str)
|
||||
{
|
||||
int idx = atoi(idx_str);
|
||||
uint32_t id = (uint32_t)strtoul(id_str, NULL, 0);
|
||||
int32_t val = (int32_t)atoi(val_str);
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_SET_CONTROL,
|
||||
proto_write_set_control(conn, next_req_id(req), (uint16_t)idx, id, val));
|
||||
}
|
||||
|
||||
static void cmd_start_ingest(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
int ntok, char *tokens[])
|
||||
{
|
||||
/* Required: stream_id device dest_host dest_port
|
||||
* Optional: format width height fps_n fps_d */
|
||||
if (ntok < 5) {
|
||||
printf("usage: start-ingest <stream_id> <device> <dest_host> <dest_port>"
|
||||
" [format] [width] [height] [fps_n] [fps_d]\n"
|
||||
" format: 0=auto 1=mjpeg (default 0)\n");
|
||||
return;
|
||||
}
|
||||
|
||||
uint16_t stream_id = (uint16_t)atoi(tokens[1]);
|
||||
const char *device = tokens[2];
|
||||
const char *host = tokens[3];
|
||||
uint16_t port = (uint16_t)atoi(tokens[4]);
|
||||
|
||||
uint16_t format = ntok > 5 ? (uint16_t)atoi(tokens[5]) : 0;
|
||||
uint16_t width = ntok > 6 ? (uint16_t)atoi(tokens[6]) : 0;
|
||||
uint16_t height = ntok > 7 ? (uint16_t)atoi(tokens[7]) : 0;
|
||||
uint16_t fps_n = ntok > 8 ? (uint16_t)atoi(tokens[8]) : 0;
|
||||
uint16_t fps_d = ntok > 9 ? (uint16_t)atoi(tokens[9]) : 1;
|
||||
|
||||
printf("start-ingest: stream=%u device=%s dest=%s:%u"
|
||||
" format=%u %ux%u fps=%u/%u\n",
|
||||
stream_id, device, host, port, format, width, height, fps_n, fps_d);
|
||||
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_START_INGEST,
|
||||
proto_write_start_ingest(conn, next_req_id(req),
|
||||
stream_id, format, width, height, fps_n, fps_d,
|
||||
PROTO_TRANSPORT_ENCAPSULATED, device, host, port));
|
||||
}
|
||||
|
||||
static void cmd_stop_ingest(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
const char *sid_str)
|
||||
{
|
||||
uint16_t stream_id = (uint16_t)atoi(sid_str);
|
||||
printf("stop-ingest: stream=%u\n", stream_id);
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_STOP_INGEST,
|
||||
proto_write_stop_ingest(conn, next_req_id(req), stream_id));
|
||||
}
|
||||
|
||||
static void cmd_start_display(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
int ntok, char *tokens[])
|
||||
{
|
||||
/* Required: stream_id
|
||||
* Optional: win_x win_y win_w win_h */
|
||||
if (ntok < 2) {
|
||||
printf("usage: start-display <stream_id> [win_x] [win_y] [win_w] [win_h]\n");
|
||||
return;
|
||||
}
|
||||
uint16_t stream_id = (uint16_t)atoi(tokens[1]);
|
||||
int16_t win_x = ntok > 2 ? (int16_t)atoi(tokens[2]) : 0;
|
||||
int16_t win_y = ntok > 3 ? (int16_t)atoi(tokens[3]) : 0;
|
||||
uint16_t win_w = ntok > 4 ? (uint16_t)atoi(tokens[4]) : 0;
|
||||
uint16_t win_h = ntok > 5 ? (uint16_t)atoi(tokens[5]) : 0;
|
||||
printf("start-display: stream=%u pos=%d,%d size=%ux%u\n",
|
||||
stream_id, win_x, win_y, win_w, win_h);
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_START_DISPLAY,
|
||||
proto_write_start_display(conn, next_req_id(req),
|
||||
stream_id, win_x, win_y, win_w, win_h,
|
||||
PROTO_DISPLAY_SCALE_FIT, PROTO_DISPLAY_ANCHOR_CENTER));
|
||||
}
|
||||
|
||||
static void cmd_stop_display(struct Transport_Conn *conn,
|
||||
struct Ctrl_State *cs, uint16_t *req,
|
||||
const char *sid_str)
|
||||
{
|
||||
uint16_t stream_id = (uint16_t)atoi(sid_str);
|
||||
printf("stop-display: stream=%u\n", stream_id);
|
||||
SEND_AND_WAIT(cs, PROTO_CMD_STOP_DISPLAY,
|
||||
proto_write_stop_display(conn, next_req_id(req), stream_id));
|
||||
}
|
||||
|
||||
static void cmd_help(void)
|
||||
{
|
||||
printf("commands:\n"
|
||||
" enum-devices\n"
|
||||
" enum-controls <device_index>\n"
|
||||
" get-control <device_index> <control_id_hex>\n"
|
||||
" set-control <device_index> <control_id_hex> <value>\n"
|
||||
" start-ingest <stream_id> <device> <dest_host> <dest_port>"
|
||||
" [format] [width] [height] [fps_n] [fps_d]\n"
|
||||
" stop-ingest <stream_id>\n"
|
||||
" start-display <stream_id> [win_x] [win_y] [win_w] [win_h]\n"
|
||||
" stop-display <stream_id>\n"
|
||||
" help\n"
|
||||
" quit / exit\n");
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Entry point
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void usage(void)
|
||||
{
|
||||
fprintf(stderr,
|
||||
"usage: controller_cli --host HOST [--port PORT]\n"
|
||||
"\n"
|
||||
" Interactive controller for a video node.\n"
|
||||
" --host HOST node hostname or IP (required)\n"
|
||||
" --port PORT node TCP port (default 8000)\n");
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
const char *host = NULL;
|
||||
uint16_t port = 8000;
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "--host") == 0 && i + 1 < argc) {
|
||||
host = argv[++i];
|
||||
} else if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) {
|
||||
port = (uint16_t)atoi(argv[++i]);
|
||||
} else {
|
||||
usage(); return 1;
|
||||
}
|
||||
}
|
||||
if (!host) { usage(); return 1; }
|
||||
|
||||
/* Connect */
|
||||
struct Ctrl_State cs;
|
||||
memset(&cs, 0, sizeof(cs));
|
||||
sem_init(&cs.sem, 0, 0);
|
||||
|
||||
struct Transport_Conn *conn;
|
||||
struct App_Error e = transport_connect(&conn, host, port,
|
||||
TRANSPORT_DEFAULT_MAX_PAYLOAD, on_frame, on_disconnect, &cs);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||
|
||||
printf("connected to %s:%u\n\n", host, port);
|
||||
cmd_help();
|
||||
printf("\n");
|
||||
|
||||
/* REPL */
|
||||
uint16_t req_id = 0;
|
||||
char line[512];
|
||||
|
||||
while (1) {
|
||||
printf("> ");
|
||||
fflush(stdout);
|
||||
|
||||
if (fgets(line, sizeof(line), stdin) == NULL) { break; }
|
||||
|
||||
/* Strip trailing newline */
|
||||
size_t len = strlen(line);
|
||||
while (len > 0 && (line[len-1] == '\n' || line[len-1] == '\r')) {
|
||||
line[--len] = '\0';
|
||||
}
|
||||
|
||||
/* Tokenise (up to 12 tokens) */
|
||||
char *tokens[12];
|
||||
int ntok = 0;
|
||||
char *p = line;
|
||||
|
||||
while (*p && ntok < 12) {
|
||||
while (*p == ' ' || *p == '\t') { p++; }
|
||||
if (!*p) { break; }
|
||||
tokens[ntok++] = p;
|
||||
while (*p && *p != ' ' && *p != '\t') { p++; }
|
||||
if (*p) { *p++ = '\0'; }
|
||||
}
|
||||
if (ntok == 0) { continue; }
|
||||
|
||||
const char *cmd = tokens[0];
|
||||
|
||||
if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
|
||||
break;
|
||||
} else if (strcmp(cmd, "help") == 0) {
|
||||
cmd_help();
|
||||
} else if (strcmp(cmd, "enum-devices") == 0) {
|
||||
cmd_enum_devices(conn, &cs, &req_id);
|
||||
} else if (strcmp(cmd, "enum-controls") == 0) {
|
||||
if (ntok < 2) { printf("usage: enum-controls <device_index>\n"); }
|
||||
else { cmd_enum_controls(conn, &cs, &req_id, tokens[1]); }
|
||||
} else if (strcmp(cmd, "get-control") == 0) {
|
||||
if (ntok < 3) { printf("usage: get-control <device_index> <control_id>\n"); }
|
||||
else { cmd_get_control(conn, &cs, &req_id, tokens[1], tokens[2]); }
|
||||
} else if (strcmp(cmd, "set-control") == 0) {
|
||||
if (ntok < 4) { printf("usage: set-control <device_index> <control_id> <value>\n"); }
|
||||
else { cmd_set_control(conn, &cs, &req_id, tokens[1], tokens[2], tokens[3]); }
|
||||
} else if (strcmp(cmd, "start-ingest") == 0) {
|
||||
cmd_start_ingest(conn, &cs, &req_id, ntok, tokens);
|
||||
} else if (strcmp(cmd, "stop-ingest") == 0) {
|
||||
if (ntok < 2) { printf("usage: stop-ingest <stream_id>\n"); }
|
||||
else { cmd_stop_ingest(conn, &cs, &req_id, tokens[1]); }
|
||||
} else if (strcmp(cmd, "start-display") == 0) {
|
||||
cmd_start_display(conn, &cs, &req_id, ntok, tokens);
|
||||
} else if (strcmp(cmd, "stop-display") == 0) {
|
||||
if (ntok < 2) { printf("usage: stop-display <stream_id>\n"); }
|
||||
else { cmd_stop_display(conn, &cs, &req_id, tokens[1]); }
|
||||
} else {
|
||||
printf("unknown command: %s (type 'help' for commands)\n", cmd);
|
||||
}
|
||||
}
|
||||
|
||||
transport_conn_close(conn);
|
||||
sem_destroy(&cs.sem);
|
||||
return 0;
|
||||
}
|
||||
456
dev/cli/reconciler_cli.c
Normal file
456
dev/cli/reconciler_cli.c
Normal file
@@ -0,0 +1,456 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#include "reconciler.h"
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Simulated resource userdata
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
struct Sim_State {
|
||||
const char *name;
|
||||
int fail_next;
|
||||
};
|
||||
|
||||
static int sim_action(struct Sim_State *s, const char *action_name) {
|
||||
printf(" [%s] %s\n", s->name, action_name);
|
||||
if (s->fail_next) {
|
||||
s->fail_next = 0;
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* device transitions */
|
||||
static int device_open(void *ud) { return sim_action((struct Sim_State *)ud, "opening device"); }
|
||||
static int device_close(void *ud) { return sim_action((struct Sim_State *)ud, "closing device"); }
|
||||
static int device_start(void *ud) { return sim_action((struct Sim_State *)ud, "starting capture"); }
|
||||
static int device_stop(void *ud) { return sim_action((struct Sim_State *)ud, "stopping capture"); }
|
||||
|
||||
/* transport transitions */
|
||||
static int transport_connect(void *ud) { return sim_action((struct Sim_State *)ud, "connecting transport"); }
|
||||
static int transport_disconnect(void *ud) { return sim_action((struct Sim_State *)ud, "disconnecting transport"); }
|
||||
|
||||
/* stream transitions */
|
||||
static int stream_activate(void *ud) { return sim_action((struct Sim_State *)ud, "activating stream"); }
|
||||
static int stream_deactivate(void *ud) { return sim_action((struct Sim_State *)ud, "deactivating stream"); }
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Log callback
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
static void on_log(
|
||||
const struct Rec_Resource *res,
|
||||
int from, int to, int success,
|
||||
void *userdata)
|
||||
{
|
||||
(void)userdata;
|
||||
|
||||
const char *from_name = reconciler_state_name(res, from);
|
||||
const char *to_name = reconciler_state_name(res, to);
|
||||
|
||||
if (from_name != NULL && to_name != NULL) {
|
||||
printf(" [%s] %s -> %s ... %s\n",
|
||||
reconciler_get_name(res),
|
||||
from_name, to_name,
|
||||
success ? "ok" : "FAILED");
|
||||
} else {
|
||||
printf(" [%s] %d -> %d ... %s\n",
|
||||
reconciler_get_name(res),
|
||||
from, to,
|
||||
success ? "ok" : "FAILED");
|
||||
}
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Helpers
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
static const char *status_label(Rec_Status s) {
|
||||
switch (s) {
|
||||
case REC_STATUS_STABLE: return "stable";
|
||||
case REC_STATUS_WORKING: return "working";
|
||||
case REC_STATUS_BLOCKED: return "blocked";
|
||||
case REC_STATUS_NO_PATH: return "no_path";
|
||||
}
|
||||
return "?";
|
||||
}
|
||||
|
||||
/*
|
||||
* Find the first unsatisfied dep for a resource.
|
||||
* Returns NULL if none (or resource is not blocked).
|
||||
*/
|
||||
static void print_blocked_reason(const struct Rec_Resource *res) {
|
||||
/* We need access to internals — expose via a helper approach.
|
||||
* Since we can't access internal deps from outside the module,
|
||||
* we rely on reconciler_get_status returning BLOCKED and print
|
||||
* a generic message. The CLI has access to the sim resources
|
||||
* directly so we can check ourselves using the public API. */
|
||||
(void)res;
|
||||
printf(" (dependency unsatisfied)");
|
||||
}
|
||||
|
||||
static void print_state(const struct Rec_Resource *res, int state) {
|
||||
const char *name = reconciler_state_name(res, state);
|
||||
if (name != NULL) {
|
||||
printf("%s(%d)", name, state);
|
||||
} else {
|
||||
printf("%d", state);
|
||||
}
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Blocked dependency introspection
|
||||
*
|
||||
* We track resource/dep relationships here in the CLI so we can print
|
||||
* informative blocked messages without exposing internals from the module.
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
#define CLI_MAX_DEPS 8
|
||||
|
||||
struct Cli_Dep {
|
||||
const struct Rec_Resource *resource;
|
||||
int blocked_below;
|
||||
const struct Rec_Resource *dep;
|
||||
int dep_min_state;
|
||||
};
|
||||
|
||||
static struct Cli_Dep cli_deps[CLI_MAX_DEPS];
|
||||
static int cli_dep_count = 0;
|
||||
|
||||
static void cli_add_dep(
|
||||
const struct Rec_Resource *resource,
|
||||
int blocked_below,
|
||||
const struct Rec_Resource *dep,
|
||||
int dep_min_state)
|
||||
{
|
||||
if (cli_dep_count >= CLI_MAX_DEPS) {
|
||||
return;
|
||||
}
|
||||
struct Cli_Dep *d = &cli_deps[cli_dep_count++];
|
||||
d->resource = resource;
|
||||
d->blocked_below = blocked_below;
|
||||
d->dep = dep;
|
||||
d->dep_min_state = dep_min_state;
|
||||
}
|
||||
|
||||
static void print_blocked_info(const struct Rec_Resource *res) {
|
||||
int wanted = reconciler_get_wanted(res);
|
||||
int current = reconciler_get_current(res);
|
||||
(void)current;
|
||||
|
||||
for (int i = 0; i < cli_dep_count; i++) {
|
||||
struct Cli_Dep *d = &cli_deps[i];
|
||||
if (d->resource != res) {
|
||||
continue;
|
||||
}
|
||||
if (wanted < d->blocked_below) {
|
||||
continue;
|
||||
}
|
||||
if (reconciler_get_current(d->dep) < d->dep_min_state) {
|
||||
printf(" [blocked: %s < ", reconciler_get_name(d->dep));
|
||||
print_state(d->dep, d->dep_min_state);
|
||||
printf("]");
|
||||
return;
|
||||
}
|
||||
}
|
||||
print_blocked_reason(res);
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Command implementations
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
#define MAX_RESOURCES 8
|
||||
|
||||
static struct Rec_Resource *all_resources[MAX_RESOURCES];
|
||||
static int resource_count = 0;
|
||||
|
||||
static struct Rec_Resource *find_resource(const char *name) {
|
||||
for (int i = 0; i < resource_count; i++) {
|
||||
if (strcmp(reconciler_get_name(all_resources[i]), name) == 0) {
|
||||
return all_resources[i];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int parse_state(const struct Rec_Resource *res, const char *token) {
|
||||
/* Try numeric first. */
|
||||
char *end;
|
||||
long n = strtol(token, &end, 10);
|
||||
if (*end == '\0') {
|
||||
return (int)n;
|
||||
}
|
||||
|
||||
/* Try case-insensitive name match. */
|
||||
int state_count = 0;
|
||||
/* Iterate states 0..N-1 using reconciler_state_name. We don't know
|
||||
* state_count without internal access, so scan until NULL. */
|
||||
for (int i = 0; i < 64; i++) {
|
||||
const char *sname = reconciler_state_name(res, i);
|
||||
if (sname == NULL) {
|
||||
break;
|
||||
}
|
||||
state_count++;
|
||||
/* Case-insensitive compare. */
|
||||
int match = 1;
|
||||
size_t tlen = strlen(token);
|
||||
size_t slen = strlen(sname);
|
||||
if (tlen != slen) {
|
||||
match = 0;
|
||||
} else {
|
||||
for (size_t j = 0; j < tlen; j++) {
|
||||
if (tolower((unsigned char)token[j]) != tolower((unsigned char)sname[j])) {
|
||||
match = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (match) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
(void)state_count;
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void cmd_status(void) {
|
||||
for (int i = 0; i < resource_count; i++) {
|
||||
const struct Rec_Resource *res = all_resources[i];
|
||||
int current = reconciler_get_current(res);
|
||||
int wanted = reconciler_get_wanted(res);
|
||||
Rec_Status status = reconciler_get_status(res);
|
||||
|
||||
printf(" %-12s ", reconciler_get_name(res));
|
||||
print_state(res, current);
|
||||
printf(" wanted ");
|
||||
print_state(res, wanted);
|
||||
printf(" [%s]", status_label(status));
|
||||
|
||||
if (status == REC_STATUS_BLOCKED) {
|
||||
print_blocked_info(res);
|
||||
}
|
||||
|
||||
printf("\n");
|
||||
}
|
||||
}
|
||||
|
||||
static void cmd_want(const char *name, const char *state_token) {
|
||||
struct Rec_Resource *res = find_resource(name);
|
||||
if (res == NULL) {
|
||||
printf("unknown resource: %s\n", name);
|
||||
return;
|
||||
}
|
||||
|
||||
int state = parse_state(res, state_token);
|
||||
if (state < 0) {
|
||||
printf("unknown state: %s\n", state_token);
|
||||
return;
|
||||
}
|
||||
|
||||
reconciler_set_wanted(res, state);
|
||||
printf(" %s wanted -> ", name);
|
||||
print_state(res, state);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
static void cmd_tick(struct Reconciler *r) {
|
||||
int n = reconciler_tick(r);
|
||||
if (n == 0) {
|
||||
printf(" (no transitions)\n");
|
||||
}
|
||||
}
|
||||
|
||||
static void cmd_run(struct Reconciler *r) {
|
||||
int max_ticks = 20;
|
||||
int tick = 0;
|
||||
|
||||
while (!reconciler_is_stable(r) && tick < max_ticks) {
|
||||
printf("tick %d:\n", tick + 1);
|
||||
int n = reconciler_tick(r);
|
||||
if (n == 0) {
|
||||
printf(" (no progress — stopping)\n");
|
||||
break;
|
||||
}
|
||||
tick++;
|
||||
}
|
||||
|
||||
if (reconciler_is_stable(r)) {
|
||||
printf("stable after %d tick(s)\n", tick);
|
||||
} else if (tick >= max_ticks) {
|
||||
printf("reached max ticks (%d) without stabilising\n", max_ticks);
|
||||
}
|
||||
}
|
||||
|
||||
static void cmd_fail(const char *name, struct Sim_State sim_states[], int sim_count) {
|
||||
for (int i = 0; i < sim_count; i++) {
|
||||
if (strcmp(sim_states[i].name, name) == 0) {
|
||||
sim_states[i].fail_next = 1;
|
||||
printf(" next action for %s will fail\n", name);
|
||||
return;
|
||||
}
|
||||
}
|
||||
printf("unknown resource: %s\n", name);
|
||||
}
|
||||
|
||||
static void cmd_help(void) {
|
||||
printf("commands:\n");
|
||||
printf(" status print all resources with current/wanted state and status\n");
|
||||
printf(" want <name> <state> set wanted state (by number or name, case-insensitive)\n");
|
||||
printf(" tick run one reconciler tick\n");
|
||||
printf(" run tick until stable (max 20 ticks)\n");
|
||||
printf(" fail <name> make the next action for this resource fail\n");
|
||||
printf(" help show this help\n");
|
||||
printf(" quit / exit exit\n");
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Main
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
||||
int main(void) {
|
||||
struct Reconciler *r = reconciler_create();
|
||||
reconciler_set_log(r, on_log, NULL);
|
||||
|
||||
/* Device resource. */
|
||||
static const struct Rec_Transition device_trans[] = {
|
||||
{0, 1, device_open},
|
||||
{1, 0, device_close},
|
||||
{1, 2, device_start},
|
||||
{2, 1, device_stop},
|
||||
{-1, -1, NULL}
|
||||
};
|
||||
static const char *device_states[] = {"CLOSED", "OPEN", "STREAMING"};
|
||||
|
||||
/* Transport resource. */
|
||||
static const struct Rec_Transition transport_trans[] = {
|
||||
{0, 1, transport_connect},
|
||||
{1, 0, transport_disconnect},
|
||||
{-1, -1, NULL}
|
||||
};
|
||||
static const char *transport_states[] = {"DISCONNECTED", "CONNECTED"};
|
||||
|
||||
/* Stream resource. */
|
||||
static const struct Rec_Transition stream_trans[] = {
|
||||
{0, 1, stream_activate},
|
||||
{1, 0, stream_deactivate},
|
||||
{-1, -1, NULL}
|
||||
};
|
||||
static const char *stream_states[] = {"INACTIVE", "ACTIVE"};
|
||||
|
||||
/* Sim userdata — indexed to match resource order. */
|
||||
static struct Sim_State sim_states[] = {
|
||||
{"device", 0},
|
||||
{"transport", 0},
|
||||
{"stream", 0},
|
||||
};
|
||||
|
||||
struct Rec_Resource *device = reconciler_add_resource(r,
|
||||
"device", device_trans, 3, device_states, 0, &sim_states[0]);
|
||||
|
||||
struct Rec_Resource *transport = reconciler_add_resource(r,
|
||||
"transport", transport_trans, 2, transport_states, 0, &sim_states[1]);
|
||||
|
||||
struct Rec_Resource *stream = reconciler_add_resource(r,
|
||||
"stream", stream_trans, 2, stream_states, 0, &sim_states[2]);
|
||||
|
||||
/* Dependencies. */
|
||||
/* transport cannot reach CONNECTED(1) unless device >= OPEN(1). */
|
||||
reconciler_add_dep(transport, 1, device, 1);
|
||||
cli_add_dep(transport, 1, device, 1);
|
||||
|
||||
/* stream cannot reach ACTIVE(1) unless transport >= CONNECTED(1). */
|
||||
reconciler_add_dep(stream, 1, transport, 1);
|
||||
cli_add_dep(stream, 1, transport, 1);
|
||||
|
||||
/* stream cannot reach ACTIVE(1) unless device >= STREAMING(2). */
|
||||
reconciler_add_dep(stream, 1, device, 2);
|
||||
cli_add_dep(stream, 1, device, 2);
|
||||
|
||||
/* Register resources for lookup. */
|
||||
all_resources[resource_count++] = device;
|
||||
all_resources[resource_count++] = transport;
|
||||
all_resources[resource_count++] = stream;
|
||||
|
||||
/* Welcome. */
|
||||
printf("reconciler_cli — interactive declarative state machine demo\n\n");
|
||||
cmd_help();
|
||||
printf("\n");
|
||||
cmd_status();
|
||||
printf("\n");
|
||||
|
||||
/* REPL. */
|
||||
char line[256];
|
||||
while (1) {
|
||||
printf("> ");
|
||||
fflush(stdout);
|
||||
|
||||
if (fgets(line, sizeof(line), stdin) == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
/* Strip trailing newline. */
|
||||
size_t len = strlen(line);
|
||||
while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) {
|
||||
line[--len] = '\0';
|
||||
}
|
||||
|
||||
/* Tokenise. */
|
||||
char *tokens[4];
|
||||
int ntok = 0;
|
||||
char *p = line;
|
||||
|
||||
while (*p != '\0' && ntok < 4) {
|
||||
while (*p == ' ' || *p == '\t') {
|
||||
p++;
|
||||
}
|
||||
if (*p == '\0') {
|
||||
break;
|
||||
}
|
||||
tokens[ntok++] = p;
|
||||
while (*p != '\0' && *p != ' ' && *p != '\t') {
|
||||
p++;
|
||||
}
|
||||
if (*p != '\0') {
|
||||
*p++ = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
if (ntok == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const char *cmd = tokens[0];
|
||||
|
||||
if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
|
||||
break;
|
||||
} else if (strcmp(cmd, "help") == 0) {
|
||||
cmd_help();
|
||||
} else if (strcmp(cmd, "status") == 0) {
|
||||
cmd_status();
|
||||
} else if (strcmp(cmd, "tick") == 0) {
|
||||
cmd_tick(r);
|
||||
} else if (strcmp(cmd, "run") == 0) {
|
||||
cmd_run(r);
|
||||
} else if (strcmp(cmd, "want") == 0) {
|
||||
if (ntok < 3) {
|
||||
printf("usage: want <name> <state>\n");
|
||||
} else {
|
||||
cmd_want(tokens[1], tokens[2]);
|
||||
}
|
||||
} else if (strcmp(cmd, "fail") == 0) {
|
||||
if (ntok < 2) {
|
||||
printf("usage: fail <name>\n");
|
||||
} else {
|
||||
cmd_fail(tokens[1], sim_states, 3);
|
||||
}
|
||||
} else {
|
||||
printf("unknown command: %s (type 'help' for commands)\n", cmd);
|
||||
}
|
||||
}
|
||||
|
||||
reconciler_destroy(r);
|
||||
return 0;
|
||||
}
|
||||
107
docs/protocol.md
107
docs/protocol.md
@@ -99,6 +99,10 @@ packet-beta
|
||||
| `0x0005` | `GET_CONTROL` | Get a V4L2 control value |
|
||||
| `0x0006` | `SET_CONTROL` | Set a V4L2 control value |
|
||||
| `0x0007` | `ENUM_MONITORS` | List X11 monitors (XRandR) on the remote node |
|
||||
| `0x0008` | `START_INGEST` | Set wanted state: open V4L2 device, connect outbound, begin streaming |
|
||||
| `0x0009` | `STOP_INGEST` | Set wanted state: stop ingest stream and disconnect |
|
||||
| `0x000A` | `START_DISPLAY` | Open a viewer window on the sink node and display incoming frames for the given stream |
|
||||
| `0x000B` | `STOP_DISPLAY` | Close the viewer window for the given stream |
|
||||
|
||||
### `CONTROL_RESPONSE` (0x0003)
|
||||
|
||||
@@ -412,3 +416,106 @@ packet-beta
|
||||
**Response** — no extra fields beyond request_id and status.
|
||||
|
||||
For `MENU` and `INTEGER_MENU` controls, `value` must be a valid menu item `index` as returned by `ENUM_CONTROLS`.
|
||||
|
||||
### `START_INGEST` (0x0008)
|
||||
|
||||
Sets wanted state on a source node: open the specified V4L2 device, configure the stream format, and connect outbound to the given sink.
|
||||
|
||||
**Request**:
|
||||
|
||||
```mermaid
|
||||
%%{init: {'packet': {'bitsPerRow': 16}}}%%
|
||||
packet-beta
|
||||
0-15: "request_id"
|
||||
16-31: "command = 0x0008"
|
||||
32-47: "stream_id"
|
||||
48-63: "format"
|
||||
64-79: "width"
|
||||
80-95: "height"
|
||||
96-111: "fps_n"
|
||||
112-127: "fps_d"
|
||||
128-143: "dest_port"
|
||||
144-159: "transport_mode"
|
||||
160-167: "device_path_len"
|
||||
168-175: "device_path …"
|
||||
```
|
||||
|
||||
Followed by `dest_host` str8.
|
||||
|
||||
| Field | Description |
|
||||
|---|---|
|
||||
| `stream_id` | ID assigned by the controller; used in all subsequent `VIDEO_FRAME` messages |
|
||||
| `format` | Codec format code (see [Codec Formats](#codec-formats)); `0` = auto-select best MJPEG |
|
||||
| `width` | Capture width in pixels; `0` = auto-select |
|
||||
| `height` | Capture height in pixels; `0` = auto-select |
|
||||
| `fps_n` | Frame rate numerator; `0` = auto-select |
|
||||
| `fps_d` | Frame rate denominator |
|
||||
| `dest_port` | TCP port of the sink node to connect to |
|
||||
| `transport_mode` | `0x0001` = encapsulated (framed); `0x0002` = opaque (raw byte stream) |
|
||||
| `device_path` | str8 — path to the V4L2 device, e.g. `/dev/video0` |
|
||||
| `dest_host` | str8 — hostname or IP of the sink node |
|
||||
|
||||
**Response** — no extra fields beyond request_id and status. `OK` means the wanted state was accepted; the node will reconcile asynchronously.
|
||||
|
||||
### `STOP_INGEST` (0x0009)
|
||||
|
||||
Sets wanted state: stop the ingest stream and disconnect from the sink.
|
||||
|
||||
**Request**:
|
||||
|
||||
```mermaid
|
||||
%%{init: {'packet': {'bitsPerRow': 16}}}%%
|
||||
packet-beta
|
||||
0-15: "request_id"
|
||||
16-31: "command = 0x0009"
|
||||
32-47: "stream_id"
|
||||
```
|
||||
|
||||
**Response** — no extra fields beyond request_id and status.
|
||||
|
||||
### `START_DISPLAY` (0x000A)
|
||||
|
||||
Opens a viewer window on a sink node and routes incoming `VIDEO_FRAME` messages for `stream_id` to it.
|
||||
|
||||
**Request**:
|
||||
|
||||
```mermaid
|
||||
%%{init: {'packet': {'bitsPerRow': 16}}}%%
|
||||
packet-beta
|
||||
0-15: "request_id"
|
||||
16-31: "command = 0x000A"
|
||||
32-47: "stream_id"
|
||||
48-63: "win_x (i16)"
|
||||
64-79: "win_y (i16)"
|
||||
80-95: "win_w"
|
||||
96-111: "win_h"
|
||||
112-119: "scale"
|
||||
120-127: "anchor"
|
||||
```
|
||||
|
||||
| Field | Description |
|
||||
|---|---|
|
||||
| `stream_id` | Stream to display; must match incoming `VIDEO_FRAME` stream_id |
|
||||
| `win_x`, `win_y` | Window screen position (signed; for multi-monitor placement) |
|
||||
| `win_w`, `win_h` | Window size in pixels; `0` = default (1280×720) |
|
||||
| `scale` | `0`=stretch `1`=fit `2`=fill `3`=1:1 |
|
||||
| `anchor` | `0`=center `1`=topleft |
|
||||
|
||||
**Response** — no extra fields beyond request_id and status. `OK` means the display slot was reserved; the window opens asynchronously on the main thread.
|
||||
|
||||
### `STOP_DISPLAY` (0x000B)
|
||||
|
||||
Closes the viewer window for the given stream.
|
||||
|
||||
**Request**:
|
||||
|
||||
```mermaid
|
||||
%%{init: {'packet': {'bitsPerRow': 16}}}%%
|
||||
packet-beta
|
||||
0-15: "request_id"
|
||||
16-31: "command = 0x000B"
|
||||
32-47: "stream_id"
|
||||
```
|
||||
|
||||
**Response** — no extra fields beyond request_id and status.
|
||||
|
||||
|
||||
10
docs/xorg.md
10
docs/xorg.md
@@ -107,6 +107,16 @@ A `framebuffer_size_callback` registered on the window calls `render()` synchron
|
||||
|
||||
Threading note: the GL context must be used from the thread that created it. In the video node, incoming frames arrive on a network receive thread. A frame queue between the receive thread and the render thread (which owns the GL context) is the correct model — the render thread drains the queue each poll iteration rather than having the network thread call push functions directly.
|
||||
|
||||
### Multiple windows
|
||||
|
||||
GLFW supports multiple windows from the same thread. `glfwCreateWindow` can be called repeatedly; each call returns an independent window handle with its own GL context. The video node uses this to display several streams simultaneously (one window per active `Display_Slot`).
|
||||
|
||||
**`glfwPollEvents` is global.** It drains the event queue for all windows at once, not just the one associated with the viewer it is called through. When iterating over multiple display slots and calling `xorg_viewer_handle_events` on each, only the first call does real work; subsequent calls are no-ops because the queue is already empty. This is harmless but worth knowing: if the loop is ever restructured so that event polling is conditional or short-circuited, all windows need at least one `glfwPollEvents` call per iteration or they will stop responding to input.
|
||||
|
||||
**Each window has its own GL context.** `glfwMakeContextCurrent` must be called before any GL operations to ensure calls go to the right context. The push functions (`push_yuv420`, `push_bgra`, `push_mjpeg`) and `poll` do this automatically. Code that calls GL functions directly must make the correct context current first.
|
||||
|
||||
**`glfwInit`/`glfwTerminate` are ref-counted** in the xorg module. The first `xorg_viewer_open` call initialises GLFW; `glfwTerminate` is deferred until the last viewer is closed. Do not call `glfwTerminate` directly — use `xorg_viewer_close` and let the ref count manage it.
|
||||
|
||||
### Renderer: Vulkan (future alternative)
|
||||
|
||||
A Vulkan renderer is planned as an alternative to the OpenGL one. GLFW's surface creation API is renderer-agnostic, so the window management and input handling code is shared. Only the renderer backend changes.
|
||||
|
||||
66
include/ingest.h
Normal file
66
include/ingest.h
Normal file
@@ -0,0 +1,66 @@
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
#include "error.h"
|
||||
|
||||
typedef struct Ingest_Handle Ingest_Handle;
|
||||
|
||||
/*
|
||||
* Called from the capture thread for each dequeued frame.
|
||||
* data points into the mmap'd buffer — valid only for the duration of the call.
|
||||
* Do not free data; copy if you need to retain it beyond the callback.
|
||||
*/
|
||||
typedef void (*Ingest_Frame_Fn)(
|
||||
const uint8_t *data, uint32_t len,
|
||||
int width, int height, uint32_t pixfmt,
|
||||
void *userdata);
|
||||
|
||||
/*
|
||||
* Called from the capture thread when a fatal error terminates the capture loop.
|
||||
* After this callback returns, the thread exits and the handle is in a stopped
|
||||
* state (equivalent to after ingest_stop). msg is a static string.
|
||||
*/
|
||||
typedef void (*Ingest_Error_Fn)(const char *msg, void *userdata);
|
||||
|
||||
struct Ingest_Config {
|
||||
const char *device; /* e.g. "/dev/video0" */
|
||||
uint32_t pixfmt; /* V4L2_PIX_FMT_MJPEG etc.; 0 = auto (best MJPEG) */
|
||||
int width; /* 0 = auto */
|
||||
int height; /* 0 = auto */
|
||||
Ingest_Frame_Fn on_frame;
|
||||
Ingest_Error_Fn on_error; /* may be NULL */
|
||||
void *userdata;
|
||||
};
|
||||
|
||||
/*
|
||||
* Open the V4L2 device, negotiate format, allocate MMAP buffers.
|
||||
* Does NOT start streaming. on_frame must not be NULL.
|
||||
*/
|
||||
struct App_Error ingest_open(const struct Ingest_Config *cfg, Ingest_Handle **out);
|
||||
|
||||
/*
|
||||
* Enable streaming and start the capture thread.
|
||||
* Must be called on a handle in the OPEN (not streaming) state.
|
||||
*/
|
||||
struct App_Error ingest_start(Ingest_Handle *h);
|
||||
|
||||
/*
|
||||
* Signal the capture thread to stop and block until it exits.
|
||||
* Disables streaming. The handle returns to the OPEN state and can be
|
||||
* restarted with ingest_start or released with ingest_close.
|
||||
*/
|
||||
struct App_Error ingest_stop(Ingest_Handle *h);
|
||||
|
||||
/*
|
||||
* Release MMAP buffers and close the device fd.
|
||||
* Must be called only when the handle is not streaming (before ingest_start
|
||||
* or after ingest_stop).
|
||||
*/
|
||||
void ingest_close(Ingest_Handle *h);
|
||||
|
||||
/* Query the negotiated format — valid after a successful ingest_open. */
|
||||
int ingest_width(const Ingest_Handle *h);
|
||||
int ingest_height(const Ingest_Handle *h);
|
||||
uint32_t ingest_pixfmt(const Ingest_Handle *h);
|
||||
int ingest_fps_n(const Ingest_Handle *h);
|
||||
int ingest_fps_d(const Ingest_Handle *h);
|
||||
@@ -24,6 +24,10 @@
|
||||
#define PROTO_CMD_GET_CONTROL 0x0005u
|
||||
#define PROTO_CMD_SET_CONTROL 0x0006u
|
||||
#define PROTO_CMD_ENUM_MONITORS 0x0007u
|
||||
#define PROTO_CMD_START_INGEST 0x0008u
|
||||
#define PROTO_CMD_STOP_INGEST 0x0009u
|
||||
#define PROTO_CMD_START_DISPLAY 0x000Au
|
||||
#define PROTO_CMD_STOP_DISPLAY 0x000Bu
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Response status codes (carried in CONTROL_RESPONSE payload offset 2)
|
||||
@@ -66,6 +70,13 @@
|
||||
#define PROTO_PIXEL_YUV420P 0x0004u
|
||||
#define PROTO_PIXEL_YUV422 0x0005u
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Transport mode codes (START_INGEST transport_mode field)
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
#define PROTO_TRANSPORT_ENCAPSULATED 0x0001u /* framed: message_type + payload_length header */
|
||||
#define PROTO_TRANSPORT_OPAQUE 0x0002u /* raw byte stream, no frame boundaries */
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Origin codes (STREAM_OPEN origin field; informational only)
|
||||
* ------------------------------------------------------------------------- */
|
||||
@@ -196,6 +207,66 @@ struct Proto_Set_Control_Req {
|
||||
int32_t value;
|
||||
};
|
||||
|
||||
/*
|
||||
* START_INGEST: controller tells a source node to open a V4L2 device and
|
||||
* connect outbound to a sink at dest_host:dest_port.
|
||||
* format/width/height/fps_n/fps_d of 0 mean auto-select.
|
||||
* Strings point into the caller's payload buffer; not NUL-terminated.
|
||||
*/
|
||||
struct Proto_Start_Ingest {
|
||||
uint16_t request_id;
|
||||
uint16_t stream_id;
|
||||
uint16_t format; /* PROTO_FORMAT_* code; 0 = auto (best MJPEG) */
|
||||
uint16_t width; /* 0 = auto */
|
||||
uint16_t height; /* 0 = auto */
|
||||
uint16_t fps_n; /* 0 = auto */
|
||||
uint16_t fps_d;
|
||||
uint16_t dest_port;
|
||||
uint16_t transport_mode; /* PROTO_TRANSPORT_ENCAPSULATED or PROTO_TRANSPORT_OPAQUE */
|
||||
const char *device_path;
|
||||
uint8_t device_path_len;
|
||||
const char *dest_host;
|
||||
uint8_t dest_host_len;
|
||||
};
|
||||
|
||||
struct Proto_Stop_Ingest {
|
||||
uint16_t request_id;
|
||||
uint16_t stream_id;
|
||||
};
|
||||
|
||||
/*
|
||||
* START_DISPLAY: controller tells a sink node to open a viewer window and
|
||||
* display incoming VIDEO_FRAME messages for the given stream_id.
|
||||
* win_x/win_y are screen-space window position (signed: multi-monitor).
|
||||
* win_w/win_h of 0 mean use a default size.
|
||||
* scale: 0=stretch 1=fit 2=fill 3=1:1 (PROTO_DISPLAY_SCALE_*)
|
||||
* anchor: 0=center 1=topleft (PROTO_DISPLAY_ANCHOR_*)
|
||||
*/
|
||||
struct Proto_Start_Display {
|
||||
uint16_t request_id;
|
||||
uint16_t stream_id;
|
||||
int16_t win_x;
|
||||
int16_t win_y;
|
||||
uint16_t win_w;
|
||||
uint16_t win_h;
|
||||
uint8_t scale;
|
||||
uint8_t anchor;
|
||||
};
|
||||
|
||||
struct Proto_Stop_Display {
|
||||
uint16_t request_id;
|
||||
uint16_t stream_id;
|
||||
};
|
||||
|
||||
/* Scale/anchor constants for Proto_Start_Display */
|
||||
#define PROTO_DISPLAY_SCALE_STRETCH 0u
|
||||
#define PROTO_DISPLAY_SCALE_FIT 1u
|
||||
#define PROTO_DISPLAY_SCALE_FILL 2u
|
||||
#define PROTO_DISPLAY_SCALE_1_1 3u
|
||||
|
||||
#define PROTO_DISPLAY_ANCHOR_CENTER 0u
|
||||
#define PROTO_DISPLAY_ANCHOR_TOPLEFT 1u
|
||||
|
||||
struct Proto_Response_Header {
|
||||
uint16_t request_id;
|
||||
uint16_t status;
|
||||
@@ -253,6 +324,28 @@ struct App_Error proto_write_set_control(struct Transport_Conn *conn,
|
||||
struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn,
|
||||
uint16_t request_id);
|
||||
|
||||
/* CONTROL_REQUEST: START_INGEST */
|
||||
struct App_Error proto_write_start_ingest(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id,
|
||||
uint16_t format, uint16_t width, uint16_t height,
|
||||
uint16_t fps_n, uint16_t fps_d,
|
||||
uint16_t transport_mode,
|
||||
const char *device_path, const char *dest_host, uint16_t dest_port);
|
||||
|
||||
/* CONTROL_REQUEST: STOP_INGEST */
|
||||
struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id);
|
||||
|
||||
/* CONTROL_REQUEST: START_DISPLAY */
|
||||
struct App_Error proto_write_start_display(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id,
|
||||
int16_t win_x, int16_t win_y, uint16_t win_w, uint16_t win_h,
|
||||
uint8_t scale, uint8_t anchor);
|
||||
|
||||
/* CONTROL_REQUEST: STOP_DISPLAY */
|
||||
struct App_Error proto_write_stop_display(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id);
|
||||
|
||||
/*
|
||||
* CONTROL_RESPONSE: generic.
|
||||
* payload/payload_len are the command-specific bytes after request_id+status.
|
||||
@@ -325,6 +418,22 @@ struct App_Error proto_read_set_control_req(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Set_Control_Req *out);
|
||||
|
||||
struct App_Error proto_read_start_ingest(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Start_Ingest *out);
|
||||
|
||||
struct App_Error proto_read_stop_ingest(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Stop_Ingest *out);
|
||||
|
||||
struct App_Error proto_read_start_display(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Start_Display *out);
|
||||
|
||||
struct App_Error proto_read_stop_display(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Stop_Display *out);
|
||||
|
||||
/*
|
||||
* Read the common 4-byte response header (request_id + status).
|
||||
* For responses with no extra fields (STREAM_OPEN, STREAM_CLOSE, SET_CONTROL),
|
||||
|
||||
108
include/reconciler.h
Normal file
108
include/reconciler.h
Normal file
@@ -0,0 +1,108 @@
|
||||
#pragma once
|
||||
|
||||
/*
|
||||
* Generic declarative state machine reconciler.
|
||||
*
|
||||
* Each managed resource is described as a directed graph of states
|
||||
* with labelled transitions. The reconciler finds the shortest path
|
||||
* (BFS) from a resource's current state to its wanted state and
|
||||
* executes one transition per tick.
|
||||
*
|
||||
* Dependencies between resources prevent a resource from advancing
|
||||
* past a threshold state until a prerequisite resource reaches a
|
||||
* minimum state.
|
||||
*
|
||||
* Usage:
|
||||
* struct Reconciler *r = reconciler_create();
|
||||
*
|
||||
* static const struct Rec_Transition dev_trans[] = {
|
||||
* {0, 1, open_device},
|
||||
* {1, 0, close_device},
|
||||
* {1, 2, start_capture},
|
||||
* {2, 1, stop_capture},
|
||||
* {-1, -1, NULL}
|
||||
* };
|
||||
* static const char *dev_states[] = {"CLOSED", "OPEN", "STREAMING"};
|
||||
* struct Rec_Resource *dev = reconciler_add_resource(r, "device",
|
||||
* dev_trans, 3, dev_states, 0, &my_device);
|
||||
*
|
||||
* reconciler_set_wanted(dev, 2);
|
||||
* while (!reconciler_is_stable(r)) {
|
||||
* reconciler_tick(r);
|
||||
* }
|
||||
*/
|
||||
|
||||
/* Transition table entry. Sentinel: {-1, -1, NULL}.
|
||||
* action: return 1 on success, 0 on failure.
|
||||
* On failure the resource stays in 'from' state. */
|
||||
struct Rec_Transition {
|
||||
int from;
|
||||
int to;
|
||||
int (*action)(void *userdata);
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
REC_STATUS_STABLE, /* current == wanted */
|
||||
REC_STATUS_WORKING, /* current != wanted, next transition is eligible */
|
||||
REC_STATUS_BLOCKED, /* current != wanted, a dependency is unsatisfied */
|
||||
REC_STATUS_NO_PATH, /* current != wanted, no transition path exists */
|
||||
} Rec_Status;
|
||||
|
||||
struct Reconciler;
|
||||
struct Rec_Resource;
|
||||
|
||||
/* Optional log callback — called after each transition attempt. */
|
||||
typedef void (*Rec_Log_Fn)(
|
||||
const struct Rec_Resource *res,
|
||||
int from, int to, int success,
|
||||
void *userdata);
|
||||
|
||||
struct Reconciler *reconciler_create(void);
|
||||
void reconciler_destroy(struct Reconciler *r);
|
||||
|
||||
/* Set a log callback. Called after every transition attempt. */
|
||||
void reconciler_set_log(struct Reconciler *r, Rec_Log_Fn fn, void *userdata);
|
||||
|
||||
/* Add a resource.
|
||||
* transitions: caller-owned, sentinel-terminated {-1,-1,NULL}.
|
||||
* state_names: optional array of state_count strings; NULL for numeric display.
|
||||
* initial_state: sets both current and wanted initially. */
|
||||
struct Rec_Resource *reconciler_add_resource(
|
||||
struct Reconciler *r,
|
||||
const char *name,
|
||||
const struct Rec_Transition *transitions,
|
||||
int state_count,
|
||||
const char **state_names,
|
||||
int initial_state,
|
||||
void *userdata);
|
||||
|
||||
/* Add a dependency: resource cannot reach state >= blocked_below
|
||||
* unless dep is currently in state >= dep_min_state. */
|
||||
void reconciler_add_dep(
|
||||
struct Rec_Resource *resource,
|
||||
int blocked_below,
|
||||
struct Rec_Resource *dep,
|
||||
int dep_min_state);
|
||||
|
||||
void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state);
|
||||
|
||||
/*
|
||||
* Force current state without executing a transition.
|
||||
* Use when an external event pushes a resource into a new state —
|
||||
* e.g. a transport connection drops unexpectedly, or a device error
|
||||
* causes the capture thread to exit. The reconciler will drive back
|
||||
* toward wanted state on the next tick.
|
||||
*/
|
||||
void reconciler_force_current(struct Rec_Resource *r, int state);
|
||||
int reconciler_get_current(const struct Rec_Resource *r);
|
||||
int reconciler_get_wanted(const struct Rec_Resource *r);
|
||||
const char *reconciler_get_name(const struct Rec_Resource *r);
|
||||
const char *reconciler_state_name(const struct Rec_Resource *r, int state);
|
||||
Rec_Status reconciler_get_status(const struct Rec_Resource *r);
|
||||
|
||||
/* Run one reconciliation pass over all resources.
|
||||
* Returns number of transitions attempted (success or failure). */
|
||||
int reconciler_tick(struct Reconciler *r);
|
||||
|
||||
/* Returns 1 if all resources have current == wanted. */
|
||||
int reconciler_is_stable(const struct Reconciler *r);
|
||||
@@ -51,9 +51,15 @@ struct Transport_Server_Config {
|
||||
struct App_Error transport_server_create(struct Transport_Server **out,
|
||||
struct Transport_Server_Config *config);
|
||||
|
||||
/* Bind, listen, and spawn the accept thread. */
|
||||
/* Bind, listen, and spawn the accept thread.
|
||||
* If config.port is 0, the OS assigns a free port; use
|
||||
* transport_server_get_port() afterwards to retrieve it. */
|
||||
struct App_Error transport_server_start(struct Transport_Server *server);
|
||||
|
||||
/* Return the port the server is actually listening on.
|
||||
* Valid after a successful transport_server_start(). */
|
||||
uint16_t transport_server_get_port(const struct Transport_Server *server);
|
||||
|
||||
/*
|
||||
* Stop accepting new connections and free the server.
|
||||
* Active connections continue until they disconnect naturally.
|
||||
|
||||
11
planning.md
11
planning.md
@@ -55,13 +55,13 @@ Modules are listed in intended build order. Each depends only on modules above i
|
||||
| 5 | `transport` | done | Encapsulated transport — frame header, TCP stream abstraction, single-write send |
|
||||
| 6 | `discovery` | done | UDP multicast announcements, peer table, found/lost callbacks |
|
||||
| 7 | `protocol` | done | Typed `write_*`/`read_*` functions for all message types; builds on serial + transport |
|
||||
| — | `node` | done | Video node binary — config, discovery, transport server, V4L2/media control request handlers |
|
||||
| — | `node` | done | Video node binary — config, discovery, transport server, V4L2/media control request handlers; display sink role (START_DISPLAY/STOP_DISPLAY handlers, multi-window xorg viewer, declarative display slot reconciler) |
|
||||
| 8 | `test_image` | done | Test pattern generator — colour bars, luminance ramp, grid crosshatch; YUV420/BGRA output |
|
||||
| 9 | `xorg` | done | GLFW+OpenGL viewer sink — YUV420/BGRA/MJPEG display, all scale/anchor modes, bitmap font atlas text overlays; XRandR queries and screen grab not yet implemented |
|
||||
| 10 | `reconciler` | not started | Generic wanted/current state machine reconciler — resource state graphs, BFS pathfinding, event + periodic tick; used by node to manage V4L2 devices, transport connections, and future resources (codec processes etc.) |
|
||||
| 9 | `xorg` | done | GLFW+OpenGL viewer sink — YUV420/BGRA/MJPEG display, all scale/anchor modes, bitmap font atlas text overlays; XRandR queries and screen grab not yet implemented; viewer controls (zoom, pan, scale policy) not yet exposed remotely |
|
||||
| 10 | `reconciler` | done | Generic wanted/current state machine reconciler — resource state graphs, BFS pathfinding, event + periodic tick; used by node to manage V4L2 devices, transport connections, and future resources (codec processes etc.) |
|
||||
| 11 | `frame_alloc` | not started | Per-frame allocation with bookkeeping (byte budget, ref counting) |
|
||||
| 12 | `relay` | not started | Input dispatch to output queues (low-latency and completeness modes) |
|
||||
| 13 | `ingest` | not started | V4L2 capture loop — dequeue buffers, emit one encapsulated frame per buffer |
|
||||
| 13 | `ingest` | done | V4L2 capture loop — open device, negotiate MJPEG format, MMAP buffers, capture thread with on_frame callback; start/stop lifecycle managed by reconciler |
|
||||
| 14 | `archive` | not started | Write frames to disk, control messages to binary log |
|
||||
| 15 | `codec` | not started | Per-frame encode/decode — MJPEG (libjpeg-turbo), QOI, ZSTD-raw, VA-API H.264 intra; used by screen grab source and archive |
|
||||
| 16 | `web node` | not started | Node.js/Express peer — speaks binary protocol on socket side, HTTP/WebSocket to browser; `protocol.mjs` mirrors C protocol module |
|
||||
@@ -86,6 +86,7 @@ Each module gets a corresponding CLI driver that exercises its API and serves as
|
||||
| `stream_send_cli` | V4L2 + `transport` + `protocol` | Capture MJPEG from V4L2, connect to receiver, send VIDEO_FRAME messages; prints fps/Mbps stats |
|
||||
| `stream_recv_cli` | `transport` + `protocol` + `xorg` | Listen for incoming VIDEO_FRAME stream, display in viewer; fps/Mbps overlay; threaded transport→GL handoff |
|
||||
| `reconciler_cli` | `reconciler` | Simulated state machine experiment — define resources with fake transitions, drive reconciler via CLI commands; validates the generic reconciler before wiring into the node |
|
||||
| `controller_cli` | `transport` + `protocol` | Interactive controller REPL — connects to a running node by host:port; supports enum-devices, enum-controls, get/set-control, start-ingest, stop-ingest, start-display, stop-display |
|
||||
|
||||
### Web UI (`dev/web/`)
|
||||
|
||||
@@ -112,3 +113,5 @@ These are open questions tracked in `architecture.md` that do not need to be res
|
||||
- Transport for relay edges (TCP / UDP / shared memory)
|
||||
- Node discovery mechanism
|
||||
- Hard vs soft byte budget limits
|
||||
- Cooperative capture release: if a capture source has no live downstream targets for a configurable time window, stop capture and release the device. Intended as a resource-conservation policy rather than an immediate reaction to disconnect events. Requires the node to track downstream liveness (e.g. last successful send timestamp per output) and implement a reaper timer.
|
||||
- Xorg viewer remote controls: expose viewer state (zoom, pan, scale policy, anchor) as enumerable/settable controls via the protocol, analogous to V4L2 controls. Future extension: shader-based post-processing — initial candidates are a colour-correction shader and custom user-provided GLSL fragment shaders sent over the wire.
|
||||
|
||||
19
src/modules/ingest/Makefile
Normal file
19
src/modules/ingest/Makefile
Normal file
@@ -0,0 +1,19 @@
|
||||
ROOT := $(abspath ../../..)
|
||||
include $(ROOT)/common.mk
|
||||
|
||||
MODULE_BUILD = $(BUILD)/ingest
|
||||
|
||||
.PHONY: all clean
|
||||
|
||||
all: $(MODULE_BUILD)/ingest.o
|
||||
|
||||
$(MODULE_BUILD)/ingest.o: ingest.c | $(MODULE_BUILD)
|
||||
$(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $<
|
||||
|
||||
$(MODULE_BUILD):
|
||||
mkdir -p $@
|
||||
|
||||
clean:
|
||||
rm -f $(MODULE_BUILD)/ingest.o $(MODULE_BUILD)/ingest.d
|
||||
|
||||
-include $(MODULE_BUILD)/ingest.d
|
||||
292
src/modules/ingest/ingest.c
Normal file
292
src/modules/ingest/ingest.c
Normal file
@@ -0,0 +1,292 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/select.h>
|
||||
#include <pthread.h>
|
||||
#include <stdatomic.h>
|
||||
#include <linux/videodev2.h>
|
||||
|
||||
#include "ingest.h"
|
||||
#include "v4l2_fmt.h"
|
||||
#include "error.h"
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Internal types
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
#define INGEST_N_BUFS 4
|
||||
|
||||
struct Mmap_Buf {
|
||||
void *start;
|
||||
size_t length;
|
||||
};
|
||||
|
||||
struct Ingest_Handle {
|
||||
int fd;
|
||||
struct Mmap_Buf bufs[INGEST_N_BUFS];
|
||||
int buf_count;
|
||||
|
||||
int width, height;
|
||||
uint32_t pixfmt;
|
||||
int fps_n, fps_d;
|
||||
|
||||
Ingest_Frame_Fn on_frame;
|
||||
Ingest_Error_Fn on_error;
|
||||
void *userdata;
|
||||
|
||||
pthread_t thread;
|
||||
atomic_int running; /* 1 = thread should keep going; 0 = stop */
|
||||
int started; /* 1 = pthread_create was called */
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Capture thread
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void *capture_thread(void *arg)
|
||||
{
|
||||
struct Ingest_Handle *h = arg;
|
||||
|
||||
while (atomic_load(&h->running)) {
|
||||
fd_set fds;
|
||||
FD_ZERO(&fds);
|
||||
FD_SET(h->fd, &fds);
|
||||
struct timeval tv = { 0, 100000 }; /* 100 ms — keeps stop latency short */
|
||||
|
||||
int r = select(h->fd + 1, &fds, NULL, NULL, &tv);
|
||||
if (r < 0) {
|
||||
if (errno == EINTR) { continue; }
|
||||
if (h->on_error) { h->on_error("select failed", h->userdata); }
|
||||
break;
|
||||
}
|
||||
if (r == 0) {
|
||||
continue; /* timeout — recheck running flag */
|
||||
}
|
||||
|
||||
struct v4l2_buffer buf = {0};
|
||||
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
buf.memory = V4L2_MEMORY_MMAP;
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_DQBUF, &buf) < 0) {
|
||||
if (errno == EAGAIN) { continue; }
|
||||
if (h->on_error) { h->on_error("VIDIOC_DQBUF failed", h->userdata); }
|
||||
break;
|
||||
}
|
||||
|
||||
h->on_frame(
|
||||
(const uint8_t *)h->bufs[buf.index].start,
|
||||
buf.bytesused,
|
||||
h->width, h->height, h->pixfmt,
|
||||
h->userdata);
|
||||
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_QBUF, &buf) < 0) {
|
||||
if (h->on_error) { h->on_error("VIDIOC_QBUF failed", h->userdata); }
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
atomic_store(&h->running, 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Public API
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
struct App_Error ingest_open(const struct Ingest_Config *cfg, Ingest_Handle **out)
|
||||
{
|
||||
struct Ingest_Handle *h = calloc(1, sizeof(*h));
|
||||
if (!h) { return APP_SYSCALL_ERROR(); }
|
||||
|
||||
h->fd = -1;
|
||||
h->on_frame = cfg->on_frame;
|
||||
h->on_error = cfg->on_error;
|
||||
h->userdata = cfg->userdata;
|
||||
atomic_init(&h->running, 0);
|
||||
|
||||
/* Open device */
|
||||
h->fd = open(cfg->device, O_RDWR | O_NONBLOCK);
|
||||
if (h->fd < 0) {
|
||||
free(h);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
|
||||
/* Verify capture + streaming capability */
|
||||
struct v4l2_capability cap = {0};
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_QUERYCAP, &cap) < 0) {
|
||||
close(h->fd); free(h);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
if (!(cap.capabilities & V4L2_CAP_VIDEO_CAPTURE) ||
|
||||
!(cap.capabilities & V4L2_CAP_STREAMING)) {
|
||||
close(h->fd); free(h);
|
||||
return APP_INVALID_ERROR_MSG(0, "device does not support MJPEG streaming capture");
|
||||
}
|
||||
|
||||
/* Format selection */
|
||||
uint32_t want_pixfmt = cfg->pixfmt ? cfg->pixfmt : V4L2_PIX_FMT_MJPEG;
|
||||
|
||||
V4l2_Fmt_Option opts[V4L2_FMT_MAX_OPTS];
|
||||
int n = v4l2_enumerate_formats(h->fd, opts, V4L2_FMT_MAX_OPTS, want_pixfmt);
|
||||
if (n == 0) {
|
||||
close(h->fd); free(h);
|
||||
return APP_INVALID_ERROR_MSG(0, "no matching formats found on device");
|
||||
}
|
||||
|
||||
/* If caller specified exact w/h use that, otherwise auto-select best */
|
||||
const V4l2_Fmt_Option *chosen;
|
||||
if (cfg->width > 0 && cfg->height > 0) {
|
||||
chosen = NULL;
|
||||
for (int i = 0; i < n; i++) {
|
||||
if (opts[i].w == cfg->width && opts[i].h == cfg->height) {
|
||||
if (!chosen || v4l2_fmt_fps_gt(&opts[i], chosen)) {
|
||||
chosen = &opts[i];
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!chosen) {
|
||||
/* Exact size not found — fall back to best available */
|
||||
chosen = v4l2_select_best(opts, n);
|
||||
}
|
||||
} else {
|
||||
chosen = v4l2_select_best(opts, n);
|
||||
}
|
||||
|
||||
/* Apply format */
|
||||
struct v4l2_format fmt = {0};
|
||||
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
fmt.fmt.pix.pixelformat = chosen->pixfmt;
|
||||
fmt.fmt.pix.width = (uint32_t)chosen->w;
|
||||
fmt.fmt.pix.height = (uint32_t)chosen->h;
|
||||
fmt.fmt.pix.field = V4L2_FIELD_ANY;
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_S_FMT, &fmt) < 0) {
|
||||
close(h->fd); free(h);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
|
||||
h->width = (int)fmt.fmt.pix.width;
|
||||
h->height = (int)fmt.fmt.pix.height;
|
||||
h->pixfmt = fmt.fmt.pix.pixelformat;
|
||||
|
||||
/* Apply frame rate */
|
||||
{
|
||||
struct v4l2_streamparm parm = {0};
|
||||
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
parm.parm.capture.timeperframe.numerator = (uint32_t)chosen->fps_d;
|
||||
parm.parm.capture.timeperframe.denominator = (uint32_t)chosen->fps_n;
|
||||
v4l2_xioctl(h->fd, VIDIOC_S_PARM, &parm);
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_G_PARM, &parm) == 0 &&
|
||||
parm.parm.capture.timeperframe.denominator > 0) {
|
||||
h->fps_n = (int)parm.parm.capture.timeperframe.denominator;
|
||||
h->fps_d = (int)parm.parm.capture.timeperframe.numerator;
|
||||
} else {
|
||||
h->fps_n = chosen->fps_n;
|
||||
h->fps_d = chosen->fps_d;
|
||||
}
|
||||
}
|
||||
|
||||
/* Allocate MMAP buffers */
|
||||
struct v4l2_requestbuffers req = {0};
|
||||
req.count = INGEST_N_BUFS;
|
||||
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
req.memory = V4L2_MEMORY_MMAP;
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_REQBUFS, &req) < 0) {
|
||||
close(h->fd); free(h);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
|
||||
h->buf_count = (int)req.count;
|
||||
for (int i = 0; i < h->buf_count; i++) {
|
||||
struct v4l2_buffer buf = {0};
|
||||
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
buf.memory = V4L2_MEMORY_MMAP;
|
||||
buf.index = (uint32_t)i;
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_QUERYBUF, &buf) < 0) {
|
||||
/* Unmap already-mapped buffers before returning */
|
||||
for (int j = 0; j < i; j++) {
|
||||
munmap(h->bufs[j].start, h->bufs[j].length);
|
||||
}
|
||||
close(h->fd); free(h);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
h->bufs[i].length = buf.length;
|
||||
h->bufs[i].start = mmap(NULL, buf.length,
|
||||
PROT_READ | PROT_WRITE, MAP_SHARED, h->fd, buf.m.offset);
|
||||
if (h->bufs[i].start == MAP_FAILED) {
|
||||
for (int j = 0; j < i; j++) {
|
||||
munmap(h->bufs[j].start, h->bufs[j].length);
|
||||
}
|
||||
close(h->fd); free(h);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
}
|
||||
|
||||
*out = h;
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
struct App_Error ingest_start(Ingest_Handle *h)
|
||||
{
|
||||
/* Queue all buffers */
|
||||
for (int i = 0; i < h->buf_count; i++) {
|
||||
struct v4l2_buffer buf = {0};
|
||||
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
buf.memory = V4L2_MEMORY_MMAP;
|
||||
buf.index = (uint32_t)i;
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_QBUF, &buf) < 0) {
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
}
|
||||
|
||||
/* Enable streaming */
|
||||
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
if (v4l2_xioctl(h->fd, VIDIOC_STREAMON, &type) < 0) {
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
|
||||
/* Start capture thread */
|
||||
atomic_store(&h->running, 1);
|
||||
if (pthread_create(&h->thread, NULL, capture_thread, h) != 0) {
|
||||
atomic_store(&h->running, 0);
|
||||
v4l2_xioctl(h->fd, VIDIOC_STREAMOFF, &type);
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
h->started = 1;
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
struct App_Error ingest_stop(Ingest_Handle *h)
|
||||
{
|
||||
if (!h->started) {
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
atomic_store(&h->running, 0);
|
||||
pthread_join(h->thread, NULL);
|
||||
h->started = 0;
|
||||
|
||||
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
|
||||
v4l2_xioctl(h->fd, VIDIOC_STREAMOFF, &type);
|
||||
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
void ingest_close(Ingest_Handle *h)
|
||||
{
|
||||
if (!h) { return; }
|
||||
for (int i = 0; i < h->buf_count; i++) {
|
||||
if (h->bufs[i].start && h->bufs[i].start != MAP_FAILED) {
|
||||
munmap(h->bufs[i].start, h->bufs[i].length);
|
||||
}
|
||||
}
|
||||
if (h->fd >= 0) { close(h->fd); }
|
||||
free(h);
|
||||
}
|
||||
|
||||
int ingest_width(const Ingest_Handle *h) { return h->width; }
|
||||
int ingest_height(const Ingest_Handle *h) { return h->height; }
|
||||
uint32_t ingest_pixfmt(const Ingest_Handle *h) { return h->pixfmt; }
|
||||
int ingest_fps_n(const Ingest_Handle *h) { return h->fps_n; }
|
||||
int ingest_fps_d(const Ingest_Handle *h) { return h->fps_d; }
|
||||
@@ -300,6 +300,54 @@ struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn,
|
||||
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 4);
|
||||
}
|
||||
|
||||
struct App_Error proto_write_start_ingest(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id,
|
||||
uint16_t format, uint16_t width, uint16_t height,
|
||||
uint16_t fps_n, uint16_t fps_d,
|
||||
uint16_t transport_mode,
|
||||
const char *device_path, const char *dest_host, uint16_t dest_port)
|
||||
{
|
||||
size_t dp_len = device_path ? strlen(device_path) : 0;
|
||||
size_t dh_len = dest_host ? strlen(dest_host) : 0;
|
||||
uint8_t dp_n = dp_len > 255u ? 255u : (uint8_t)dp_len;
|
||||
uint8_t dh_n = dh_len > 255u ? 255u : (uint8_t)dh_len;
|
||||
|
||||
/* 20 bytes fixed + 1+dp_n (device_path str8) + 1+dh_n (dest_host str8) */
|
||||
uint32_t total = 20u + 1u + dp_n + 1u + dh_n;
|
||||
uint8_t *buf = malloc(total);
|
||||
if (!buf) { return APP_SYSCALL_ERROR(); }
|
||||
|
||||
uint32_t o = 0;
|
||||
put_u16(buf, o, request_id); o += 2;
|
||||
put_u16(buf, o, PROTO_CMD_START_INGEST); o += 2;
|
||||
put_u16(buf, o, stream_id); o += 2;
|
||||
put_u16(buf, o, format); o += 2;
|
||||
put_u16(buf, o, width); o += 2;
|
||||
put_u16(buf, o, height); o += 2;
|
||||
put_u16(buf, o, fps_n); o += 2;
|
||||
put_u16(buf, o, fps_d); o += 2;
|
||||
put_u16(buf, o, dest_port); o += 2;
|
||||
put_u16(buf, o, transport_mode); o += 2;
|
||||
put_u8 (buf, o, dp_n); o += 1;
|
||||
memcpy(buf + o, device_path, dp_n); o += dp_n;
|
||||
put_u8 (buf, o, dh_n); o += 1;
|
||||
memcpy(buf + o, dest_host, dh_n); o += dh_n;
|
||||
|
||||
struct App_Error e = transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, total);
|
||||
free(buf);
|
||||
return e;
|
||||
}
|
||||
|
||||
struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id)
|
||||
{
|
||||
uint8_t buf[6];
|
||||
put_u16(buf, 0, request_id);
|
||||
put_u16(buf, 2, PROTO_CMD_STOP_INGEST);
|
||||
put_u16(buf, 4, stream_id);
|
||||
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 6);
|
||||
}
|
||||
|
||||
struct App_Error proto_write_control_response(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t status,
|
||||
const uint8_t *payload, uint32_t payload_len)
|
||||
@@ -515,6 +563,103 @@ struct App_Error proto_read_set_control_req(
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
struct App_Error proto_read_start_ingest(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Start_Ingest *out)
|
||||
{
|
||||
/* Fixed portion: request_id(2) cmd(2) stream_id(2) format(2) width(2)
|
||||
* height(2) fps_n(2) fps_d(2) dest_port(2) transport_mode(2) = 20 bytes,
|
||||
* then two str8 fields. */
|
||||
struct Cursor c;
|
||||
cur_init(&c, payload, length);
|
||||
|
||||
out->request_id = cur_u16(&c);
|
||||
/* skip command word at [2..3] */
|
||||
(void) cur_u16(&c);
|
||||
out->stream_id = cur_u16(&c);
|
||||
out->format = cur_u16(&c);
|
||||
out->width = cur_u16(&c);
|
||||
out->height = cur_u16(&c);
|
||||
out->fps_n = cur_u16(&c);
|
||||
out->fps_d = cur_u16(&c);
|
||||
out->dest_port = cur_u16(&c);
|
||||
out->transport_mode = cur_u16(&c);
|
||||
out->device_path = cur_str8(&c, &out->device_path_len);
|
||||
out->dest_host = cur_str8(&c, &out->dest_host_len);
|
||||
CUR_CHECK(c);
|
||||
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
struct App_Error proto_read_stop_ingest(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Stop_Ingest *out)
|
||||
{
|
||||
if (length < 6) { return APP_INVALID_ERROR_MSG(0, "STOP_INGEST payload too short"); }
|
||||
out->request_id = get_u16(payload, 0);
|
||||
out->stream_id = get_u16(payload, 4);
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
/* START_DISPLAY: request_id(2) cmd(2) stream_id(2) win_x(2) win_y(2)
|
||||
* win_w(2) win_h(2) scale(1) anchor(1) = 16 bytes */
|
||||
struct App_Error proto_write_start_display(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id,
|
||||
int16_t win_x, int16_t win_y, uint16_t win_w, uint16_t win_h,
|
||||
uint8_t scale, uint8_t anchor)
|
||||
{
|
||||
uint8_t buf[16];
|
||||
uint32_t o = 0;
|
||||
put_u16(buf, o, request_id); o += 2;
|
||||
put_u16(buf, o, PROTO_CMD_START_DISPLAY); o += 2;
|
||||
put_u16(buf, o, stream_id); o += 2;
|
||||
put_i16(buf, o, win_x); o += 2;
|
||||
put_i16(buf, o, win_y); o += 2;
|
||||
put_u16(buf, o, win_w); o += 2;
|
||||
put_u16(buf, o, win_h); o += 2;
|
||||
put_u8 (buf, o, scale); o += 1;
|
||||
put_u8 (buf, o, anchor); o += 1;
|
||||
(void)o;
|
||||
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 16);
|
||||
}
|
||||
|
||||
struct App_Error proto_write_stop_display(struct Transport_Conn *conn,
|
||||
uint16_t request_id, uint16_t stream_id)
|
||||
{
|
||||
uint8_t buf[6];
|
||||
put_u16(buf, 0, request_id);
|
||||
put_u16(buf, 2, PROTO_CMD_STOP_DISPLAY);
|
||||
put_u16(buf, 4, stream_id);
|
||||
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 6);
|
||||
}
|
||||
|
||||
struct App_Error proto_read_start_display(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Start_Display *out)
|
||||
{
|
||||
if (length < 16) { return APP_INVALID_ERROR_MSG(0, "START_DISPLAY payload too short"); }
|
||||
out->request_id = get_u16(payload, 0);
|
||||
/* skip command word at [2..3] */
|
||||
out->stream_id = get_u16(payload, 4);
|
||||
out->win_x = get_i16(payload, 6);
|
||||
out->win_y = get_i16(payload, 8);
|
||||
out->win_w = get_u16(payload, 10);
|
||||
out->win_h = get_u16(payload, 12);
|
||||
out->scale = get_u8 (payload, 14);
|
||||
out->anchor = get_u8 (payload, 15);
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
struct App_Error proto_read_stop_display(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Stop_Display *out)
|
||||
{
|
||||
if (length < 6) { return APP_INVALID_ERROR_MSG(0, "STOP_DISPLAY payload too short"); }
|
||||
out->request_id = get_u16(payload, 0);
|
||||
out->stream_id = get_u16(payload, 4);
|
||||
return APP_OK;
|
||||
}
|
||||
|
||||
struct App_Error proto_read_response_header(
|
||||
const uint8_t *payload, uint32_t length,
|
||||
struct Proto_Response_Header *out)
|
||||
|
||||
19
src/modules/reconciler/Makefile
Normal file
19
src/modules/reconciler/Makefile
Normal file
@@ -0,0 +1,19 @@
|
||||
ROOT := $(abspath ../../..)
|
||||
include $(ROOT)/common.mk
|
||||
|
||||
MODULE_BUILD = $(BUILD)/reconciler
|
||||
|
||||
.PHONY: all clean
|
||||
|
||||
all: $(MODULE_BUILD)/reconciler.o
|
||||
|
||||
$(MODULE_BUILD)/reconciler.o: reconciler.c | $(MODULE_BUILD)
|
||||
$(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $<
|
||||
|
||||
$(MODULE_BUILD):
|
||||
mkdir -p $@
|
||||
|
||||
clean:
|
||||
rm -f $(MODULE_BUILD)/reconciler.o $(MODULE_BUILD)/reconciler.d
|
||||
|
||||
-include $(MODULE_BUILD)/reconciler.d
|
||||
278
src/modules/reconciler/reconciler.c
Normal file
278
src/modules/reconciler/reconciler.c
Normal file
@@ -0,0 +1,278 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include "reconciler.h"
|
||||
|
||||
#define REC_MAX_RESOURCES 32
|
||||
#define REC_MAX_STATES 16
|
||||
#define REC_MAX_DEPS 8
|
||||
|
||||
struct Rec_Dep {
|
||||
struct Rec_Resource *dep;
|
||||
int dep_min_state;
|
||||
int blocked_below;
|
||||
};
|
||||
|
||||
struct Rec_Resource {
|
||||
char name[32];
|
||||
const struct Rec_Transition *transitions;
|
||||
int state_count;
|
||||
const char **state_names;
|
||||
int current_state;
|
||||
int wanted_state;
|
||||
void *userdata;
|
||||
struct Rec_Dep deps[REC_MAX_DEPS];
|
||||
int dep_count;
|
||||
};
|
||||
|
||||
struct Reconciler {
|
||||
struct Rec_Resource resources[REC_MAX_RESOURCES];
|
||||
int count;
|
||||
Rec_Log_Fn log_fn;
|
||||
void *log_userdata;
|
||||
};
|
||||
|
||||
struct Reconciler *reconciler_create(void) {
|
||||
struct Reconciler *r = calloc(1, sizeof(struct Reconciler));
|
||||
return r;
|
||||
}
|
||||
|
||||
void reconciler_destroy(struct Reconciler *r) {
|
||||
free(r);
|
||||
}
|
||||
|
||||
void reconciler_set_log(struct Reconciler *r, Rec_Log_Fn fn, void *userdata) {
|
||||
r->log_fn = fn;
|
||||
r->log_userdata = userdata;
|
||||
}
|
||||
|
||||
struct Rec_Resource *reconciler_add_resource(
|
||||
struct Reconciler *r,
|
||||
const char *name,
|
||||
const struct Rec_Transition *transitions,
|
||||
int state_count,
|
||||
const char **state_names,
|
||||
int initial_state,
|
||||
void *userdata)
|
||||
{
|
||||
if (r->count >= REC_MAX_RESOURCES) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
struct Rec_Resource *res = &r->resources[r->count++];
|
||||
memset(res, 0, sizeof(*res));
|
||||
strncpy(res->name, name, sizeof(res->name) - 1);
|
||||
res->transitions = transitions;
|
||||
res->state_count = state_count;
|
||||
res->state_names = state_names;
|
||||
res->current_state = initial_state;
|
||||
res->wanted_state = initial_state;
|
||||
res->userdata = userdata;
|
||||
res->dep_count = 0;
|
||||
return res;
|
||||
}
|
||||
|
||||
void reconciler_add_dep(
|
||||
struct Rec_Resource *resource,
|
||||
int blocked_below,
|
||||
struct Rec_Resource *dep,
|
||||
int dep_min_state)
|
||||
{
|
||||
if (resource->dep_count >= REC_MAX_DEPS) {
|
||||
return;
|
||||
}
|
||||
|
||||
struct Rec_Dep *d = &resource->deps[resource->dep_count++];
|
||||
d->dep = dep;
|
||||
d->dep_min_state = dep_min_state;
|
||||
d->blocked_below = blocked_below;
|
||||
}
|
||||
|
||||
void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state) {
|
||||
r->wanted_state = wanted_state;
|
||||
}
|
||||
|
||||
void reconciler_force_current(struct Rec_Resource *r, int state) {
|
||||
r->current_state = state;
|
||||
}
|
||||
|
||||
int reconciler_get_current(const struct Rec_Resource *r) {
|
||||
return r->current_state;
|
||||
}
|
||||
|
||||
int reconciler_get_wanted(const struct Rec_Resource *r) {
|
||||
return r->wanted_state;
|
||||
}
|
||||
|
||||
const char *reconciler_get_name(const struct Rec_Resource *r) {
|
||||
return r->name;
|
||||
}
|
||||
|
||||
const char *reconciler_state_name(const struct Rec_Resource *r, int state) {
|
||||
if (r->state_names != NULL && state >= 0 && state < r->state_count) {
|
||||
return r->state_names[state];
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* BFS over the transition graph to find the shortest path from
|
||||
* current_state to wanted_state. Returns the first transition on
|
||||
* that path, or NULL if no path exists (or already stable).
|
||||
*/
|
||||
static const struct Rec_Transition *find_next_transition(const struct Rec_Resource *res) {
|
||||
if (res->current_state == res->wanted_state) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* prev[s] = index of transition in res->transitions that leads into state s,
|
||||
* or -1 if not yet visited. */
|
||||
int prev_trans[REC_MAX_STATES];
|
||||
int visited[REC_MAX_STATES];
|
||||
for (int i = 0; i < REC_MAX_STATES; i++) {
|
||||
prev_trans[i] = -1;
|
||||
visited[i] = 0;
|
||||
}
|
||||
|
||||
/* BFS queue — state indices. */
|
||||
int queue[REC_MAX_STATES];
|
||||
int head = 0;
|
||||
int tail = 0;
|
||||
|
||||
visited[res->current_state] = 1;
|
||||
queue[tail++] = res->current_state;
|
||||
|
||||
int found = 0;
|
||||
|
||||
while (head < tail && !found) {
|
||||
int cur = queue[head++];
|
||||
|
||||
for (int i = 0; ; i++) {
|
||||
const struct Rec_Transition *t = &res->transitions[i];
|
||||
if (t->from == -1 && t->to == -1 && t->action == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (t->from != cur) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int next = t->to;
|
||||
if (next < 0 || next >= REC_MAX_STATES) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (visited[next]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
visited[next] = 1;
|
||||
prev_trans[next] = i;
|
||||
queue[tail++] = next;
|
||||
|
||||
if (next == res->wanted_state) {
|
||||
found = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Walk back from wanted_state to find the first step. */
|
||||
int state = res->wanted_state;
|
||||
int first_trans_idx = prev_trans[state];
|
||||
|
||||
while (1) {
|
||||
int ti = prev_trans[state];
|
||||
if (ti == -1) {
|
||||
break;
|
||||
}
|
||||
int from_state = res->transitions[ti].from;
|
||||
if (from_state == res->current_state) {
|
||||
first_trans_idx = ti;
|
||||
break;
|
||||
}
|
||||
first_trans_idx = ti;
|
||||
state = from_state;
|
||||
}
|
||||
|
||||
return &res->transitions[first_trans_idx];
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns 1 if all dependencies allow the resource to enter next_state.
|
||||
* Returns 0 if any dependency blocks it.
|
||||
*/
|
||||
static int deps_allow(const struct Rec_Resource *res, int next_state) {
|
||||
for (int i = 0; i < res->dep_count; i++) {
|
||||
const struct Rec_Dep *d = &res->deps[i];
|
||||
if (next_state >= d->blocked_below && d->dep->current_state < d->dep_min_state) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
Rec_Status reconciler_get_status(const struct Rec_Resource *r) {
|
||||
if (r->current_state == r->wanted_state) {
|
||||
return REC_STATUS_STABLE;
|
||||
}
|
||||
|
||||
const struct Rec_Transition *t = find_next_transition(r);
|
||||
if (t == NULL) {
|
||||
return REC_STATUS_NO_PATH;
|
||||
}
|
||||
|
||||
if (!deps_allow(r, t->to)) {
|
||||
return REC_STATUS_BLOCKED;
|
||||
}
|
||||
|
||||
return REC_STATUS_WORKING;
|
||||
}
|
||||
|
||||
int reconciler_tick(struct Reconciler *r) {
|
||||
int attempted = 0;
|
||||
|
||||
for (int i = 0; i < r->count; i++) {
|
||||
struct Rec_Resource *res = &r->resources[i];
|
||||
|
||||
if (res->current_state == res->wanted_state) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const struct Rec_Transition *t = find_next_transition(res);
|
||||
if (t == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!deps_allow(res, t->to)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int from = res->current_state;
|
||||
int to = t->to;
|
||||
int success = t->action(res->userdata);
|
||||
attempted++;
|
||||
|
||||
if (success) {
|
||||
res->current_state = to;
|
||||
}
|
||||
|
||||
if (r->log_fn != NULL) {
|
||||
r->log_fn(res, from, to, success, r->log_userdata);
|
||||
}
|
||||
}
|
||||
|
||||
return attempted;
|
||||
}
|
||||
|
||||
int reconciler_is_stable(const struct Reconciler *r) {
|
||||
for (int i = 0; i < r->count; i++) {
|
||||
if (r->resources[i].current_state != r->resources[i].wanted_state) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
@@ -23,6 +23,7 @@ struct Transport_Conn {
|
||||
|
||||
struct Transport_Server {
|
||||
int listen_fd;
|
||||
uint16_t bound_port; /* actual port after bind */
|
||||
struct Transport_Server_Config config;
|
||||
pthread_t accept_thread;
|
||||
pthread_mutex_t count_mutex;
|
||||
@@ -209,6 +210,15 @@ struct App_Error transport_server_start(struct Transport_Server *server) {
|
||||
return APP_SYSCALL_ERROR();
|
||||
}
|
||||
|
||||
/* Read back the actual port (matters when config.port == 0) */
|
||||
struct sockaddr_in bound = {0};
|
||||
socklen_t bound_len = sizeof(bound);
|
||||
if (getsockname(fd, (struct sockaddr *)&bound, &bound_len) == 0) {
|
||||
server->bound_port = ntohs(bound.sin_port);
|
||||
} else {
|
||||
server->bound_port = server->config.port;
|
||||
}
|
||||
|
||||
if (listen(fd, SOMAXCONN) < 0) {
|
||||
close(fd);
|
||||
return APP_SYSCALL_ERROR();
|
||||
@@ -235,6 +245,10 @@ void transport_server_destroy(struct Transport_Server *server) {
|
||||
free(server);
|
||||
}
|
||||
|
||||
uint16_t transport_server_get_port(const struct Transport_Server *server) {
|
||||
return server->bound_port;
|
||||
}
|
||||
|
||||
struct App_Error transport_connect(struct Transport_Conn **out,
|
||||
const char *host, uint16_t port,
|
||||
uint32_t max_payload,
|
||||
|
||||
@@ -12,6 +12,23 @@
|
||||
#include "xorg.h"
|
||||
#include "font_atlas.h" /* generated: font_glyphs[], font_atlas_pixels[], FONT_ATLAS_W/H */
|
||||
|
||||
/* Reference count for glfwInit/glfwTerminate.
|
||||
* All xorg calls happen on the main thread, so no locking needed. */
|
||||
static int glfw_ref_count = 0;
|
||||
|
||||
static void glfw_acquire(void)
|
||||
{
|
||||
if (glfw_ref_count == 0) { glfwInit(); }
|
||||
glfw_ref_count++;
|
||||
}
|
||||
|
||||
static void glfw_release(void)
|
||||
{
|
||||
if (glfw_ref_count <= 0) { return; }
|
||||
glfw_ref_count--;
|
||||
if (glfw_ref_count == 0) { glfwTerminate(); }
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Shader sources — video */
|
||||
/* ------------------------------------------------------------------ */
|
||||
@@ -326,10 +343,7 @@ static bool init_text_rendering(Xorg_Viewer *v)
|
||||
Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height,
|
||||
const char *title)
|
||||
{
|
||||
if (!glfwInit()) {
|
||||
fprintf(stderr, "xorg: glfwInit failed\n");
|
||||
return NULL;
|
||||
}
|
||||
glfw_acquire();
|
||||
|
||||
glfwWindowHint(GLFW_CONTEXT_VERSION_MAJOR, 3);
|
||||
glfwWindowHint(GLFW_CONTEXT_VERSION_MINOR, 3);
|
||||
@@ -338,7 +352,7 @@ Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height,
|
||||
GLFWwindow *win = glfwCreateWindow(width, height, title, NULL, NULL);
|
||||
if (!win) {
|
||||
fprintf(stderr, "xorg: glfwCreateWindow failed\n");
|
||||
glfwTerminate();
|
||||
glfw_release();
|
||||
return NULL;
|
||||
}
|
||||
glfwSetWindowPos(win, x, y);
|
||||
@@ -350,14 +364,14 @@ Xorg_Viewer *xorg_viewer_open(int x, int y, int width, int height,
|
||||
if (glewInit() != GLEW_OK) {
|
||||
fprintf(stderr, "xorg: glewInit failed\n");
|
||||
glfwDestroyWindow(win);
|
||||
glfwTerminate();
|
||||
glfw_release();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Xorg_Viewer *v = calloc(1, sizeof(*v));
|
||||
if (!v) {
|
||||
glfwDestroyWindow(win);
|
||||
glfwTerminate();
|
||||
glfw_release();
|
||||
return NULL;
|
||||
}
|
||||
v->window = win;
|
||||
@@ -727,6 +741,7 @@ bool xorg_viewer_push_yuv420(Xorg_Viewer *v,
|
||||
int width, int height)
|
||||
{
|
||||
if (!v) { return false; }
|
||||
glfwMakeContextCurrent(v->window);
|
||||
v->frame_w = width;
|
||||
v->frame_h = height;
|
||||
upload_yuv(v, y, width, height, cb, width / 2, height / 2, cr);
|
||||
@@ -737,6 +752,7 @@ bool xorg_viewer_push_bgra(Xorg_Viewer *v,
|
||||
const uint8_t *data, int width, int height)
|
||||
{
|
||||
if (!v) { return false; }
|
||||
glfwMakeContextCurrent(v->window);
|
||||
|
||||
v->frame_w = width;
|
||||
v->frame_h = height;
|
||||
@@ -759,6 +775,7 @@ bool xorg_viewer_push_mjpeg(Xorg_Viewer *v,
|
||||
return false;
|
||||
#else
|
||||
if (!v) { return false; }
|
||||
glfwMakeContextCurrent(v->window);
|
||||
|
||||
int w, h, subsamp, colorspace;
|
||||
if (tjDecompressHeader3(v->tj, data, (unsigned long)size,
|
||||
@@ -809,6 +826,7 @@ bool xorg_viewer_poll(Xorg_Viewer *v)
|
||||
if (!v || glfwWindowShouldClose(v->window)) { return false; }
|
||||
glfwPollEvents();
|
||||
if (glfwWindowShouldClose(v->window)) { return false; }
|
||||
glfwMakeContextCurrent(v->window);
|
||||
render(v);
|
||||
return true;
|
||||
}
|
||||
@@ -838,7 +856,7 @@ void xorg_viewer_close(Xorg_Viewer *v)
|
||||
if (v->prog_rgb) { glDeleteProgram(v->prog_rgb); }
|
||||
if (v->window) {
|
||||
glfwDestroyWindow(v->window);
|
||||
glfwTerminate();
|
||||
glfw_release();
|
||||
}
|
||||
free(v);
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ TRANSPORT_OBJ = $(BUILD)/transport/transport.o
|
||||
DISCOVERY_OBJ = $(BUILD)/discovery/discovery.o
|
||||
CONFIG_OBJ = $(BUILD)/config/config.o
|
||||
PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o
|
||||
RECONCILER_OBJ = $(BUILD)/reconciler/reconciler.o
|
||||
INGEST_OBJ = $(BUILD)/ingest/ingest.o
|
||||
XORG_OBJ = $(BUILD)/xorg/xorg.o
|
||||
|
||||
.PHONY: all clean
|
||||
@@ -20,21 +22,25 @@ all: $(NODE_BUILD)/video-node
|
||||
$(NODE_BUILD)/video-node: $(MAIN_OBJ) \
|
||||
$(COMMON_OBJ) $(MEDIA_OBJ) $(V4L2_OBJ) $(SERIAL_OBJ) \
|
||||
$(TRANSPORT_OBJ) $(DISCOVERY_OBJ) $(CONFIG_OBJ) $(PROTOCOL_OBJ) \
|
||||
$(XORG_OBJ)
|
||||
$(RECONCILER_OBJ) $(INGEST_OBJ) $(XORG_OBJ)
|
||||
$(CC) $(CFLAGS) -o $@ $^ -lpthread $(PKG_LDFLAGS)
|
||||
|
||||
$(MAIN_OBJ): main.c | $(NODE_BUILD)
|
||||
$(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $<
|
||||
|
||||
$(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common
|
||||
$(MEDIA_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||
$(V4L2_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||
$(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||
$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||
$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||
$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||
$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||
$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||
# 'force' ensures the sub-make is always invoked so it can check source timestamps itself.
|
||||
.PHONY: force
|
||||
$(COMMON_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/common
|
||||
$(MEDIA_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||
$(V4L2_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||
$(SERIAL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||
$(TRANSPORT_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||
$(DISCOVERY_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||
$(CONFIG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||
$(PROTOCOL_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||
$(RECONCILER_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
||||
$(INGEST_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/ingest
|
||||
$(XORG_OBJ): force ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||
|
||||
$(NODE_BUILD):
|
||||
mkdir -p $@
|
||||
|
||||
1040
src/node/main.c
1040
src/node/main.c
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user