Add reconciler and ingest modules with CLI driver

reconciler: generic resource state machine — BFS pathfinding from current
to wanted state, dependency constraints, event/periodic tick model.
reconciler_cli exercises it with simulated device/transport/stream resources.

ingest: V4L2 capture module — open device, negotiate MJPEG format, MMAP
buffer pool, capture thread with on_frame callback. start/stop lifecycle
designed for reconciler management. Transport-agnostic: caller wires
on_frame to proto_write_video_frame.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-29 01:52:17 +00:00
parent 4e40223478
commit 639a84b1b9
9 changed files with 1238 additions and 5 deletions

292
src/modules/ingest/ingest.c Normal file
View File

@@ -0,0 +1,292 @@
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/select.h>
#include <pthread.h>
#include <stdatomic.h>
#include <linux/videodev2.h>
#include "ingest.h"
#include "v4l2_fmt.h"
#include "error.h"
/* -------------------------------------------------------------------------
* Internal types
* ------------------------------------------------------------------------- */
#define INGEST_N_BUFS 4
struct Mmap_Buf {
void *start;
size_t length;
};
struct Ingest_Handle {
int fd;
struct Mmap_Buf bufs[INGEST_N_BUFS];
int buf_count;
int width, height;
uint32_t pixfmt;
int fps_n, fps_d;
Ingest_Frame_Fn on_frame;
Ingest_Error_Fn on_error;
void *userdata;
pthread_t thread;
atomic_int running; /* 1 = thread should keep going; 0 = stop */
int started; /* 1 = pthread_create was called */
};
/* -------------------------------------------------------------------------
* Capture thread
* ------------------------------------------------------------------------- */
static void *capture_thread(void *arg)
{
struct Ingest_Handle *h = arg;
while (atomic_load(&h->running)) {
fd_set fds;
FD_ZERO(&fds);
FD_SET(h->fd, &fds);
struct timeval tv = { 0, 100000 }; /* 100 ms — keeps stop latency short */
int r = select(h->fd + 1, &fds, NULL, NULL, &tv);
if (r < 0) {
if (errno == EINTR) { continue; }
if (h->on_error) { h->on_error("select failed", h->userdata); }
break;
}
if (r == 0) {
continue; /* timeout — recheck running flag */
}
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
if (v4l2_xioctl(h->fd, VIDIOC_DQBUF, &buf) < 0) {
if (errno == EAGAIN) { continue; }
if (h->on_error) { h->on_error("VIDIOC_DQBUF failed", h->userdata); }
break;
}
h->on_frame(
(const uint8_t *)h->bufs[buf.index].start,
buf.bytesused,
h->width, h->height, h->pixfmt,
h->userdata);
if (v4l2_xioctl(h->fd, VIDIOC_QBUF, &buf) < 0) {
if (h->on_error) { h->on_error("VIDIOC_QBUF failed", h->userdata); }
break;
}
}
atomic_store(&h->running, 0);
return NULL;
}
/* -------------------------------------------------------------------------
* Public API
* ------------------------------------------------------------------------- */
struct App_Error ingest_open(const struct Ingest_Config *cfg, Ingest_Handle **out)
{
struct Ingest_Handle *h = calloc(1, sizeof(*h));
if (!h) { return APP_SYSCALL_ERROR(); }
h->fd = -1;
h->on_frame = cfg->on_frame;
h->on_error = cfg->on_error;
h->userdata = cfg->userdata;
atomic_init(&h->running, 0);
/* Open device */
h->fd = open(cfg->device, O_RDWR | O_NONBLOCK);
if (h->fd < 0) {
free(h);
return APP_SYSCALL_ERROR();
}
/* Verify capture + streaming capability */
struct v4l2_capability cap = {0};
if (v4l2_xioctl(h->fd, VIDIOC_QUERYCAP, &cap) < 0) {
close(h->fd); free(h);
return APP_SYSCALL_ERROR();
}
if (!(cap.capabilities & V4L2_CAP_VIDEO_CAPTURE) ||
!(cap.capabilities & V4L2_CAP_STREAMING)) {
close(h->fd); free(h);
return APP_INVALID_ERROR_MSG(0, "device does not support MJPEG streaming capture");
}
/* Format selection */
uint32_t want_pixfmt = cfg->pixfmt ? cfg->pixfmt : V4L2_PIX_FMT_MJPEG;
V4l2_Fmt_Option opts[V4L2_FMT_MAX_OPTS];
int n = v4l2_enumerate_formats(h->fd, opts, V4L2_FMT_MAX_OPTS, want_pixfmt);
if (n == 0) {
close(h->fd); free(h);
return APP_INVALID_ERROR_MSG(0, "no matching formats found on device");
}
/* If caller specified exact w/h use that, otherwise auto-select best */
const V4l2_Fmt_Option *chosen;
if (cfg->width > 0 && cfg->height > 0) {
chosen = NULL;
for (int i = 0; i < n; i++) {
if (opts[i].w == cfg->width && opts[i].h == cfg->height) {
if (!chosen || v4l2_fmt_fps_gt(&opts[i], chosen)) {
chosen = &opts[i];
}
}
}
if (!chosen) {
/* Exact size not found — fall back to best available */
chosen = v4l2_select_best(opts, n);
}
} else {
chosen = v4l2_select_best(opts, n);
}
/* Apply format */
struct v4l2_format fmt = {0};
fmt.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
fmt.fmt.pix.pixelformat = chosen->pixfmt;
fmt.fmt.pix.width = (uint32_t)chosen->w;
fmt.fmt.pix.height = (uint32_t)chosen->h;
fmt.fmt.pix.field = V4L2_FIELD_ANY;
if (v4l2_xioctl(h->fd, VIDIOC_S_FMT, &fmt) < 0) {
close(h->fd); free(h);
return APP_SYSCALL_ERROR();
}
h->width = (int)fmt.fmt.pix.width;
h->height = (int)fmt.fmt.pix.height;
h->pixfmt = fmt.fmt.pix.pixelformat;
/* Apply frame rate */
{
struct v4l2_streamparm parm = {0};
parm.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
parm.parm.capture.timeperframe.numerator = (uint32_t)chosen->fps_d;
parm.parm.capture.timeperframe.denominator = (uint32_t)chosen->fps_n;
v4l2_xioctl(h->fd, VIDIOC_S_PARM, &parm);
if (v4l2_xioctl(h->fd, VIDIOC_G_PARM, &parm) == 0 &&
parm.parm.capture.timeperframe.denominator > 0) {
h->fps_n = (int)parm.parm.capture.timeperframe.denominator;
h->fps_d = (int)parm.parm.capture.timeperframe.numerator;
} else {
h->fps_n = chosen->fps_n;
h->fps_d = chosen->fps_d;
}
}
/* Allocate MMAP buffers */
struct v4l2_requestbuffers req = {0};
req.count = INGEST_N_BUFS;
req.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
req.memory = V4L2_MEMORY_MMAP;
if (v4l2_xioctl(h->fd, VIDIOC_REQBUFS, &req) < 0) {
close(h->fd); free(h);
return APP_SYSCALL_ERROR();
}
h->buf_count = (int)req.count;
for (int i = 0; i < h->buf_count; i++) {
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (uint32_t)i;
if (v4l2_xioctl(h->fd, VIDIOC_QUERYBUF, &buf) < 0) {
/* Unmap already-mapped buffers before returning */
for (int j = 0; j < i; j++) {
munmap(h->bufs[j].start, h->bufs[j].length);
}
close(h->fd); free(h);
return APP_SYSCALL_ERROR();
}
h->bufs[i].length = buf.length;
h->bufs[i].start = mmap(NULL, buf.length,
PROT_READ | PROT_WRITE, MAP_SHARED, h->fd, buf.m.offset);
if (h->bufs[i].start == MAP_FAILED) {
for (int j = 0; j < i; j++) {
munmap(h->bufs[j].start, h->bufs[j].length);
}
close(h->fd); free(h);
return APP_SYSCALL_ERROR();
}
}
*out = h;
return APP_OK;
}
struct App_Error ingest_start(Ingest_Handle *h)
{
/* Queue all buffers */
for (int i = 0; i < h->buf_count; i++) {
struct v4l2_buffer buf = {0};
buf.type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
buf.memory = V4L2_MEMORY_MMAP;
buf.index = (uint32_t)i;
if (v4l2_xioctl(h->fd, VIDIOC_QBUF, &buf) < 0) {
return APP_SYSCALL_ERROR();
}
}
/* Enable streaming */
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
if (v4l2_xioctl(h->fd, VIDIOC_STREAMON, &type) < 0) {
return APP_SYSCALL_ERROR();
}
/* Start capture thread */
atomic_store(&h->running, 1);
if (pthread_create(&h->thread, NULL, capture_thread, h) != 0) {
atomic_store(&h->running, 0);
v4l2_xioctl(h->fd, VIDIOC_STREAMOFF, &type);
return APP_SYSCALL_ERROR();
}
h->started = 1;
return APP_OK;
}
struct App_Error ingest_stop(Ingest_Handle *h)
{
if (!h->started) {
return APP_OK;
}
atomic_store(&h->running, 0);
pthread_join(h->thread, NULL);
h->started = 0;
enum v4l2_buf_type type = V4L2_BUF_TYPE_VIDEO_CAPTURE;
v4l2_xioctl(h->fd, VIDIOC_STREAMOFF, &type);
return APP_OK;
}
void ingest_close(Ingest_Handle *h)
{
if (!h) { return; }
for (int i = 0; i < h->buf_count; i++) {
if (h->bufs[i].start && h->bufs[i].start != MAP_FAILED) {
munmap(h->bufs[i].start, h->bufs[i].length);
}
}
if (h->fd >= 0) { close(h->fd); }
free(h);
}
int ingest_width(const Ingest_Handle *h) { return h->width; }
int ingest_height(const Ingest_Handle *h) { return h->height; }
uint32_t ingest_pixfmt(const Ingest_Handle *h) { return h->pixfmt; }
int ingest_fps_n(const Ingest_Handle *h) { return h->fps_n; }
int ingest_fps_d(const Ingest_Handle *h) { return h->fps_d; }