Wire reconciler and ingest into video node
Each ingest stream gets two reconciler resources (device, transport) with dependencies: transport waits for device OPEN (needs format for STREAM_OPEN), device waits for transport CONNECTED before starting capture. START_INGEST sets wanted state and triggers a tick; the reconciler drives device CLOSED→OPEN→STREAMING and transport DISCONNECTED→CONNECTED over subsequent ticks. STOP_INGEST reverses both. External events (transport drop, ingest thread error) use reconciler_force_current to push state backward; the periodic 500ms timer thread re-drives toward wanted state automatically. All 8 stream slots are pre-allocated at startup. on_ingest_frame sends VIDEO_FRAME messages over the outbound transport connection, protected by a per-stream conn_mutex. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -91,6 +91,10 @@ void reconciler_set_wanted(struct Rec_Resource *r, int wanted_state) {
|
||||
r->wanted_state = wanted_state;
|
||||
}
|
||||
|
||||
void reconciler_force_current(struct Rec_Resource *r, int state) {
|
||||
r->current_state = state;
|
||||
}
|
||||
|
||||
int reconciler_get_current(const struct Rec_Resource *r) {
|
||||
return r->current_state;
|
||||
}
|
||||
|
||||
@@ -10,8 +10,10 @@ SERIAL_OBJ = $(BUILD)/serial/serial.o
|
||||
TRANSPORT_OBJ = $(BUILD)/transport/transport.o
|
||||
DISCOVERY_OBJ = $(BUILD)/discovery/discovery.o
|
||||
CONFIG_OBJ = $(BUILD)/config/config.o
|
||||
PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o
|
||||
XORG_OBJ = $(BUILD)/xorg/xorg.o
|
||||
PROTOCOL_OBJ = $(BUILD)/protocol/protocol.o
|
||||
RECONCILER_OBJ = $(BUILD)/reconciler/reconciler.o
|
||||
INGEST_OBJ = $(BUILD)/ingest/ingest.o
|
||||
XORG_OBJ = $(BUILD)/xorg/xorg.o
|
||||
|
||||
.PHONY: all clean
|
||||
|
||||
@@ -20,7 +22,7 @@ all: $(NODE_BUILD)/video-node
|
||||
$(NODE_BUILD)/video-node: $(MAIN_OBJ) \
|
||||
$(COMMON_OBJ) $(MEDIA_OBJ) $(V4L2_OBJ) $(SERIAL_OBJ) \
|
||||
$(TRANSPORT_OBJ) $(DISCOVERY_OBJ) $(CONFIG_OBJ) $(PROTOCOL_OBJ) \
|
||||
$(XORG_OBJ)
|
||||
$(RECONCILER_OBJ) $(INGEST_OBJ) $(XORG_OBJ)
|
||||
$(CC) $(CFLAGS) -o $@ $^ -lpthread $(PKG_LDFLAGS)
|
||||
|
||||
$(MAIN_OBJ): main.c | $(NODE_BUILD)
|
||||
@@ -33,8 +35,10 @@ $(SERIAL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/serial
|
||||
$(TRANSPORT_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/transport
|
||||
$(DISCOVERY_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/discovery
|
||||
$(CONFIG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/config
|
||||
$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||
$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||
$(PROTOCOL_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/protocol
|
||||
$(RECONCILER_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/reconciler
|
||||
$(INGEST_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/ingest
|
||||
$(XORG_OBJ): ; $(MAKE) -C $(ROOT)/src/modules/xorg
|
||||
|
||||
$(NODE_BUILD):
|
||||
mkdir -p $@
|
||||
|
||||
733
src/node/main.c
733
src/node/main.c
@@ -4,6 +4,7 @@
|
||||
#include <dirent.h>
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <pthread.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <sys/sysmacros.h>
|
||||
@@ -15,30 +16,42 @@
|
||||
#include "protocol.h"
|
||||
#include "media_ctrl.h"
|
||||
#include "v4l2_ctrl.h"
|
||||
#include "reconciler.h"
|
||||
#include "ingest.h"
|
||||
#include "stream_stats.h"
|
||||
#include "error.h"
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Device enumeration
|
||||
* Stream slot constants
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
#define MAX_STREAMS 8
|
||||
|
||||
#define DEV_CLOSED 0
|
||||
#define DEV_OPEN 1
|
||||
#define DEV_STREAMING 2
|
||||
|
||||
#define CONN_DISCONNECTED 0
|
||||
#define CONN_CONNECTED 1
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Device enumeration (unchanged from original)
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
/* Entity type flag for a V4L2 I/O interface entity */
|
||||
#define MEDIA_ENT_F_IO_V4L 0x10001u
|
||||
|
||||
#define MAX_VIDEO_NODES 32
|
||||
#define MAX_MEDIA_DEVICES 8
|
||||
#define MAX_VNODES_PER_MD 8
|
||||
#define MAX_CONTROLS 256
|
||||
|
||||
/* 5 ("/dev/") + NAME_MAX (255) + 1 (NUL) = 261; round up */
|
||||
#define DEV_PATH_MAX 264
|
||||
#define MAX_VIDEO_NODES 32
|
||||
#define MAX_MEDIA_DEVICES 8
|
||||
#define MAX_VNODES_PER_MD 8
|
||||
#define MAX_CONTROLS 256
|
||||
#define DEV_PATH_MAX 264
|
||||
|
||||
struct VNode {
|
||||
char path[DEV_PATH_MAX];
|
||||
uint32_t dev_major;
|
||||
uint32_t dev_minor;
|
||||
uint32_t device_caps; /* V4L2_CAP_* from VIDIOC_QUERYCAP cap.device_caps */
|
||||
uint32_t device_caps;
|
||||
int claimed;
|
||||
char card[32]; /* VIDIOC_QUERYCAP card name, empty if unavailable */
|
||||
char card[32];
|
||||
};
|
||||
|
||||
struct MNode {
|
||||
@@ -48,7 +61,7 @@ struct MNode {
|
||||
uint32_t entity_flags;
|
||||
uint32_t device_caps;
|
||||
uint8_t is_capture;
|
||||
int vnode_index; /* index into VNode array */
|
||||
int vnode_index;
|
||||
};
|
||||
|
||||
struct MediaDev {
|
||||
@@ -67,7 +80,347 @@ struct Device_List {
|
||||
int media_count;
|
||||
};
|
||||
|
||||
static int scan_video_nodes(struct Device_List *dl) {
|
||||
/* -------------------------------------------------------------------------
|
||||
* Forward declarations
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
struct Node;
|
||||
struct Ingest_Stream;
|
||||
|
||||
static int trans_device_open(void *ud);
|
||||
static int trans_device_close(void *ud);
|
||||
static int trans_device_start(void *ud);
|
||||
static int trans_device_stop(void *ud);
|
||||
static int trans_transport_connect(void *ud);
|
||||
static int trans_transport_disconnect(void *ud);
|
||||
static void on_ingest_frame(const uint8_t *data, uint32_t len,
|
||||
int width, int height, uint32_t pixfmt, void *userdata);
|
||||
static void on_ingest_error(const char *msg, void *userdata);
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Transition tables — shared across all stream slots
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static const struct Rec_Transition device_transitions[] = {
|
||||
{ DEV_CLOSED, DEV_OPEN, trans_device_open },
|
||||
{ DEV_OPEN, DEV_CLOSED, trans_device_close },
|
||||
{ DEV_OPEN, DEV_STREAMING, trans_device_start },
|
||||
{ DEV_STREAMING, DEV_OPEN, trans_device_stop },
|
||||
{ -1, -1, NULL }
|
||||
};
|
||||
static const char *device_state_names[] = { "CLOSED", "OPEN", "STREAMING" };
|
||||
|
||||
static const struct Rec_Transition transport_transitions[] = {
|
||||
{ CONN_DISCONNECTED, CONN_CONNECTED, trans_transport_connect },
|
||||
{ CONN_CONNECTED, CONN_DISCONNECTED, trans_transport_disconnect },
|
||||
{ -1, -1, NULL }
|
||||
};
|
||||
static const char *transport_state_names[] = { "DISCONNECTED", "CONNECTED" };
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Per-stream state
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
struct Ingest_Stream {
|
||||
int in_use;
|
||||
uint16_t stream_id;
|
||||
|
||||
/* Wanted config — written by START_INGEST handler */
|
||||
uint16_t format;
|
||||
uint16_t width, height;
|
||||
uint16_t fps_n, fps_d;
|
||||
uint16_t transport_mode;
|
||||
char device_path[256];
|
||||
char dest_host[256];
|
||||
uint16_t dest_port;
|
||||
|
||||
/* Reconciler resources */
|
||||
struct Rec_Resource *res_device;
|
||||
struct Rec_Resource *res_transport;
|
||||
|
||||
/* Runtime */
|
||||
struct Ingest_Handle *ingest_handle;
|
||||
pthread_mutex_t conn_mutex;
|
||||
struct Transport_Conn *conn;
|
||||
Stream_Stats stats;
|
||||
|
||||
struct Node *node;
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Node
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
struct Node {
|
||||
struct Config *config;
|
||||
struct Transport_Server *server;
|
||||
struct Discovery *discovery;
|
||||
struct Device_List devices;
|
||||
|
||||
struct Reconciler *reconciler;
|
||||
pthread_mutex_t reconciler_mutex;
|
||||
|
||||
struct Ingest_Stream streams[MAX_STREAMS];
|
||||
};
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Transition implementations
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static int trans_device_open(void *ud)
|
||||
{
|
||||
struct Ingest_Stream *s = ud;
|
||||
|
||||
/* Map PROTO_FORMAT_* to V4L2 pixfmt; 0 = auto-select best MJPEG */
|
||||
uint32_t pixfmt = 0;
|
||||
switch (s->format) {
|
||||
case PROTO_FORMAT_MJPEG: pixfmt = V4L2_PIX_FMT_MJPEG; break;
|
||||
default: pixfmt = 0; break;
|
||||
}
|
||||
|
||||
struct Ingest_Config cfg = {
|
||||
.device = s->device_path,
|
||||
.pixfmt = pixfmt,
|
||||
.width = s->width,
|
||||
.height = s->height,
|
||||
.on_frame = on_ingest_frame,
|
||||
.on_error = on_ingest_error,
|
||||
.userdata = s,
|
||||
};
|
||||
|
||||
struct App_Error e = ingest_open(&cfg, &s->ingest_handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
app_error_print(&e);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int trans_device_close(void *ud)
|
||||
{
|
||||
struct Ingest_Stream *s = ud;
|
||||
ingest_close(s->ingest_handle);
|
||||
s->ingest_handle = NULL;
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void on_ingest_frame(const uint8_t *data, uint32_t len,
|
||||
int width, int height, uint32_t pixfmt, void *userdata)
|
||||
{
|
||||
struct Ingest_Stream *s = userdata;
|
||||
(void)width; (void)height; (void)pixfmt;
|
||||
|
||||
pthread_mutex_lock(&s->conn_mutex);
|
||||
if (s->conn) {
|
||||
struct App_Error e = proto_write_video_frame(s->conn, s->stream_id, data, len);
|
||||
if (!APP_IS_OK(e)) {
|
||||
/* transport will fire on_disconnect; nothing to do here */
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&s->conn_mutex);
|
||||
|
||||
stream_stats_record_frame(&s->stats, len);
|
||||
if (stream_stats_tick(&s->stats)) {
|
||||
fprintf(stderr, "stream %u %.1f fps %.2f Mbps\n",
|
||||
s->stream_id, s->stats.fps, s->stats.mbps);
|
||||
}
|
||||
}
|
||||
|
||||
static void on_ingest_error(const char *msg, void *userdata)
|
||||
{
|
||||
struct Ingest_Stream *s = userdata;
|
||||
fprintf(stderr, "ingest error [stream %u]: %s\n", s->stream_id, msg);
|
||||
|
||||
/* Capture thread has exited; push device back to OPEN so the
|
||||
* reconciler will restart it on the next tick. */
|
||||
pthread_mutex_lock(&s->node->reconciler_mutex);
|
||||
reconciler_force_current(s->res_device, DEV_OPEN);
|
||||
pthread_mutex_unlock(&s->node->reconciler_mutex);
|
||||
}
|
||||
|
||||
static int trans_device_start(void *ud)
|
||||
{
|
||||
struct Ingest_Stream *s = ud;
|
||||
struct App_Error e = ingest_start(s->ingest_handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
app_error_print(&e);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int trans_device_stop(void *ud)
|
||||
{
|
||||
struct Ingest_Stream *s = ud;
|
||||
struct App_Error e = ingest_stop(s->ingest_handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
app_error_print(&e);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void outbound_on_frame(struct Transport_Conn *conn,
|
||||
struct Transport_Frame *frame, void *userdata)
|
||||
{
|
||||
(void)conn; (void)userdata;
|
||||
/* Discard control responses from the sink */
|
||||
free(frame->payload);
|
||||
}
|
||||
|
||||
static void outbound_on_disconnect(struct Transport_Conn *conn,
|
||||
void *userdata)
|
||||
{
|
||||
struct Ingest_Stream *s = userdata;
|
||||
(void)conn;
|
||||
|
||||
fprintf(stderr, "stream %u: transport disconnected\n", s->stream_id);
|
||||
|
||||
pthread_mutex_lock(&s->conn_mutex);
|
||||
s->conn = NULL;
|
||||
pthread_mutex_unlock(&s->conn_mutex);
|
||||
|
||||
/* Force transport resource back so the reconciler re-drives it */
|
||||
pthread_mutex_lock(&s->node->reconciler_mutex);
|
||||
reconciler_force_current(s->res_transport, CONN_DISCONNECTED);
|
||||
reconciler_tick(s->node->reconciler);
|
||||
pthread_mutex_unlock(&s->node->reconciler_mutex);
|
||||
}
|
||||
|
||||
static int trans_transport_connect(void *ud)
|
||||
{
|
||||
struct Ingest_Stream *s = ud;
|
||||
|
||||
struct Transport_Conn *conn = NULL;
|
||||
struct App_Error e = transport_connect(&conn,
|
||||
s->dest_host, s->dest_port,
|
||||
TRANSPORT_DEFAULT_MAX_PAYLOAD,
|
||||
outbound_on_frame, outbound_on_disconnect, s);
|
||||
if (!APP_IS_OK(e)) {
|
||||
app_error_print(&e);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Send STREAM_OPEN so the sink knows what's coming */
|
||||
uint16_t proto_fmt = s->format ? s->format : PROTO_FORMAT_MJPEG;
|
||||
e = proto_write_stream_open(conn, 1 /* request_id */, s->stream_id,
|
||||
proto_fmt, 0 /* pixel_format: compressed */, PROTO_ORIGIN_DEVICE_NATIVE);
|
||||
if (!APP_IS_OK(e)) {
|
||||
app_error_print(&e);
|
||||
transport_conn_close(conn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&s->conn_mutex);
|
||||
s->conn = conn;
|
||||
pthread_mutex_unlock(&s->conn_mutex);
|
||||
|
||||
stream_stats_init(&s->stats, s->stream_id);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static int trans_transport_disconnect(void *ud)
|
||||
{
|
||||
struct Ingest_Stream *s = ud;
|
||||
|
||||
pthread_mutex_lock(&s->conn_mutex);
|
||||
struct Transport_Conn *conn = s->conn;
|
||||
s->conn = NULL;
|
||||
pthread_mutex_unlock(&s->conn_mutex);
|
||||
|
||||
if (conn) {
|
||||
transport_conn_close(conn);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Stream slot management
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void stream_slot_init(struct Node *node, int idx)
|
||||
{
|
||||
struct Ingest_Stream *s = &node->streams[idx];
|
||||
memset(s, 0, sizeof(*s));
|
||||
pthread_mutex_init(&s->conn_mutex, NULL);
|
||||
s->node = node;
|
||||
|
||||
char dev_name[32], conn_name[32];
|
||||
snprintf(dev_name, sizeof(dev_name), "device_%d", idx);
|
||||
snprintf(conn_name, sizeof(conn_name), "transport_%d", idx);
|
||||
|
||||
s->res_device = reconciler_add_resource(node->reconciler,
|
||||
dev_name, device_transitions, 3, device_state_names,
|
||||
DEV_CLOSED, s);
|
||||
s->res_transport = reconciler_add_resource(node->reconciler,
|
||||
conn_name, transport_transitions, 2, transport_state_names,
|
||||
CONN_DISCONNECTED, s);
|
||||
|
||||
/* Transport needs device open (to know the format for STREAM_OPEN) */
|
||||
reconciler_add_dep(s->res_transport, CONN_CONNECTED, s->res_device, DEV_OPEN);
|
||||
|
||||
/* Don't start capture until transport is connected */
|
||||
reconciler_add_dep(s->res_device, DEV_STREAMING, s->res_transport, CONN_CONNECTED);
|
||||
}
|
||||
|
||||
/* Find slot by stream_id, or find a free slot if stream_id is 0 */
|
||||
static struct Ingest_Stream *stream_find(struct Node *node, uint16_t stream_id)
|
||||
{
|
||||
for (int i = 0; i < MAX_STREAMS; i++) {
|
||||
if (node->streams[i].in_use && node->streams[i].stream_id == stream_id) {
|
||||
return &node->streams[i];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static struct Ingest_Stream *stream_alloc(struct Node *node)
|
||||
{
|
||||
for (int i = 0; i < MAX_STREAMS; i++) {
|
||||
if (!node->streams[i].in_use) {
|
||||
return &node->streams[i];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Reconciler log
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void on_reconciler_log(const struct Rec_Resource *res,
|
||||
int from, int to, int success, void *userdata)
|
||||
{
|
||||
(void)userdata;
|
||||
fprintf(stderr, "reconciler: [%s] %s -> %s ... %s\n",
|
||||
reconciler_get_name(res),
|
||||
reconciler_state_name(res, from),
|
||||
reconciler_state_name(res, to),
|
||||
success ? "ok" : "FAILED");
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Periodic reconciler tick thread
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void *reconciler_timer(void *arg)
|
||||
{
|
||||
struct Node *node = arg;
|
||||
while (1) {
|
||||
usleep(500000); /* 500 ms */
|
||||
pthread_mutex_lock(&node->reconciler_mutex);
|
||||
reconciler_tick(node->reconciler);
|
||||
pthread_mutex_unlock(&node->reconciler_mutex);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Device enumeration (unchanged)
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static int scan_video_nodes(struct Device_List *dl)
|
||||
{
|
||||
DIR *d = opendir("/dev");
|
||||
if (!d) { return -1; }
|
||||
|
||||
@@ -91,32 +444,27 @@ static int scan_video_nodes(struct Device_List *dl) {
|
||||
v->claimed = 0;
|
||||
v->card[0] = '\0';
|
||||
|
||||
/* Try to get card name */
|
||||
int fd = open(v->path, O_RDONLY | O_NONBLOCK);
|
||||
if (fd >= 0) {
|
||||
struct v4l2_capability cap;
|
||||
if (ioctl(fd, VIDIOC_QUERYCAP, &cap) == 0) {
|
||||
strncpy(v->card, (const char *)cap.card, sizeof(v->card) - 1);
|
||||
v->card[sizeof(v->card) - 1] = '\0';
|
||||
/* use per-node caps when available, fall back to physical caps */
|
||||
v->device_caps = (cap.capabilities & V4L2_CAP_DEVICE_CAPS)
|
||||
? cap.device_caps : cap.capabilities;
|
||||
}
|
||||
close(fd);
|
||||
}
|
||||
|
||||
dl->vnode_count++;
|
||||
}
|
||||
closedir(d);
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct Entity_Cb_State {
|
||||
struct Device_List *dl;
|
||||
struct MediaDev *md;
|
||||
};
|
||||
struct Entity_Cb_State { struct Device_List *dl; struct MediaDev *md; };
|
||||
|
||||
static void entity_callback(const struct Media_Entity *entity, void *userdata) {
|
||||
static void entity_callback(const struct Media_Entity *entity, void *userdata)
|
||||
{
|
||||
struct Entity_Cb_State *state = userdata;
|
||||
struct Device_List *dl = state->dl;
|
||||
struct MediaDev *md = state->md;
|
||||
@@ -124,11 +472,9 @@ static void entity_callback(const struct Media_Entity *entity, void *userdata) {
|
||||
if (entity->dev_major == 0) { return; }
|
||||
if (md->vnode_count >= MAX_VNODES_PER_MD) { return; }
|
||||
|
||||
/* Find matching video node by device number */
|
||||
for (int i = 0; i < dl->vnode_count; i++) {
|
||||
if (dl->vnodes[i].dev_major != entity->dev_major) { continue; }
|
||||
if (dl->vnodes[i].dev_minor != entity->dev_minor) { continue; }
|
||||
|
||||
dl->vnodes[i].claimed = 1;
|
||||
|
||||
struct MNode *mn = &md->vnodes[md->vnode_count++];
|
||||
@@ -145,7 +491,8 @@ static void entity_callback(const struct Media_Entity *entity, void *userdata) {
|
||||
}
|
||||
}
|
||||
|
||||
static void scan_media_devices(struct Device_List *dl) {
|
||||
static void scan_media_devices(struct Device_List *dl)
|
||||
{
|
||||
DIR *d = opendir("/dev");
|
||||
if (!d) { return; }
|
||||
|
||||
@@ -179,23 +526,23 @@ static void scan_media_devices(struct Device_List *dl) {
|
||||
struct Entity_Cb_State state = { .dl = dl, .md = md };
|
||||
media_ctrl_enum_entities(ctrl, entity_callback, &state);
|
||||
media_ctrl_close(ctrl);
|
||||
|
||||
dl->media_count++;
|
||||
}
|
||||
closedir(d);
|
||||
}
|
||||
|
||||
static void build_device_list(struct Device_List *dl) {
|
||||
static void build_device_list(struct Device_List *dl)
|
||||
{
|
||||
memset(dl, 0, sizeof(*dl));
|
||||
scan_video_nodes(dl);
|
||||
scan_media_devices(dl);
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Control enumeration helpers
|
||||
* Control enumeration helpers (unchanged)
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
#define MAX_MENU_POOL 128 /* total menu items across all controls */
|
||||
#define MAX_MENU_POOL 128
|
||||
|
||||
struct Ctrl_Build {
|
||||
struct Proto_Control_Info items[MAX_CONTROLS];
|
||||
@@ -250,15 +597,23 @@ static void ctrl_enum_cb(
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Node state
|
||||
* Device path resolver (shared by enum/get/set control handlers)
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
struct Node {
|
||||
struct Config *config;
|
||||
struct Transport_Server *server;
|
||||
struct Discovery *discovery;
|
||||
struct Device_List devices;
|
||||
};
|
||||
static const char *resolve_device_path(struct Node *node, int idx)
|
||||
{
|
||||
for (int i = 0; i < node->devices.media_count; i++) {
|
||||
struct MediaDev *md = &node->devices.media[i];
|
||||
if (idx < md->vnode_count) { return md->vnodes[idx].path; }
|
||||
idx -= md->vnode_count;
|
||||
}
|
||||
for (int i = 0; i < node->devices.vnode_count; i++) {
|
||||
if (node->devices.vnodes[i].claimed) { continue; }
|
||||
if (idx == 0) { return node->devices.vnodes[i].path; }
|
||||
idx--;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Request handlers
|
||||
@@ -267,7 +622,6 @@ struct Node {
|
||||
static void handle_enum_devices(struct Node *node,
|
||||
struct Transport_Conn *conn, uint16_t request_id)
|
||||
{
|
||||
/* Build Proto_Media_Device_Info array */
|
||||
struct Proto_Video_Node_Info vnodes[MAX_MEDIA_DEVICES * MAX_VNODES_PER_MD];
|
||||
struct Proto_Media_Device_Info mdevs[MAX_MEDIA_DEVICES];
|
||||
int vnode_offset = 0;
|
||||
@@ -294,7 +648,6 @@ static void handle_enum_devices(struct Node *node,
|
||||
vnode_offset += md->vnode_count;
|
||||
}
|
||||
|
||||
/* Build standalone list */
|
||||
struct Proto_Standalone_Device_Info standalone[MAX_VIDEO_NODES];
|
||||
int standalone_count = 0;
|
||||
for (int i = 0; i < node->devices.vnode_count; i++) {
|
||||
@@ -318,52 +671,25 @@ static void handle_enum_controls(struct Node *node,
|
||||
struct Proto_Enum_Controls_Req req;
|
||||
struct App_Error e = proto_read_enum_controls_req(payload, length, &req);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, 0,
|
||||
PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Resolve device_index to a path across media-owned + standalone nodes */
|
||||
const char *path = NULL;
|
||||
int idx = (int)req.device_index;
|
||||
for (int i = 0; i < node->devices.media_count && path == NULL; i++) {
|
||||
struct MediaDev *md = &node->devices.media[i];
|
||||
if (idx < md->vnode_count) {
|
||||
path = md->vnodes[idx].path;
|
||||
} else {
|
||||
idx -= md->vnode_count;
|
||||
}
|
||||
}
|
||||
if (path == NULL) {
|
||||
/* Check standalone */
|
||||
for (int i = 0; i < node->devices.vnode_count; i++) {
|
||||
if (node->devices.vnodes[i].claimed) { continue; }
|
||||
if (idx == 0) { path = node->devices.vnodes[i].path; break; }
|
||||
idx--;
|
||||
}
|
||||
}
|
||||
|
||||
if (path == NULL) {
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||
const char *path = resolve_device_path(node, (int)req.device_index);
|
||||
if (!path) {
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
struct V4l2_Ctrl_Handle *handle;
|
||||
e = v4l2_ctrl_open(path, &handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_ERROR, NULL, 0);
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
struct Ctrl_Build build = { .count = 0 };
|
||||
v4l2_ctrl_enumerate(handle, ctrl_enum_cb, &build);
|
||||
v4l2_ctrl_close(handle);
|
||||
|
||||
e = proto_write_enum_controls_response(conn,
|
||||
req.request_id, PROTO_STATUS_OK,
|
||||
build.items, (uint16_t)build.count);
|
||||
req.request_id, PROTO_STATUS_OK, build.items, (uint16_t)build.count);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); }
|
||||
}
|
||||
|
||||
@@ -374,56 +700,28 @@ static void handle_get_control(struct Node *node,
|
||||
struct Proto_Get_Control_Req req;
|
||||
struct App_Error e = proto_read_get_control_req(payload, length, &req);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, 0,
|
||||
PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Same device resolution as enum_controls */
|
||||
const char *path = NULL;
|
||||
int idx = (int)req.device_index;
|
||||
for (int i = 0; i < node->devices.media_count && path == NULL; i++) {
|
||||
struct MediaDev *md = &node->devices.media[i];
|
||||
if (idx < md->vnode_count) {
|
||||
path = md->vnodes[idx].path;
|
||||
} else {
|
||||
idx -= md->vnode_count;
|
||||
}
|
||||
}
|
||||
if (path == NULL) {
|
||||
for (int i = 0; i < node->devices.vnode_count; i++) {
|
||||
if (node->devices.vnodes[i].claimed) { continue; }
|
||||
if (idx == 0) { path = node->devices.vnodes[i].path; break; }
|
||||
idx--;
|
||||
}
|
||||
}
|
||||
|
||||
if (path == NULL) {
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||
const char *path = resolve_device_path(node, (int)req.device_index);
|
||||
if (!path) {
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
struct V4l2_Ctrl_Handle *handle;
|
||||
e = v4l2_ctrl_open(path, &handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_ERROR, NULL, 0);
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t value;
|
||||
e = v4l2_ctrl_get(handle, req.control_id, &value);
|
||||
v4l2_ctrl_close(handle);
|
||||
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_ERROR, NULL, 0);
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
e = proto_write_get_control_response(conn,
|
||||
req.request_id, PROTO_STATUS_OK, value);
|
||||
e = proto_write_get_control_response(conn, req.request_id, PROTO_STATUS_OK, value);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); }
|
||||
}
|
||||
|
||||
@@ -434,52 +732,114 @@ static void handle_set_control(struct Node *node,
|
||||
struct Proto_Set_Control_Req req;
|
||||
struct App_Error e = proto_read_set_control_req(payload, length, &req);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, 0,
|
||||
PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
const char *path = resolve_device_path(node, (int)req.device_index);
|
||||
if (!path) {
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||
return;
|
||||
}
|
||||
struct V4l2_Ctrl_Handle *handle;
|
||||
e = v4l2_ctrl_open(path, &handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_ERROR, NULL, 0);
|
||||
return;
|
||||
}
|
||||
e = v4l2_ctrl_set(handle, req.control_id, req.value);
|
||||
v4l2_ctrl_close(handle);
|
||||
uint16_t status = APP_IS_OK(e) ? PROTO_STATUS_OK : PROTO_STATUS_ERROR;
|
||||
proto_write_control_response(conn, req.request_id, status, NULL, 0);
|
||||
}
|
||||
|
||||
static void handle_start_ingest(struct Node *node,
|
||||
struct Transport_Conn *conn,
|
||||
const uint8_t *payload, uint32_t length)
|
||||
{
|
||||
struct Proto_Start_Ingest req;
|
||||
struct App_Error e = proto_read_start_ingest(payload, length, &req);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
const char *path = NULL;
|
||||
int idx = (int)req.device_index;
|
||||
for (int i = 0; i < node->devices.media_count && path == NULL; i++) {
|
||||
struct MediaDev *md = &node->devices.media[i];
|
||||
if (idx < md->vnode_count) {
|
||||
path = md->vnodes[idx].path;
|
||||
} else {
|
||||
idx -= md->vnode_count;
|
||||
}
|
||||
}
|
||||
if (path == NULL) {
|
||||
for (int i = 0; i < node->devices.vnode_count; i++) {
|
||||
if (node->devices.vnodes[i].claimed) { continue; }
|
||||
if (idx == 0) { path = node->devices.vnodes[i].path; break; }
|
||||
idx--;
|
||||
pthread_mutex_lock(&node->reconciler_mutex);
|
||||
|
||||
/* Reuse existing slot for this stream_id, or allocate a fresh one */
|
||||
struct Ingest_Stream *s = stream_find(node, req.stream_id);
|
||||
if (!s) {
|
||||
s = stream_alloc(node);
|
||||
if (!s) {
|
||||
pthread_mutex_unlock(&node->reconciler_mutex);
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_ERROR, NULL, 0);
|
||||
fprintf(stderr, "START_INGEST: no free stream slots\n");
|
||||
return;
|
||||
}
|
||||
s->in_use = 1;
|
||||
s->stream_id = req.stream_id;
|
||||
}
|
||||
|
||||
if (path == NULL) {
|
||||
/* Copy wanted config (NUL-terminate the wire strings) */
|
||||
s->format = req.format;
|
||||
s->width = req.width;
|
||||
s->height = req.height;
|
||||
s->fps_n = req.fps_n;
|
||||
s->fps_d = req.fps_d;
|
||||
s->transport_mode = req.transport_mode;
|
||||
s->dest_port = req.dest_port;
|
||||
|
||||
size_t dp_len = req.device_path_len < sizeof(s->device_path) - 1
|
||||
? req.device_path_len : sizeof(s->device_path) - 1;
|
||||
memcpy(s->device_path, req.device_path, dp_len);
|
||||
s->device_path[dp_len] = '\0';
|
||||
|
||||
size_t dh_len = req.dest_host_len < sizeof(s->dest_host) - 1
|
||||
? req.dest_host_len : sizeof(s->dest_host) - 1;
|
||||
memcpy(s->dest_host, req.dest_host, dh_len);
|
||||
s->dest_host[dh_len] = '\0';
|
||||
|
||||
reconciler_set_wanted(s->res_device, DEV_STREAMING);
|
||||
reconciler_set_wanted(s->res_transport, CONN_CONNECTED);
|
||||
reconciler_tick(node->reconciler);
|
||||
|
||||
pthread_mutex_unlock(&node->reconciler_mutex);
|
||||
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0);
|
||||
}
|
||||
|
||||
static void handle_stop_ingest(struct Node *node,
|
||||
struct Transport_Conn *conn,
|
||||
const uint8_t *payload, uint32_t length)
|
||||
{
|
||||
struct Proto_Stop_Ingest req;
|
||||
struct App_Error e = proto_read_stop_ingest(payload, length, &req);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, 0, PROTO_STATUS_INVALID_PARAMS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&node->reconciler_mutex);
|
||||
|
||||
struct Ingest_Stream *s = stream_find(node, req.stream_id);
|
||||
if (!s) {
|
||||
pthread_mutex_unlock(&node->reconciler_mutex);
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_NOT_FOUND, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
struct V4l2_Ctrl_Handle *handle;
|
||||
e = v4l2_ctrl_open(path, &handle);
|
||||
if (!APP_IS_OK(e)) {
|
||||
proto_write_control_response(conn, req.request_id,
|
||||
PROTO_STATUS_ERROR, NULL, 0);
|
||||
return;
|
||||
}
|
||||
reconciler_set_wanted(s->res_device, DEV_CLOSED);
|
||||
reconciler_set_wanted(s->res_transport, CONN_DISCONNECTED);
|
||||
reconciler_tick(node->reconciler);
|
||||
|
||||
e = v4l2_ctrl_set(handle, req.control_id, req.value);
|
||||
v4l2_ctrl_close(handle);
|
||||
pthread_mutex_unlock(&node->reconciler_mutex);
|
||||
|
||||
uint16_t status = APP_IS_OK(e) ? PROTO_STATUS_OK : PROTO_STATUS_ERROR;
|
||||
proto_write_control_response(conn, req.request_id, status, NULL, 0);
|
||||
proto_write_control_response(conn, req.request_id, PROTO_STATUS_OK, NULL, 0);
|
||||
}
|
||||
|
||||
/* -------------------------------------------------------------------------
|
||||
* Transport callbacks
|
||||
* Transport callbacks (inbound server)
|
||||
* ------------------------------------------------------------------------- */
|
||||
|
||||
static void on_frame(struct Transport_Conn *conn,
|
||||
@@ -491,10 +851,7 @@ static void on_frame(struct Transport_Conn *conn,
|
||||
struct Proto_Request_Header hdr;
|
||||
struct App_Error e = proto_read_request_header(
|
||||
frame->payload, frame->payload_length, &hdr);
|
||||
if (!APP_IS_OK(e)) {
|
||||
free(frame->payload);
|
||||
return;
|
||||
}
|
||||
if (!APP_IS_OK(e)) { free(frame->payload); return; }
|
||||
|
||||
switch (hdr.command) {
|
||||
case PROTO_CMD_ENUM_DEVICES:
|
||||
@@ -512,6 +869,19 @@ static void on_frame(struct Transport_Conn *conn,
|
||||
handle_set_control(node, conn,
|
||||
frame->payload, frame->payload_length);
|
||||
break;
|
||||
case PROTO_CMD_START_INGEST:
|
||||
handle_start_ingest(node, conn,
|
||||
frame->payload, frame->payload_length);
|
||||
break;
|
||||
case PROTO_CMD_STOP_INGEST:
|
||||
handle_stop_ingest(node, conn,
|
||||
frame->payload, frame->payload_length);
|
||||
break;
|
||||
case PROTO_CMD_STREAM_OPEN:
|
||||
/* A sender is opening a stream on this node (sink role) */
|
||||
proto_write_control_response(conn, hdr.request_id,
|
||||
PROTO_STATUS_OK, NULL, 0);
|
||||
break;
|
||||
default:
|
||||
proto_write_control_response(conn, hdr.request_id,
|
||||
PROTO_STATUS_UNKNOWN_CMD, NULL, 0);
|
||||
@@ -522,12 +892,14 @@ static void on_frame(struct Transport_Conn *conn,
|
||||
free(frame->payload);
|
||||
}
|
||||
|
||||
static void on_connect(struct Transport_Conn *conn, void *userdata) {
|
||||
static void on_connect(struct Transport_Conn *conn, void *userdata)
|
||||
{
|
||||
(void)conn; (void)userdata;
|
||||
printf("peer connected\n");
|
||||
}
|
||||
|
||||
static void on_disconnect(struct Transport_Conn *conn, void *userdata) {
|
||||
static void on_disconnect(struct Transport_Conn *conn, void *userdata)
|
||||
{
|
||||
(void)conn; (void)userdata;
|
||||
printf("peer disconnected\n");
|
||||
}
|
||||
@@ -545,14 +917,14 @@ static const struct Config_Flag_Def function_flag_defs[] = {
|
||||
};
|
||||
|
||||
static const struct Config_Def schema[] = {
|
||||
{ "node", "name", CONFIG_STRING, "unnamed:0", NULL },
|
||||
{ "node", "site_id", CONFIG_UINT16, "0", NULL },
|
||||
{ "node", "tcp_port", CONFIG_UINT16, "8000", NULL },
|
||||
{ "node", "name", CONFIG_STRING, "unnamed:0", NULL },
|
||||
{ "node", "site_id", CONFIG_UINT16, "0", NULL },
|
||||
{ "node", "tcp_port", CONFIG_UINT16, "8000", NULL },
|
||||
{ "node", "function", CONFIG_FLAGS, "source", function_flag_defs },
|
||||
{ "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL },
|
||||
{ "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL },
|
||||
{ "transport", "max_connections", CONFIG_UINT32, "16", NULL },
|
||||
{ "transport", "max_payload", CONFIG_UINT32, "16777216", NULL },
|
||||
{ "discovery", "interval_ms", CONFIG_UINT32, "5000", NULL },
|
||||
{ "discovery", "timeout_intervals", CONFIG_UINT32, "3", NULL },
|
||||
{ "transport", "max_connections", CONFIG_UINT32, "16", NULL },
|
||||
{ "transport", "max_payload", CONFIG_UINT32, "16777216", NULL },
|
||||
{ NULL }
|
||||
};
|
||||
|
||||
@@ -566,7 +938,8 @@ static void usage(void) {
|
||||
" video-node --defaults\n");
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
if (argc < 2) { usage(); return 1; }
|
||||
|
||||
struct Node node;
|
||||
@@ -581,14 +954,14 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||
|
||||
uint16_t tcp_port = config_get_u16(node.config, "node", "tcp_port");
|
||||
uint16_t site_id = config_get_u16(node.config, "node", "site_id");
|
||||
uint32_t func = config_get_flags(node.config, "node", "function");
|
||||
const char *name = config_get_str(node.config, "node", "name");
|
||||
uint32_t interval = config_get_u32(node.config, "discovery", "interval_ms");
|
||||
uint32_t timeout_i = config_get_u32(node.config, "discovery", "timeout_intervals");
|
||||
uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections");
|
||||
uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload");
|
||||
uint16_t tcp_port = config_get_u16(node.config, "node", "tcp_port");
|
||||
uint16_t site_id = config_get_u16(node.config, "node", "site_id");
|
||||
uint32_t func = config_get_flags(node.config, "node", "function");
|
||||
const char *name = config_get_str(node.config, "node", "name");
|
||||
uint32_t interval = config_get_u32(node.config, "discovery", "interval_ms");
|
||||
uint32_t timeout_i = config_get_u32(node.config, "discovery", "timeout_intervals");
|
||||
uint32_t max_conn = config_get_u32(node.config, "transport", "max_connections");
|
||||
uint32_t max_pay = config_get_u32(node.config, "transport", "max_payload");
|
||||
|
||||
printf("node: %s port=%u site=%u\n", name, tcp_port, site_id);
|
||||
|
||||
@@ -598,7 +971,21 @@ int main(int argc, char **argv) {
|
||||
printf("found %d media device(s), %d video node(s)\n",
|
||||
node.devices.media_count, node.devices.vnode_count);
|
||||
|
||||
/* Start transport server */
|
||||
/* Reconciler */
|
||||
node.reconciler = reconciler_create();
|
||||
if (!node.reconciler) {
|
||||
fprintf(stderr, "failed to create reconciler\n");
|
||||
return 1;
|
||||
}
|
||||
reconciler_set_log(node.reconciler, on_reconciler_log, NULL);
|
||||
pthread_mutex_init(&node.reconciler_mutex, NULL);
|
||||
|
||||
/* Pre-allocate stream slots */
|
||||
for (int i = 0; i < MAX_STREAMS; i++) {
|
||||
stream_slot_init(&node, i);
|
||||
}
|
||||
|
||||
/* Transport server */
|
||||
struct Transport_Server_Config srv_cfg = {
|
||||
.port = tcp_port,
|
||||
.max_connections = (int)max_conn,
|
||||
@@ -608,32 +995,32 @@ int main(int argc, char **argv) {
|
||||
.on_disconnect = on_disconnect,
|
||||
.userdata = &node,
|
||||
};
|
||||
|
||||
e = transport_server_create(&node.server, &srv_cfg);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||
|
||||
e = transport_server_start(node.server);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||
|
||||
/* Start discovery */
|
||||
/* Discovery */
|
||||
struct Discovery_Config disc_cfg = {
|
||||
.site_id = site_id,
|
||||
.tcp_port = tcp_port,
|
||||
.function_flags = (uint16_t)func,
|
||||
.name = name,
|
||||
.interval_ms = interval,
|
||||
.timeout_intervals= timeout_i,
|
||||
.on_peer_found = NULL,
|
||||
.on_peer_lost = NULL,
|
||||
.site_id = site_id,
|
||||
.tcp_port = tcp_port,
|
||||
.function_flags = (uint16_t)func,
|
||||
.name = name,
|
||||
.interval_ms = interval,
|
||||
.timeout_intervals = timeout_i,
|
||||
.on_peer_found = NULL,
|
||||
.on_peer_lost = NULL,
|
||||
};
|
||||
|
||||
e = discovery_create(&node.discovery, &disc_cfg);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||
|
||||
e = discovery_start(node.discovery);
|
||||
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
|
||||
|
||||
/* Periodic reconciler tick */
|
||||
pthread_t timer_thread;
|
||||
pthread_create(&timer_thread, NULL, reconciler_timer, &node);
|
||||
|
||||
printf("ready\n");
|
||||
pause();
|
||||
pthread_join(timer_thread, NULL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user