reconciler: generic resource state machine — BFS pathfinding from current to wanted state, dependency constraints, event/periodic tick model. reconciler_cli exercises it with simulated device/transport/stream resources. ingest: V4L2 capture module — open device, negotiate MJPEG format, MMAP buffer pool, capture thread with on_frame callback. start/stop lifecycle designed for reconciler management. Transport-agnostic: caller wires on_frame to proto_write_video_frame. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
457 lines
12 KiB
C
457 lines
12 KiB
C
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "reconciler.h"
|
|
|
|
/* -----------------------------------------------------------------------
|
|
* Simulated resource userdata
|
|
* ----------------------------------------------------------------------- */
|
|
|
|
/*
 * Per-resource userdata for the simulated transitions.
 */
struct Sim_State {
    const char *name;      /* label printed in front of every simulated action */
    int         fail_next; /* non-zero: the next sim_action() call reports failure */
};

/*
 * Run one simulated action for `s`.
 *
 * Prints " [<name>] <action_name>" and then reports the outcome:
 * returns 0 when s->fail_next was set (the flag is consumed, so only a
 * single action fails), otherwise returns 1.
 */
static int sim_action(struct Sim_State *s, const char *action_name) {
    printf(" [%s] %s\n", s->name, action_name);

    /* Capture the verdict first, then clear the one-shot failure flag. */
    const int succeeded = !s->fail_next;
    s->fail_next = 0;
    return succeeded;
}
|
|
|
|
/* device transitions */
|
|
/* Device transition callback: simulate opening the device. `ud` is a struct Sim_State. */
static int device_open(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "opening device");
}
|
|
/* Device transition callback: simulate closing the device. `ud` is a struct Sim_State. */
static int device_close(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "closing device");
}
|
|
/* Device transition callback: simulate starting capture. `ud` is a struct Sim_State. */
static int device_start(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "starting capture");
}
|
|
/* Device transition callback: simulate stopping capture. `ud` is a struct Sim_State. */
static int device_stop(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "stopping capture");
}
|
|
|
|
/* transport transitions */
|
|
/* Transport transition callback: simulate connecting. `ud` is a struct Sim_State. */
static int transport_connect(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "connecting transport");
}
|
|
/* Transport transition callback: simulate disconnecting. `ud` is a struct Sim_State. */
static int transport_disconnect(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "disconnecting transport");
}
|
|
|
|
/* stream transitions */
|
|
/* Stream transition callback: simulate activating the stream. `ud` is a struct Sim_State. */
static int stream_activate(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "activating stream");
}
|
|
/* Stream transition callback: simulate deactivating the stream. `ud` is a struct Sim_State. */
static int stream_deactivate(void *ud) {
    struct Sim_State *sim = ud;
    return sim_action(sim, "deactivating stream");
}
|
|
|
|
/* -----------------------------------------------------------------------
|
|
* Log callback
|
|
* ----------------------------------------------------------------------- */
|
|
|
|
/*
 * Log callback installed with reconciler_set_log().
 *
 * Prints one line of the form " [<resource>] <from> -> <to> ... ok|FAILED".
 * State names are used when reconciler_state_name() yields a name for both
 * endpoints; otherwise both states are printed as raw numbers.
 *
 *   res      resource the transition belongs to
 *   from,to  source and destination state indices
 *   success  non-zero prints "ok", zero prints "FAILED"
 *   userdata unused
 */
static void on_log(
    const struct Rec_Resource *res,
    int from, int to, int success,
    void *userdata)
{
    (void)userdata;

    const char *src = reconciler_state_name(res, from);
    const char *dst = reconciler_state_name(res, to);
    const char *outcome = success ? "ok" : "FAILED";

    if (src == NULL || dst == NULL) {
        /* At least one state has no name: fall back to numeric output. */
        printf(" [%s] %d -> %d ... %s\n",
               reconciler_get_name(res), from, to, outcome);
        return;
    }

    printf(" [%s] %s -> %s ... %s\n",
           reconciler_get_name(res), src, dst, outcome);
}
|
|
|
|
/* -----------------------------------------------------------------------
|
|
* Helpers
|
|
* ----------------------------------------------------------------------- */
|
|
|
|
/*
 * Map a Rec_Status value to the lowercase label shown by the `status`
 * command. Values not listed below yield "?".
 */
static const char *status_label(Rec_Status s) {
    const char *label = "?";

    switch (s) {
    case REC_STATUS_STABLE:
        label = "stable";
        break;
    case REC_STATUS_WORKING:
        label = "working";
        break;
    case REC_STATUS_BLOCKED:
        label = "blocked";
        break;
    case REC_STATUS_NO_PATH:
        label = "no_path";
        break;
    }

    return label;
}
|
|
|
|
/*
 * Fallback blocked-reason printer.
 *
 * The CLI cannot see the reconciler's internal dependency list, so when its
 * own bookkeeping cannot name the unsatisfied dependency this prints a
 * generic note instead. `res` is accepted for symmetry with the other
 * printers but is not used. No newline is emitted; the caller finishes the
 * line.
 */
static void print_blocked_reason(const struct Rec_Resource *res) {
    (void)res;
    fputs(" (dependency unsatisfied)", stdout);
}
|
|
|
|
/*
 * Print a state of `res` without a trailing newline: "NAME(n)" when
 * reconciler_state_name() returns a name for `state`, otherwise just "n".
 */
static void print_state(const struct Rec_Resource *res, int state) {
    const char *label = reconciler_state_name(res, state);

    if (label == NULL) {
        printf("%d", state);
        return;
    }
    printf("%s(%d)", label, state);
}
|
|
|
|
/* -----------------------------------------------------------------------
|
|
* Blocked dependency introspection
|
|
*
|
|
* We track resource/dep relationships here in the CLI so we can print
|
|
* informative blocked messages without exposing internals from the module.
|
|
* ----------------------------------------------------------------------- */
|
|
|
|
/* Capacity of the CLI-side dependency table. */
#define CLI_MAX_DEPS 8

/*
 * One dependency rule as recorded by the CLI. The CLI keeps its own copy of
 * each rule so it can describe an unsatisfied dependency without access to
 * the reconciler's internals.
 */
struct Cli_Dep {
    const struct Rec_Resource *resource;      /* resource the rule constrains */
    int                        blocked_below; /* rule is considered once `resource` wants a state >= this */
    const struct Rec_Resource *dep;           /* resource that must be far enough along */
    int                        dep_min_state; /* state `dep` must have reached */
};

static struct Cli_Dep cli_deps[CLI_MAX_DEPS];
static int cli_dep_count = 0;

/*
 * Append a dependency rule to the CLI table.
 *
 * When the table already holds CLI_MAX_DEPS entries the rule is dropped
 * without any report.
 */
static void cli_add_dep(
    const struct Rec_Resource *resource,
    int blocked_below,
    const struct Rec_Resource *dep,
    int dep_min_state)
{
    if (cli_dep_count < CLI_MAX_DEPS) {
        struct Cli_Dep *slot = &cli_deps[cli_dep_count];

        *slot = (struct Cli_Dep){
            .resource      = resource,
            .blocked_below = blocked_below,
            .dep           = dep,
            .dep_min_state = dep_min_state,
        };
        cli_dep_count++;
    }
}
|
|
|
|
static void print_blocked_info(const struct Rec_Resource *res) {
|
|
int wanted = reconciler_get_wanted(res);
|
|
int current = reconciler_get_current(res);
|
|
(void)current;
|
|
|
|
for (int i = 0; i < cli_dep_count; i++) {
|
|
struct Cli_Dep *d = &cli_deps[i];
|
|
if (d->resource != res) {
|
|
continue;
|
|
}
|
|
if (wanted < d->blocked_below) {
|
|
continue;
|
|
}
|
|
if (reconciler_get_current(d->dep) < d->dep_min_state) {
|
|
printf(" [blocked: %s < ", reconciler_get_name(d->dep));
|
|
print_state(d->dep, d->dep_min_state);
|
|
printf("]");
|
|
return;
|
|
}
|
|
}
|
|
print_blocked_reason(res);
|
|
}
|
|
|
|
/* -----------------------------------------------------------------------
|
|
* Command implementations
|
|
* ----------------------------------------------------------------------- */
|
|
|
|
/* Capacity of the CLI's resource lookup table. */
#define MAX_RESOURCES 8

/* Resources known to the CLI, in registration order. */
static struct Rec_Resource *all_resources[MAX_RESOURCES];
static int resource_count = 0;

/*
 * Look up a registered resource by exact (case-sensitive) name.
 * Returns the first match in registration order, or NULL if none matches.
 */
static struct Rec_Resource *find_resource(const char *name) {
    struct Rec_Resource **const end = all_resources + resource_count;

    for (struct Rec_Resource **slot = all_resources; slot < end; slot++) {
        if (strcmp(reconciler_get_name(*slot), name) != 0) {
            continue;
        }
        return *slot;
    }
    return NULL;
}
|
|
|
|
/*
 * Compare two NUL-terminated strings for equality, ignoring case as defined
 * by tolower(). Returns 1 when they match, 0 otherwise.
 */
static int state_name_equals(const char *a, const char *b) {
    while (*a != '\0' && *b != '\0') {
        if (tolower((unsigned char)*a) != tolower((unsigned char)*b)) {
            return 0;
        }
        a++;
        b++;
    }
    /* Equal only if both strings ended at the same position. */
    return *a == *b;
}

/*
 * Translate a user-supplied state token into a state index for `res`.
 *
 * The token is first tried as a decimal number. A token that is entirely
 * numeric is accepted only if it is non-negative and fits in an int; it is
 * not checked against the resource's actual state count, which is not
 * available through the calls used here. Otherwise the token is compared
 * case-insensitively against the names returned by reconciler_state_name()
 * for indices 0, 1, ... until that function returns NULL (at most 64
 * indices are probed).
 *
 * Returns the state index, or -1 if the token is empty, out of range, or
 * matches no state name.
 */
static int parse_state(const struct Rec_Resource *res, const char *token) {
    enum { MAX_NAME_PROBES = 64 };

    /* Try numeric first. */
    char *end;
    errno = 0;
    long n = strtol(token, &end, 10);
    if (end != token && *end == '\0') {
        /* Whole token was a number: reject values an int state cannot hold
         * instead of silently truncating them. */
        if (errno == ERANGE || n < 0 || n > INT_MAX) {
            return -1;
        }
        return (int)n;
    }

    /* Try case-insensitive name match. The state count is not known here,
     * so scan until reconciler_state_name() reports no name. */
    for (int i = 0; i < MAX_NAME_PROBES; i++) {
        const char *sname = reconciler_state_name(res, i);
        if (sname == NULL) {
            break;
        }
        if (state_name_equals(token, sname)) {
            return i;
        }
    }

    return -1;
}
|
|
|
|
static void cmd_status(void) {
|
|
for (int i = 0; i < resource_count; i++) {
|
|
const struct Rec_Resource *res = all_resources[i];
|
|
int current = reconciler_get_current(res);
|
|
int wanted = reconciler_get_wanted(res);
|
|
Rec_Status status = reconciler_get_status(res);
|
|
|
|
printf(" %-12s ", reconciler_get_name(res));
|
|
print_state(res, current);
|
|
printf(" wanted ");
|
|
print_state(res, wanted);
|
|
printf(" [%s]", status_label(status));
|
|
|
|
if (status == REC_STATUS_BLOCKED) {
|
|
print_blocked_info(res);
|
|
}
|
|
|
|
printf("\n");
|
|
}
|
|
}
|
|
|
|
/*
 * `want <name> <state>` command: set the wanted state of the named resource.
 *
 * Reports "unknown resource" when `name` is not registered and
 * "unknown state" when parse_state() returns a negative value; in both
 * cases nothing is changed. On success the new wanted state is echoed.
 */
static void cmd_want(const char *name, const char *state_token) {
    struct Rec_Resource *target = find_resource(name);
    if (!target) {
        printf("unknown resource: %s\n", name);
        return;
    }

    const int requested = parse_state(target, state_token);
    if (requested < 0) {
        printf("unknown state: %s\n", state_token);
        return;
    }

    reconciler_set_wanted(target, requested);

    printf(" %s wanted -> ", name);
    print_state(target, requested);
    printf("\n");
}
|
|
|
|
/*
 * `tick` command: run a single reconciler tick and note when
 * reconciler_tick() returned 0.
 */
static void cmd_tick(struct Reconciler *r) {
    if (reconciler_tick(r) != 0) {
        return;
    }
    fputs(" (no transitions)\n", stdout);
}
|
|
|
|
/*
 * `run` command: tick the reconciler until it reports stable.
 *
 * Stops early when a tick returns 0 (no progress) and gives up after 20
 * completed ticks. Prints a "tick N:" header before each tick and a summary
 * line when the reconciler ended up stable or the tick limit was hit.
 */
static void cmd_run(struct Reconciler *r) {
    const int limit = 20;
    int completed = 0;

    for (;;) {
        if (reconciler_is_stable(r) || completed >= limit) {
            break;
        }

        printf("tick %d:\n", completed + 1);
        if (reconciler_tick(r) == 0) {
            /* A tick that made no progress is not counted. */
            printf(" (no progress — stopping)\n");
            break;
        }
        completed++;
    }

    if (reconciler_is_stable(r)) {
        printf("stable after %d tick(s)\n", completed);
        return;
    }
    if (completed >= limit) {
        printf("reached max ticks (%d) without stabilising\n", limit);
    }
}
|
|
|
|
static void cmd_fail(const char *name, struct Sim_State sim_states[], int sim_count) {
|
|
for (int i = 0; i < sim_count; i++) {
|
|
if (strcmp(sim_states[i].name, name) == 0) {
|
|
sim_states[i].fail_next = 1;
|
|
printf(" next action for %s will fail\n", name);
|
|
return;
|
|
}
|
|
}
|
|
printf("unknown resource: %s\n", name);
|
|
}
|
|
|
|
/*
 * `help` command: print the list of supported commands to stdout.
 */
static void cmd_help(void) {
    static const char *const help_lines[] = {
        "commands:\n",
        " status print all resources with current/wanted state and status\n",
        " want <name> <state> set wanted state (by number or name, case-insensitive)\n",
        " tick run one reconciler tick\n",
        " run tick until stable (max 20 ticks)\n",
        " fail <name> make the next action for this resource fail\n",
        " help show this help\n",
        " quit / exit exit\n",
    };
    const size_t line_count = sizeof help_lines / sizeof help_lines[0];

    for (size_t i = 0; i < line_count; i++) {
        fputs(help_lines[i], stdout);
    }
}
|
|
|
|
/* -----------------------------------------------------------------------
|
|
* Main
|
|
* ----------------------------------------------------------------------- */
|
|
|
|
int main(void) {
|
|
struct Reconciler *r = reconciler_create();
|
|
reconciler_set_log(r, on_log, NULL);
|
|
|
|
/* Device resource. */
|
|
static const struct Rec_Transition device_trans[] = {
|
|
{0, 1, device_open},
|
|
{1, 0, device_close},
|
|
{1, 2, device_start},
|
|
{2, 1, device_stop},
|
|
{-1, -1, NULL}
|
|
};
|
|
static const char *device_states[] = {"CLOSED", "OPEN", "STREAMING"};
|
|
|
|
/* Transport resource. */
|
|
static const struct Rec_Transition transport_trans[] = {
|
|
{0, 1, transport_connect},
|
|
{1, 0, transport_disconnect},
|
|
{-1, -1, NULL}
|
|
};
|
|
static const char *transport_states[] = {"DISCONNECTED", "CONNECTED"};
|
|
|
|
/* Stream resource. */
|
|
static const struct Rec_Transition stream_trans[] = {
|
|
{0, 1, stream_activate},
|
|
{1, 0, stream_deactivate},
|
|
{-1, -1, NULL}
|
|
};
|
|
static const char *stream_states[] = {"INACTIVE", "ACTIVE"};
|
|
|
|
/* Sim userdata — indexed to match resource order. */
|
|
static struct Sim_State sim_states[] = {
|
|
{"device", 0},
|
|
{"transport", 0},
|
|
{"stream", 0},
|
|
};
|
|
|
|
struct Rec_Resource *device = reconciler_add_resource(r,
|
|
"device", device_trans, 3, device_states, 0, &sim_states[0]);
|
|
|
|
struct Rec_Resource *transport = reconciler_add_resource(r,
|
|
"transport", transport_trans, 2, transport_states, 0, &sim_states[1]);
|
|
|
|
struct Rec_Resource *stream = reconciler_add_resource(r,
|
|
"stream", stream_trans, 2, stream_states, 0, &sim_states[2]);
|
|
|
|
/* Dependencies. */
|
|
/* transport cannot reach CONNECTED(1) unless device >= OPEN(1). */
|
|
reconciler_add_dep(transport, 1, device, 1);
|
|
cli_add_dep(transport, 1, device, 1);
|
|
|
|
/* stream cannot reach ACTIVE(1) unless transport >= CONNECTED(1). */
|
|
reconciler_add_dep(stream, 1, transport, 1);
|
|
cli_add_dep(stream, 1, transport, 1);
|
|
|
|
/* stream cannot reach ACTIVE(1) unless device >= STREAMING(2). */
|
|
reconciler_add_dep(stream, 1, device, 2);
|
|
cli_add_dep(stream, 1, device, 2);
|
|
|
|
/* Register resources for lookup. */
|
|
all_resources[resource_count++] = device;
|
|
all_resources[resource_count++] = transport;
|
|
all_resources[resource_count++] = stream;
|
|
|
|
/* Welcome. */
|
|
printf("reconciler_cli — interactive declarative state machine demo\n\n");
|
|
cmd_help();
|
|
printf("\n");
|
|
cmd_status();
|
|
printf("\n");
|
|
|
|
/* REPL. */
|
|
char line[256];
|
|
while (1) {
|
|
printf("> ");
|
|
fflush(stdout);
|
|
|
|
if (fgets(line, sizeof(line), stdin) == NULL) {
|
|
break;
|
|
}
|
|
|
|
/* Strip trailing newline. */
|
|
size_t len = strlen(line);
|
|
while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) {
|
|
line[--len] = '\0';
|
|
}
|
|
|
|
/* Tokenise. */
|
|
char *tokens[4];
|
|
int ntok = 0;
|
|
char *p = line;
|
|
|
|
while (*p != '\0' && ntok < 4) {
|
|
while (*p == ' ' || *p == '\t') {
|
|
p++;
|
|
}
|
|
if (*p == '\0') {
|
|
break;
|
|
}
|
|
tokens[ntok++] = p;
|
|
while (*p != '\0' && *p != ' ' && *p != '\t') {
|
|
p++;
|
|
}
|
|
if (*p != '\0') {
|
|
*p++ = '\0';
|
|
}
|
|
}
|
|
|
|
if (ntok == 0) {
|
|
continue;
|
|
}
|
|
|
|
const char *cmd = tokens[0];
|
|
|
|
if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
|
|
break;
|
|
} else if (strcmp(cmd, "help") == 0) {
|
|
cmd_help();
|
|
} else if (strcmp(cmd, "status") == 0) {
|
|
cmd_status();
|
|
} else if (strcmp(cmd, "tick") == 0) {
|
|
cmd_tick(r);
|
|
} else if (strcmp(cmd, "run") == 0) {
|
|
cmd_run(r);
|
|
} else if (strcmp(cmd, "want") == 0) {
|
|
if (ntok < 3) {
|
|
printf("usage: want <name> <state>\n");
|
|
} else {
|
|
cmd_want(tokens[1], tokens[2]);
|
|
}
|
|
} else if (strcmp(cmd, "fail") == 0) {
|
|
if (ntok < 2) {
|
|
printf("usage: fail <name>\n");
|
|
} else {
|
|
cmd_fail(tokens[1], sim_states, 3);
|
|
}
|
|
} else {
|
|
printf("unknown command: %s (type 'help' for commands)\n", cmd);
|
|
}
|
|
}
|
|
|
|
reconciler_destroy(r);
|
|
return 0;
|
|
}
|