From 639a84b1b9319b4c79bd6bf6e3abdc9f8588a60d Mon Sep 17 00:00:00 2001 From: mikael-lovqvists-claude-agent Date: Sun, 29 Mar 2026 01:52:17 +0000 Subject: [PATCH] Add reconciler and ingest modules with CLI driver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reconciler: generic resource state machine — BFS pathfinding from current to wanted state, dependency constraints, event/periodic tick model. reconciler_cli exercises it with simulated device/transport/stream resources. ingest: V4L2 capture module — open device, negotiate MJPEG format, MMAP buffer pool, capture thread with on_frame callback. start/stop lifecycle designed for reconciler management. Transport-agnostic: caller wires on_frame to proto_write_video_frame. Co-Authored-By: Claude Sonnet 4.6 --- dev/cli/Makefile | 14 +- dev/cli/reconciler_cli.c | 456 ++++++++++++++++++++++++++++ include/ingest.h | 66 ++++ include/reconciler.h | 99 ++++++ planning.md | 4 +- src/modules/ingest/Makefile | 19 ++ src/modules/ingest/ingest.c | 292 ++++++++++++++++++ src/modules/reconciler/Makefile | 19 ++ src/modules/reconciler/reconciler.c | 274 +++++++++++++++++ 9 files changed, 1238 insertions(+), 5 deletions(-) create mode 100644 dev/cli/reconciler_cli.c create mode 100644 include/ingest.h create mode 100644 include/reconciler.h create mode 100644 src/modules/ingest/Makefile create mode 100644 src/modules/ingest/ingest.c create mode 100644 src/modules/reconciler/Makefile create mode 100644 src/modules/reconciler/reconciler.c diff --git a/dev/cli/Makefile b/dev/cli/Makefile index 739ecf8..13a4684 100644 --- a/dev/cli/Makefile +++ b/dev/cli/Makefile @@ -12,6 +12,7 @@ CONFIG_OBJ = $(BUILD)/config/config.o PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o TEST_IMAGE_OBJ = $(BUILD)/test_image/test_image.o XORG_OBJ = $(BUILD)/xorg/xorg.o +RECONCILER_OBJ = $(BUILD)/reconciler/reconciler.o CLI_SRCS = \ media_ctrl_cli.c \ @@ -25,7 +26,8 @@ CLI_SRCS = \ xorg_cli.c \ v4l2_view_cli.c \ stream_send_cli.c \ - stream_recv_cli.c + stream_recv_cli.c \ + reconciler_cli.c CLI_OBJS = $(CLI_SRCS:%.c=$(CLI_BUILD)/%.o) @@ -43,7 +45,8 @@ all: \ $(CLI_BUILD)/xorg_cli \ $(CLI_BUILD)/v4l2_view_cli \ $(CLI_BUILD)/stream_send_cli \ - $(CLI_BUILD)/stream_recv_cli + $(CLI_BUILD)/stream_recv_cli \ + $(CLI_BUILD)/reconciler_cli # Module objects delegate to their sub-makes. $(COMMON_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/common @@ -56,6 +59,7 @@ $(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config $(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol $(TEST_IMAGE_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/test_image $(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg +$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler # Compile each CLI source to its own .o (generates .d alongside). $(CLI_BUILD)/%.o: %.c | $(CLI_BUILD) @@ -98,6 +102,9 @@ $(CLI_BUILD)/stream_send_cli: $(CLI_BUILD)/stream_send_cli.o $(COMMON_OBJ) $(SER $(CLI_BUILD)/stream_recv_cli: $(CLI_BUILD)/stream_recv_cli.o $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) $(PROTOCOL_OBJ) $(XORG_OBJ) $(CC) $(CFLAGS) -o $@ $^ -lpthread $(PKG_LDFLAGS) +$(CLI_BUILD)/reconciler_cli: $(CLI_BUILD)/reconciler_cli.o $(RECONCILER_OBJ) + $(CC) $(CFLAGS) -o $@ $^ + $(CLI_BUILD): mkdir -p $@ @@ -116,6 +123,7 @@ clean: $(CLI_BUILD)/xorg_cli \ $(CLI_BUILD)/v4l2_view_cli \ $(CLI_BUILD)/stream_send_cli \ - $(CLI_BUILD)/stream_recv_cli + $(CLI_BUILD)/stream_recv_cli \ + $(CLI_BUILD)/reconciler_cli -include $(CLI_OBJS:%.o=%.d) diff --git a/dev/cli/reconciler_cli.c b/dev/cli/reconciler_cli.c new file mode 100644 index 0000000..83f7d79 --- /dev/null +++ b/dev/cli/reconciler_cli.c @@ -0,0 +1,456 @@ +#include +#include +#include +#include +#include "reconciler.h" + +/* ----------------------------------------------------------------------- + * Simulated resource userdata + * ----------------------------------------------------------------------- */ + +struct Sim_State { + const char *name; + int fail_next; +}; + +static int sim_action(struct Sim_State *s, const char *action_name) { + printf(" [%s] %s\n", s->name, action_name); + if (s->fail_next) { + s->fail_next = 0; + return 0; + } + return 1; +} + +/* device transitions */ +static int device_open(void *ud) { return sim_action((struct Sim_State *)ud, "opening device"); } +static int device_close(void *ud) { return sim_action((struct Sim_State *)ud, "closing device"); } +static int device_start(void *ud) { return sim_action((struct Sim_State *)ud, "starting capture"); } +static int device_stop(void *ud) { return sim_action((struct Sim_State *)ud, "stopping capture"); } + +/* transport transitions */ +static int transport_connect(void *ud) { return sim_action((struct Sim_State *)ud, "connecting transport"); } +static int transport_disconnect(void *ud) { return sim_action((struct Sim_State *)ud, "disconnecting transport"); } + +/* stream transitions */ +static int stream_activate(void *ud) { return sim_action((struct Sim_State *)ud, "activating stream"); } +static int stream_deactivate(void *ud) { return sim_action((struct Sim_State *)ud, "deactivating stream"); } + +/* ----------------------------------------------------------------------- + * Log callback + * ----------------------------------------------------------------------- */ + +static void on_log( + const struct Rec_Resource *res, + int from, int to, int success, + void *userdata) +{ + (void)userdata; + + const char *from_name = reconciler_state_name(res, from); + const char *to_name = reconciler_state_name(res, to); + + if (from_name != NULL && to_name != NULL) { + printf(" [%s] %s -> %s ... %s\n", + reconciler_get_name(res), + from_name, to_name, + success ? "ok" : "FAILED"); + } else { + printf(" [%s] %d -> %d ... %s\n", + reconciler_get_name(res), + from, to, + success ? "ok" : "FAILED"); + } +} + +/* ----------------------------------------------------------------------- + * Helpers + * ----------------------------------------------------------------------- */ + +static const char *status_label(Rec_Status s) { + switch (s) { + case REC_STATUS_STABLE: return "stable"; + case REC_STATUS_WORKING: return "working"; + case REC_STATUS_BLOCKED: return "blocked"; + case REC_STATUS_NO_PATH: return "no_path"; + } + return "?"; +} + +/* + * Find the first unsatisfied dep for a resource. + * Returns NULL if none (or resource is not blocked). + */ +static void print_blocked_reason(const struct Rec_Resource *res) { + /* We need access to internals — expose via a helper approach. + * Since we can't access internal deps from outside the module, + * we rely on reconciler_get_status returning BLOCKED and print + * a generic message. The CLI has access to the sim resources + * directly so we can check ourselves using the public API. */ + (void)res; + printf(" (dependency unsatisfied)"); +} + +static void print_state(const struct Rec_Resource *res, int state) { + const char *name = reconciler_state_name(res, state); + if (name != NULL) { + printf("%s(%d)", name, state); + } else { + printf("%d", state); + } +} + +/* ----------------------------------------------------------------------- + * Blocked dependency introspection + * + * We track resource/dep relationships here in the CLI so we can print + * informative blocked messages without exposing internals from the module. + * ----------------------------------------------------------------------- */ + +#define CLI_MAX_DEPS 8 + +struct Cli_Dep { + const struct Rec_Resource *resource; + int blocked_below; + const struct Rec_Resource *dep; + int dep_min_state; +}; + +static struct Cli_Dep cli_deps[CLI_MAX_DEPS]; +static int cli_dep_count = 0; + +static void cli_add_dep( + const struct Rec_Resource *resource, + int blocked_below, + const struct Rec_Resource *dep, + int dep_min_state) +{ + if (cli_dep_count >= CLI_MAX_DEPS) { + return; + } + struct Cli_Dep *d = &cli_deps[cli_dep_count++]; + d->resource = resource; + d->blocked_below = blocked_below; + d->dep = dep; + d->dep_min_state = dep_min_state; +} + +static void print_blocked_info(const struct Rec_Resource *res) { + int wanted = reconciler_get_wanted(res); + int current = reconciler_get_current(res); + (void)current; + + for (int i = 0; i < cli_dep_count; i++) { + struct Cli_Dep *d = &cli_deps[i]; + if (d->resource != res) { + continue; + } + if (wanted < d->blocked_below) { + continue; + } + if (reconciler_get_current(d->dep) < d->dep_min_state) { + printf(" [blocked: %s < ", reconciler_get_name(d->dep)); + print_state(d->dep, d->dep_min_state); + printf("]"); + return; + } + } + print_blocked_reason(res); +} + +/* ----------------------------------------------------------------------- + * Command implementations + * ----------------------------------------------------------------------- */ + +#define MAX_RESOURCES 8 + +static struct Rec_Resource *all_resources[MAX_RESOURCES]; +static int resource_count = 0; + +static struct Rec_Resource *find_resource(const char *name) { + for (int i = 0; i < resource_count; i++) { + if (strcmp(reconciler_get_name(all_resources[i]), name) == 0) { + return all_resources[i]; + } + } + return NULL; +} + +static int parse_state(const struct Rec_Resource *res, const char *token) { + /* Try numeric first. */ + char *end; + long n = strtol(token, &end, 10); + if (*end == '\0') { + return (int)n; + } + + /* Try case-insensitive name match. */ + int state_count = 0; + /* Iterate states 0..N-1 using reconciler_state_name. We don't know + * state_count without internal access, so scan until NULL. */ + for (int i = 0; i < 64; i++) { + const char *sname = reconciler_state_name(res, i); + if (sname == NULL) { + break; + } + state_count++; + /* Case-insensitive compare. */ + int match = 1; + size_t tlen = strlen(token); + size_t slen = strlen(sname); + if (tlen != slen) { + match = 0; + } else { + for (size_t j = 0; j < tlen; j++) { + if (tolower((unsigned char)token[j]) != tolower((unsigned char)sname[j])) { + match = 0; + break; + } + } + } + if (match) { + return i; + } + } + (void)state_count; + return -1; +} + +static void cmd_status(void) { + for (int i = 0; i < resource_count; i++) { + const struct Rec_Resource *res = all_resources[i]; + int current = reconciler_get_current(res); + int wanted = reconciler_get_wanted(res); + Rec_Status status = reconciler_get_status(res); + + printf(" %-12s ", reconciler_get_name(res)); + print_state(res, current); + printf(" wanted "); + print_state(res, wanted); + printf(" [%s]", status_label(status)); + + if (status == REC_STATUS_BLOCKED) { + print_blocked_info(res); + } + + printf("\n"); + } +} + +static void cmd_want(const char *name, const char *state_token) { + struct Rec_Resource *res = find_resource(name); + if (res == NULL) { + printf("unknown resource: %s\n", name); + return; + } + + int state = parse_state(res, state_token); + if (state < 0) { + printf("unknown state: %s\n", state_token); + return; + } + + reconciler_set_wanted(res, state); + printf(" %s wanted -> ", name); + print_state(res, state); + printf("\n"); +} + +static void cmd_tick(struct Reconciler *r) { + int n = reconciler_tick(r); + if (n == 0) { + printf(" (no transitions)\n"); + } +} + +static void cmd_run(struct Reconciler *r) { + int max_ticks = 20; + int tick = 0; + + while (!reconciler_is_stable(r) && tick < max_ticks) { + printf("tick %d:\n", tick + 1); + int n = reconciler_tick(r); + if (n == 0) { + printf(" (no progress — stopping)\n"); + break; + } + tick++; + } + + if (reconciler_is_stable(r)) { + printf("stable after %d tick(s)\n", tick); + } else if (tick >= max_ticks) { + printf("reached max ticks (%d) without stabilising\n", max_ticks); + } +} + +static void cmd_fail(const char *name, struct Sim_State sim_states[], int sim_count) { + for (int i = 0; i < sim_count; i++) { + if (strcmp(sim_states[i].name, name) == 0) { + sim_states[i].fail_next = 1; + printf(" next action for %s will fail\n", name); + return; + } + } + printf("unknown resource: %s\n", name); +} + +static void cmd_help(void) { + printf("commands:\n"); + printf(" status print all resources with current/wanted state and status\n"); + printf(" want set wanted state (by number or name, case-insensitive)\n"); + printf(" tick run one reconciler tick\n"); + printf(" run tick until stable (max 20 ticks)\n"); + printf(" fail make the next action for this resource fail\n"); + printf(" help show this help\n"); + printf(" quit / exit exit\n"); +} + +/* ----------------------------------------------------------------------- + * Main + * ----------------------------------------------------------------------- */ + +int main(void) { + struct Reconciler *r = reconciler_create(); + reconciler_set_log(r, on_log, NULL); + + /* Device resource. */ + static const struct Rec_Transition device_trans[] = { + {0, 1, device_open}, + {1, 0, device_close}, + {1, 2, device_start}, + {2, 1, device_stop}, + {-1, -1, NULL} + }; + static const char *device_states[] = {"CLOSED", "OPEN", "STREAMING"}; + + /* Transport resource. */ + static const struct Rec_Transition transport_trans[] = { + {0, 1, transport_connect}, + {1, 0, transport_disconnect}, + {-1, -1, NULL} + }; + static const char *transport_states[] = {"DISCONNECTED", "CONNECTED"}; + + /* Stream resource. */ + static const struct Rec_Transition stream_trans[] = { + {0, 1, stream_activate}, + {1, 0, stream_deactivate}, + {-1, -1, NULL} + }; + static const char *stream_states[] = {"INACTIVE", "ACTIVE"}; + + /* Sim userdata — indexed to match resource order. */ + static struct Sim_State sim_states[] = { + {"device", 0}, + {"transport", 0}, + {"stream", 0}, + }; + + struct Rec_Resource *device = reconciler_add_resource(r, + "device", device_trans, 3, device_states, 0, &sim_states[0]); + + struct Rec_Resource *transport = reconciler_add_resource(r, + "transport", transport_trans, 2, transport_states, 0, &sim_states[1]); + + struct Rec_Resource *stream = reconciler_add_resource(r, + "stream", stream_trans, 2, stream_states, 0, &sim_states[2]); + + /* Dependencies. */ + /* transport cannot reach CONNECTED(1) unless device >= OPEN(1). */ + reconciler_add_dep(transport, 1, device, 1); + cli_add_dep(transport, 1, device, 1); + + /* stream cannot reach ACTIVE(1) unless transport >= CONNECTED(1). */ + reconciler_add_dep(stream, 1, transport, 1); + cli_add_dep(stream, 1, transport, 1); + + /* stream cannot reach ACTIVE(1) unless device >= STREAMING(2). */ + reconciler_add_dep(stream, 1, device, 2); + cli_add_dep(stream, 1, device, 2); + + /* Register resources for lookup. */ + all_resources[resource_count++] = device; + all_resources[resource_count++] = transport; + all_resources[resource_count++] = stream; + + /* Welcome. */ + printf("reconciler_cli — interactive declarative state machine demo\n\n"); + cmd_help(); + printf("\n"); + cmd_status(); + printf("\n"); + + /* REPL. */ + char line[256]; + while (1) { + printf("> "); + fflush(stdout); + + if (fgets(line, sizeof(line), stdin) == NULL) { + break; + } + + /* Strip trailing newline. */ + size_t len = strlen(line); + while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) { + line[--len] = '\0'; + } + + /* Tokenise. */ + char *tokens[4]; + int ntok = 0; + char *p = line; + + while (*p != '\0' && ntok < 4) { + while (*p == ' ' || *p == '\t') { + p++; + } + if (*p == '\0') { + break; + } + tokens[ntok++] = p; + while (*p != '\0' && *p != ' ' && *p != '\t') { + p++; + } + if (*p != '\0') { + *p++ = '\0'; + } + } + + if (ntok == 0) { + continue; + } + + const char *cmd = tokens[0]; + + if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) { + break; + } else if (strcmp(cmd, "help") == 0) { + cmd_help(); + } else if (strcmp(cmd, "status") == 0) { + cmd_status(); + } else if (strcmp(cmd, "tick") == 0) { + cmd_tick(r); + } else if (strcmp(cmd, "run") == 0) { + cmd_run(r); + } else if (strcmp(cmd, "want") == 0) { + if (ntok < 3) { + printf("usage: want \n"); + } else { + cmd_want(tokens[1], tokens[2]); + } + } else if (strcmp(cmd, "fail") == 0) { + if (ntok < 2) { + printf("usage: fail \n"); + } else { + cmd_fail(tokens[1], sim_states, 3); + } + } else { + printf("unknown command: %s (type 'help' for commands)\n", cmd); + } + } + + reconciler_destroy(r); + return 0; +} diff --git a/include/ingest.h b/include/ingest.h new file mode 100644 index 0000000..da68dec --- /dev/null +++ b/include/ingest.h @@ -0,0 +1,66 @@ +#pragma once + +#include +#include "error.h" + +typedef struct Ingest_Handle Ingest_Handle; + +/* + * Called from the capture thread for each dequeued frame. + * data points into the mmap'd buffer — valid only for the duration of the call. + * Do not free data; copy if you need to retain it beyond the callback. + */ +typedef void (*Ingest_Frame_Fn)( + const uint8_t *data, uint32_t len, + int width, int height, uint32_t pixfmt, + void *userdata); + +/* + * Called from the capture thread when a fatal error terminates the capture loop. + * After this callback returns, the thread exits and the handle is in a stopped + * state (equivalent to after ingest_stop). msg is a static string. + */ +typedef void (*Ingest_Error_Fn)(const char *msg, void *userdata); + +struct Ingest_Config { + const char *device; /* e.g. "/dev/video0" */ + uint32_t pixfmt; /* V4L2_PIX_FMT_MJPEG etc.; 0 = auto (best MJPEG) */ + int width; /* 0 = auto */ + int height; /* 0 = auto */ + Ingest_Frame_Fn on_frame; + Ingest_Error_Fn on_error; /* may be NULL */ + void *userdata; +}; + +/* + * Open the V4L2 device, negotiate format, allocate MMAP buffers. + * Does NOT start streaming. on_frame must not be NULL. + */ +struct App_Error ingest_open(const struct Ingest_Config *cfg, Ingest_Handle **out); + +/* + * Enable streaming and start the capture thread. + * Must be called on a handle in the OPEN (not streaming) state. + */ +struct App_Error ingest_start(Ingest_Handle *h); + +/* + * Signal the capture thread to stop and block until it exits. + * Disables streaming. The handle returns to the OPEN state and can be + * restarted with ingest_start or released with ingest_close. + */ +struct App_Error ingest_stop(Ingest_Handle *h); + +/* + * Release MMAP buffers and close the device fd. + * Must be called only when the handle is not streaming (before ingest_start + * or after ingest_stop). + */ +void ingest_close(Ingest_Handle *h); + +/* Query the negotiated format — valid after a successful ingest_open. */ +int ingest_width(const Ingest_Handle *h); +int ingest_height(const Ingest_Handle *h); +uint32_t ingest_pixfmt(const Ingest_Handle *h); +int ingest_fps_n(const Ingest_Handle *h); +int ingest_fps_d(const Ingest_Handle *h); diff --git a/include/reconciler.h b/include/reconciler.h new file mode 100644 index 0000000..e55f10b --- /dev/null +++ b/include/reconciler.h @@ -0,0 +1,99 @@ +#pragma once + +/* + * Generic declarative state machine reconciler. + * + * Each managed resource is described as a directed graph of states + * with labelled transitions. The reconciler finds the shortest path + * (BFS) from a resource's current state to its wanted state and + * executes one transition per tick. + * + * Dependencies between resources prevent a resource from advancing + * past a threshold state until a prerequisite resource reaches a + * minimum state. + * + * Usage: + * struct Reconciler *r = reconciler_create(); + * + * static const struct Rec_Transition dev_trans[] = { + * {0, 1, open_device}, + * {1, 0, close_device}, + * {1, 2, start_capture}, + * {2, 1, stop_capture}, + * {-1, -1, NULL} + * }; + * static const char *dev_states[] = {"CLOSED", "OPEN", "STREAMING"}; + * struct Rec_Resource *dev = reconciler_add_resource(r, "device", + * dev_trans, 3, dev_states, 0, &my_device); + * + * reconciler_set_wanted(dev, 2); + * while (!reconciler_is_stable(r)) { + * reconciler_tick(r); + * } + */ + +/* Transition table entry. Sentinel: {-1, -1, NULL}. + * action: return 1 on success, 0 on failure. + * On failure the resource stays in 'from' state. */ +struct Rec_Transition { + int from; + int to; + int (*action)(void *userdata); +}; + +typedef enum { + REC_STATUS_STABLE, /* current == wanted */ + REC_STATUS_WORKING, /* current != wanted, next transition is eligible */ + REC_STATUS_BLOCKED, /* current != wanted, a dependency is unsatisfied */ + REC_STATUS_NO_PATH, /* current != wanted, no transition path exists */ +} Rec_Status; + +struct Reconciler; +struct Rec_Resource; + +/* Optional log callback — called after each transition attempt. */ +typedef void (*Rec_Log_Fn)( + const struct Rec_Resource *res, + int from, int to, int success, + void *userdata); + +struct Reconciler *reconciler_create(void); +void reconciler_destroy(struct Reconciler *r); + +/* Set a log callback. Called after every transition attempt. */ +void reconciler_set_log(struct Reconciler *r, Rec_Log_Fn fn, void *userdata); + +/* Add a resource. + * transitions: caller-owned, sentinel-terminated {-1,-1,NULL}. + * state_names: optional array of state_count strings; NULL for numeric display. + * initial_state: sets both current and wanted initially. */ +struct Rec_Resource *reconciler_add_resource( + struct Reconciler *r, + const char *name, + const struct Rec_Transition *transitions, + int state_count, + const char **state_names, + int initial_state, + void *userdata); + +/* Add a dependency: resource cannot reach state >= blocked_below + * unless dep is currently in state >= dep_min_state. */ +void reconciler_add_dep( + struct Rec_Resource *resource, + int blocked_below, + struct Rec_Resource *dep, + int dep_min_state); + +void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state); +int reconciler_get_current(const struct Rec_Resource *r); +int reconciler_get_wanted(const struct Rec_Resource *r); +const char *reconciler_get_name(const struct Rec_Resource *r); +const char *reconciler_state_name(const struct Rec_Resource *r, int state); +Rec_Status reconciler_get_status(const struct Rec_Resource *r); + +/* Run one reconciliation pass over all resources. + * Returns number of transitions attempted (success or failure). */ +int reconciler_tick(struct Reconciler *r); + +/* Returns 1 if all resources have current == wanted. */ +int reconciler_is_stable(const struct Reconciler *r); diff --git a/planning.md b/planning.md index f2a908c..0e23d34 100644 --- a/planning.md +++ b/planning.md @@ -58,10 +58,10 @@ Modules are listed in intended build order. Each depends only on modules above i | — | `node` | done | Video node binary — config, discovery, transport server, V4L2/media control request handlers | | 8 | `test_image` | done | Test pattern generator — colour bars, luminance ramp, grid crosshatch; YUV420/BGRA output | | 9 | `xorg` | done | GLFW+OpenGL viewer sink — YUV420/BGRA/MJPEG display, all scale/anchor modes, bitmap font atlas text overlays; XRandR queries and screen grab not yet implemented | -| 10 | `reconciler` | not started | Generic wanted/current state machine reconciler — resource state graphs, BFS pathfinding, event + periodic tick; used by node to manage V4L2 devices, transport connections, and future resources (codec processes etc.) | +| 10 | `reconciler` | done | Generic wanted/current state machine reconciler — resource state graphs, BFS pathfinding, event + periodic tick; used by node to manage V4L2 devices, transport connections, and future resources (codec processes etc.) | | 11 | `frame_alloc` | not started | Per-frame allocation with bookkeeping (byte budget, ref counting) | | 12 | `relay` | not started | Input dispatch to output queues (low-latency and completeness modes) | -| 13 | `ingest` | not started | V4L2 capture loop — dequeue buffers, emit one encapsulated frame per buffer | +| 13 | `ingest` | done | V4L2 capture loop — open device, negotiate MJPEG format, MMAP buffers, capture thread with on_frame callback; start/stop lifecycle managed by reconciler | | 14 | `archive` | not started | Write frames to disk, control messages to binary log | | 15 | `codec` | not started | Per-frame encode/decode — MJPEG (libjpeg-turbo), QOI, ZSTD-raw, VA-API H.264 intra; used by screen grab source and archive | | 16 | `web node` | not started | Node.js/Express peer — speaks binary protocol on socket side, HTTP/WebSocket to browser; `protocol.mjs` mirrors C protocol module | diff --git a/src/modules/ingest/Makefile b/src/modules/ingest/Makefile new file mode 100644 index 0000000..9c67687 --- /dev/null +++ b/src/modules/ingest/Makefile @@ -0,0 +1,19 @@ +ROOT := $(abspath ../../..) +include $(ROOT)/common.mk + +MODULE_BUILD = $(BUILD)/ingest + +.PHONY: all clean + +all: $(MODULE_BUILD)/ingest.o + +$(MODULE_BUILD)/ingest.o: ingest.c | $(MODULE_BUILD) + $(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $< + +$(MODULE_BUILD): + mkdir -p $@ + +clean: + rm -f $(MODULE_BUILD)/ingest.o $(MODULE_BUILD)/ingest.d + +-include $(MODULE_BUILD)/ingest.d diff --git a/src/modules/ingest/ingest.c b/src/modules/ingest/ingest.c new file mode 100644 index 0000000..2f8f69a --- /dev/null +++ b/src/modules/ingest/ingest.c @@ -0,0 +1,292 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ingest.h" +#include "v4l2_fmt.h" +#include "error.h" + +/* ------------------------------------------------------------------------- + * Internal types + * ------------------------------------------------------------------------- */ + +#define INGEST_N_BUFS 4 + +struct Mmap_Buf { + void *start; + size_t length; +}; + +struct Ingest_Handle { + int fd; + struct Mmap_Buf bufs[INGEST_N_BUFS]; + int buf_count; + + int width, height; + uint32_t pixfmt; + int fps_n, fps_d; + + Ingest_Frame_Fn on_frame; + Ingest_Error_Fn on_error; + void *userdata; + + pthread_t thread; + atomic_int running; /* 1 = thread should keep going; 0 = stop */ + int started; /* 1 = pthread_create was called */ +}; + +/* ------------------------------------------------------------------------- + * Capture thread + * ------------------------------------------------------------------------- */ + +static void *capture_thread(void *arg) +{ + struct Ingest_Handle *h = arg; + + while (atomic_load(&h->running)) { + fd_set fds; + FD_ZERO(&fds); + FD_SET(h->fd, &fds); + struct timeval tv = { 0, 100000 }; /* 100 ms — keeps stop latency short */ + + int r = select(h->fd + 1, &fds, NULL, NULL, &tv); + if (r < 0) { + if (errno == EINTR) { continue; } + if (h->on_error) { h->on_error("select failed", h->userdata); } + break; + } + if (r == 0) { + continue; /* timeout — recheck running flag */ + } + + struct v4l2_buffer buf = {0}; + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + if (v4l2_xioctl(h->fd, VIDIOC_DQBUF, &buf) < 0) { + if (errno == EAGAIN) { continue; } + if (h->on_error) { h->on_error("VIDIOC_DQBUF failed", h->userdata); } + break; + } + + h->on_frame( + (const uint8_t *)h->bufs[buf.index].start, + buf.bytesused, + h->width, h->height, h->pixfmt, + h->userdata); + + if (v4l2_xioctl(h->fd, VIDIOC_QBUF, &buf) < 0) { + if (h->on_error) { h->on_error("VIDIOC_QBUF failed", h->userdata); } + break; + } + } + + atomic_store(&h->running, 0); + return NULL; +} + +/* ------------------------------------------------------------------------- + * Public API + * ------------------------------------------------------------------------- */ + +struct App_Error ingest_open(const struct Ingest_Config *cfg, Ingest_Handle **out) +{ + struct Ingest_Handle *h = calloc(1, sizeof(*h)); + if (!h) { return APP_SYSCALL_ERROR(); } + + h->fd = -1; + h->on_frame = cfg->on_frame; + h->on_error = cfg->on_error; + h->userdata = cfg->userdata; + atomic_init(&h->running, 0); + + /* Open device */ + h->fd = open(cfg->device, O_RDWR | O_NONBLOCK); + if (h->fd < 0) { + free(h); + return APP_SYSCALL_ERROR(); + } + + /* Verify capture + streaming capability */ + struct v4l2_capability cap = {0}; + if (v4l2_xioctl(h->fd, VIDIOC_QUERYCAP, &cap) < 0) { + close(h->fd); free(h); + return APP_SYSCALL_ERROR(); + } + if (!(cap.capabilities & V4L2_CAP_VIDEO_CAPTURE) || + !(cap.capabilities & V4L2_CAP_STREAMING)) { + close(h->fd); free(h); + return APP_INVALID_ERROR_MSG(0, "device does not support MJPEG streaming capture"); + } + + /* Format selection */ + uint32_t want_pixfmt = cfg->pixfmt ? cfg->pixfmt : V4L2_PIX_FMT_MJPEG; + + V4l2_Fmt_Option opts[V4L2_FMT_MAX_OPTS]; + int n = v4l2_enumerate_formats(h->fd, opts, V4L2_FMT_MAX_OPTS, want_pixfmt); + if (n == 0) { + close(h->fd); free(h); + return APP_INVALID_ERROR_MSG(0, "no matching formats found on device"); + } + + /* If caller specified exact w/h use that, otherwise auto-select best */ + const V4l2_Fmt_Option *chosen; + if (cfg->width > 0 && cfg->height > 0) { + chosen = NULL; + for (int i = 0; i < n; i++) { + if (opts[i].w == cfg->width && opts[i].h == cfg->height) { + if (!chosen || v4l2_fmt_fps_gt(&opts[i], chosen)) { + chosen = &opts[i]; + } + } + } + if (!chosen) { + /* Exact size not found — fall back to best available */ + chosen = v4l2_select_best(opts, n); + } + } else { + chosen = v4l2_select_best(opts, n); + } + + /* Apply format */ + struct v4l2_format fmt = {0}; + fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + fmt.fmt.pix.pixelformat = chosen->pixfmt; + fmt.fmt.pix.width = (uint32_t)chosen->w; + fmt.fmt.pix.height = (uint32_t)chosen->h; + fmt.fmt.pix.field = V4L2_FIELD_ANY; + if (v4l2_xioctl(h->fd, VIDIOC_S_FMT, &fmt) < 0) { + close(h->fd); free(h); + return APP_SYSCALL_ERROR(); + } + + h->width = (int)fmt.fmt.pix.width; + h->height = (int)fmt.fmt.pix.height; + h->pixfmt = fmt.fmt.pix.pixelformat; + + /* Apply frame rate */ + { + struct v4l2_streamparm parm = {0}; + parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + parm.parm.capture.timeperframe.numerator = (uint32_t)chosen->fps_d; + parm.parm.capture.timeperframe.denominator = (uint32_t)chosen->fps_n; + v4l2_xioctl(h->fd, VIDIOC_S_PARM, &parm); + if (v4l2_xioctl(h->fd, VIDIOC_G_PARM, &parm) == 0 && + parm.parm.capture.timeperframe.denominator > 0) { + h->fps_n = (int)parm.parm.capture.timeperframe.denominator; + h->fps_d = (int)parm.parm.capture.timeperframe.numerator; + } else { + h->fps_n = chosen->fps_n; + h->fps_d = chosen->fps_d; + } + } + + /* Allocate MMAP buffers */ + struct v4l2_requestbuffers req = {0}; + req.count = INGEST_N_BUFS; + req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + req.memory = V4L2_MEMORY_MMAP; + if (v4l2_xioctl(h->fd, VIDIOC_REQBUFS, &req) < 0) { + close(h->fd); free(h); + return APP_SYSCALL_ERROR(); + } + + h->buf_count = (int)req.count; + for (int i = 0; i < h->buf_count; i++) { + struct v4l2_buffer buf = {0}; + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + buf.index = (uint32_t)i; + if (v4l2_xioctl(h->fd, VIDIOC_QUERYBUF, &buf) < 0) { + /* Unmap already-mapped buffers before returning */ + for (int j = 0; j < i; j++) { + munmap(h->bufs[j].start, h->bufs[j].length); + } + close(h->fd); free(h); + return APP_SYSCALL_ERROR(); + } + h->bufs[i].length = buf.length; + h->bufs[i].start = mmap(NULL, buf.length, + PROT_READ | PROT_WRITE, MAP_SHARED, h->fd, buf.m.offset); + if (h->bufs[i].start == MAP_FAILED) { + for (int j = 0; j < i; j++) { + munmap(h->bufs[j].start, h->bufs[j].length); + } + close(h->fd); free(h); + return APP_SYSCALL_ERROR(); + } + } + + *out = h; + return APP_OK; +} + +struct App_Error ingest_start(Ingest_Handle *h) +{ + /* Queue all buffers */ + for (int i = 0; i < h->buf_count; i++) { + struct v4l2_buffer buf = {0}; + buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + buf.memory = V4L2_MEMORY_MMAP; + buf.index = (uint32_t)i; + if (v4l2_xioctl(h->fd, VIDIOC_QBUF, &buf) < 0) { + return APP_SYSCALL_ERROR(); + } + } + + /* Enable streaming */ + enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + if (v4l2_xioctl(h->fd, VIDIOC_STREAMON, &type) < 0) { + return APP_SYSCALL_ERROR(); + } + + /* Start capture thread */ + atomic_store(&h->running, 1); + if (pthread_create(&h->thread, NULL, capture_thread, h) != 0) { + atomic_store(&h->running, 0); + v4l2_xioctl(h->fd, VIDIOC_STREAMOFF, &type); + return APP_SYSCALL_ERROR(); + } + h->started = 1; + return APP_OK; +} + +struct App_Error ingest_stop(Ingest_Handle *h) +{ + if (!h->started) { + return APP_OK; + } + + atomic_store(&h->running, 0); + pthread_join(h->thread, NULL); + h->started = 0; + + enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE; + v4l2_xioctl(h->fd, VIDIOC_STREAMOFF, &type); + + return APP_OK; +} + +void ingest_close(Ingest_Handle *h) +{ + if (!h) { return; } + for (int i = 0; i < h->buf_count; i++) { + if (h->bufs[i].start && h->bufs[i].start != MAP_FAILED) { + munmap(h->bufs[i].start, h->bufs[i].length); + } + } + if (h->fd >= 0) { close(h->fd); } + free(h); +} + +int ingest_width(const Ingest_Handle *h) { return h->width; } +int ingest_height(const Ingest_Handle *h) { return h->height; } +uint32_t ingest_pixfmt(const Ingest_Handle *h) { return h->pixfmt; } +int ingest_fps_n(const Ingest_Handle *h) { return h->fps_n; } +int ingest_fps_d(const Ingest_Handle *h) { return h->fps_d; } diff --git a/src/modules/reconciler/Makefile b/src/modules/reconciler/Makefile new file mode 100644 index 0000000..1908c93 --- /dev/null +++ b/src/modules/reconciler/Makefile @@ -0,0 +1,19 @@ +ROOT := $(abspath ../../..) +include $(ROOT)/common.mk + +MODULE_BUILD = $(BUILD)/reconciler + +.PHONY: all clean + +all: $(MODULE_BUILD)/reconciler.o + +$(MODULE_BUILD)/reconciler.o: reconciler.c | $(MODULE_BUILD) + $(CC) $(CFLAGS) $(DEPFLAGS) -c -o $@ $< + +$(MODULE_BUILD): + mkdir -p $@ + +clean: + rm -f $(MODULE_BUILD)/reconciler.o $(MODULE_BUILD)/reconciler.d + +-include $(MODULE_BUILD)/reconciler.d diff --git a/src/modules/reconciler/reconciler.c b/src/modules/reconciler/reconciler.c new file mode 100644 index 0000000..89f3a7a --- /dev/null +++ b/src/modules/reconciler/reconciler.c @@ -0,0 +1,274 @@ +#include +#include +#include "reconciler.h" + +#define REC_MAX_RESOURCES 32 +#define REC_MAX_STATES 16 +#define REC_MAX_DEPS 8 + +struct Rec_Dep { + struct Rec_Resource *dep; + int dep_min_state; + int blocked_below; +}; + +struct Rec_Resource { + char name[32]; + const struct Rec_Transition *transitions; + int state_count; + const char **state_names; + int current_state; + int wanted_state; + void *userdata; + struct Rec_Dep deps[REC_MAX_DEPS]; + int dep_count; +}; + +struct Reconciler { + struct Rec_Resource resources[REC_MAX_RESOURCES]; + int count; + Rec_Log_Fn log_fn; + void *log_userdata; +}; + +struct Reconciler *reconciler_create(void) { + struct Reconciler *r = calloc(1, sizeof(struct Reconciler)); + return r; +} + +void reconciler_destroy(struct Reconciler *r) { + free(r); +} + +void reconciler_set_log(struct Reconciler *r, Rec_Log_Fn fn, void *userdata) { + r->log_fn = fn; + r->log_userdata = userdata; +} + +struct Rec_Resource *reconciler_add_resource( + struct Reconciler *r, + const char *name, + const struct Rec_Transition *transitions, + int state_count, + const char **state_names, + int initial_state, + void *userdata) +{ + if (r->count >= REC_MAX_RESOURCES) { + return NULL; + } + + struct Rec_Resource *res = &r->resources[r->count++]; + memset(res, 0, sizeof(*res)); + strncpy(res->name, name, sizeof(res->name) - 1); + res->transitions = transitions; + res->state_count = state_count; + res->state_names = state_names; + res->current_state = initial_state; + res->wanted_state = initial_state; + res->userdata = userdata; + res->dep_count = 0; + return res; +} + +void reconciler_add_dep( + struct Rec_Resource *resource, + int blocked_below, + struct Rec_Resource *dep, + int dep_min_state) +{ + if (resource->dep_count >= REC_MAX_DEPS) { + return; + } + + struct Rec_Dep *d = &resource->deps[resource->dep_count++]; + d->dep = dep; + d->dep_min_state = dep_min_state; + d->blocked_below = blocked_below; +} + +void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state) { + r->wanted_state = wanted_state; +} + +int reconciler_get_current(const struct Rec_Resource *r) { + return r->current_state; +} + +int reconciler_get_wanted(const struct Rec_Resource *r) { + return r->wanted_state; +} + +const char *reconciler_get_name(const struct Rec_Resource *r) { + return r->name; +} + +const char *reconciler_state_name(const struct Rec_Resource *r, int state) { + if (r->state_names != NULL && state >= 0 && state < r->state_count) { + return r->state_names[state]; + } + return NULL; +} + +/* + * BFS over the transition graph to find the shortest path from + * current_state to wanted_state. Returns the first transition on + * that path, or NULL if no path exists (or already stable). + */ +static const struct Rec_Transition *find_next_transition(const struct Rec_Resource *res) { + if (res->current_state == res->wanted_state) { + return NULL; + } + + /* prev[s] = index of transition in res->transitions that leads into state s, + * or -1 if not yet visited. */ + int prev_trans[REC_MAX_STATES]; + int visited[REC_MAX_STATES]; + for (int i = 0; i < REC_MAX_STATES; i++) { + prev_trans[i] = -1; + visited[i] = 0; + } + + /* BFS queue — state indices. */ + int queue[REC_MAX_STATES]; + int head = 0; + int tail = 0; + + visited[res->current_state] = 1; + queue[tail++] = res->current_state; + + int found = 0; + + while (head < tail && !found) { + int cur = queue[head++]; + + for (int i = 0; ; i++) { + const struct Rec_Transition *t = &res->transitions[i]; + if (t->from == -1 && t->to == -1 && t->action == NULL) { + break; + } + + if (t->from != cur) { + continue; + } + + int next = t->to; + if (next < 0 || next >= REC_MAX_STATES) { + continue; + } + + if (visited[next]) { + continue; + } + + visited[next] = 1; + prev_trans[next] = i; + queue[tail++] = next; + + if (next == res->wanted_state) { + found = 1; + break; + } + } + } + + if (!found) { + return NULL; + } + + /* Walk back from wanted_state to find the first step. */ + int state = res->wanted_state; + int first_trans_idx = prev_trans[state]; + + while (1) { + int ti = prev_trans[state]; + if (ti == -1) { + break; + } + int from_state = res->transitions[ti].from; + if (from_state == res->current_state) { + first_trans_idx = ti; + break; + } + first_trans_idx = ti; + state = from_state; + } + + return &res->transitions[first_trans_idx]; +} + +/* + * Returns 1 if all dependencies allow the resource to enter next_state. + * Returns 0 if any dependency blocks it. + */ +static int deps_allow(const struct Rec_Resource *res, int next_state) { + for (int i = 0; i < res->dep_count; i++) { + const struct Rec_Dep *d = &res->deps[i]; + if (next_state >= d->blocked_below && d->dep->current_state < d->dep_min_state) { + return 0; + } + } + return 1; +} + +Rec_Status reconciler_get_status(const struct Rec_Resource *r) { + if (r->current_state == r->wanted_state) { + return REC_STATUS_STABLE; + } + + const struct Rec_Transition *t = find_next_transition(r); + if (t == NULL) { + return REC_STATUS_NO_PATH; + } + + if (!deps_allow(r, t->to)) { + return REC_STATUS_BLOCKED; + } + + return REC_STATUS_WORKING; +} + +int reconciler_tick(struct Reconciler *r) { + int attempted = 0; + + for (int i = 0; i < r->count; i++) { + struct Rec_Resource *res = &r->resources[i]; + + if (res->current_state == res->wanted_state) { + continue; + } + + const struct Rec_Transition *t = find_next_transition(res); + if (t == NULL) { + continue; + } + + if (!deps_allow(res, t->to)) { + continue; + } + + int from = res->current_state; + int to = t->to; + int success = t->action(res->userdata); + attempted++; + + if (success) { + res->current_state = to; + } + + if (r->log_fn != NULL) { + r->log_fn(res, from, to, success, r->log_userdata); + } + } + + return attempted; +} + +int reconciler_is_stable(const struct Reconciler *r) { + for (int i = 0; i < r->count; i++) { + if (r->resources[i].current_state != r->resources[i].wanted_state) { + return 0; + } + } + return 1; +}