- Protocol module: framed binary encoding for control requests/responses (ENUM_DEVICES, ENUM_CONTROLS, GET/SET_CONTROL, STREAM_OPEN/CLOSE) - video-node: scans /dev/media* and /dev/video*, serves V4L2 device topology and controls over TCP; uses UDP discovery for peer announce - query_cli: auto-discovers a node, queries devices and controls - protocol_cli: low-level protocol frame decoder for debugging - dev/web: Express 5 ESM web inspector — live SSE discovery picker, REST bridge to video-node, controls UI with sliders/selects/checkboxes - Makefile: sequential module builds before cli/node to fix make -j races - common.mk: add DEPFLAGS (-MMD -MP) for automatic header dependencies - All module Makefiles: split compile/link, generate .d dependency files - discovery: replace 100ms poll loop with pthread_cond_timedwait; respond to all announcements (not just new peers) for instant re-discovery - ENUM_DEVICES response: carry device_caps (V4L2_CAP_*) per video node so clients can distinguish capture nodes from metadata nodes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
350 lines
10 KiB
C
350 lines
10 KiB
C
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <unistd.h>
|
|
#include <stdatomic.h>
|
|
#include <pthread.h>
|
|
#include <time.h>
|
|
#include <sys/socket.h>
|
|
#include <netinet/in.h>
|
|
#include <arpa/inet.h>
|
|
#include <errno.h>
|
|
|
|
#include "discovery.h"
|
|
#include "serial.h"
|
|
#include "transport.h" /* TRANSPORT_FRAME_HEADER_SIZE */
|
|
|
|
/* Fallbacks used by discovery_create() when the matching config field is 0. */
#define DEFAULT_INTERVAL_MS       5000u
#define DEFAULT_TIMEOUT_INTERVALS 3u

/* Size in bytes of the datagram buffer owned by the receive thread. */
#define RECV_BUF_SIZE             512u

/*
 * Announcement payload layout. Every offset is counted from the first payload
 * byte, i.e. from the byte that follows the transport frame header.
 */
#define ANN_PROTOCOL_VERSION 0   /* u8             */
#define ANN_SITE_ID          1   /* u16            */
#define ANN_TCP_PORT         3   /* u16            */
#define ANN_FUNCTION_FLAGS   5   /* u16            */
#define ANN_NAME_LEN         7   /* u8             */
#define ANN_NAME             8   /* name bytes     */
#define ANN_FIXED_SIZE       8u  /* bytes preceding the name */
|
|
|
|
struct Peer_Entry {
|
|
struct Discovery_Peer info;
|
|
uint64_t last_seen_ms;
|
|
int active;
|
|
};
|
|
|
|
struct Discovery {
|
|
int sock;
|
|
struct sockaddr_in mcast_addr;
|
|
struct Discovery_Config config;
|
|
|
|
pthread_t announce_thread;
|
|
pthread_t receive_thread;
|
|
atomic_int running;
|
|
|
|
pthread_mutex_t announce_mutex;
|
|
pthread_cond_t announce_cond; /* signaled to wake announce thread early */
|
|
|
|
pthread_mutex_t peers_mutex;
|
|
struct Peer_Entry peers[DISCOVERY_MAX_PEERS];
|
|
};
|
|
|
|
/* -- helpers --------------------------------------------------------------- */
|
|
|
|
static uint64_t now_ms(void) {
|
|
struct timespec ts;
|
|
clock_gettime(CLOCK_MONOTONIC, &ts);
|
|
return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u;
|
|
}
|
|
|
|
static int find_peer(struct Discovery *d, uint32_t addr, const char *name) {
|
|
for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) {
|
|
if (d->peers[i].active
|
|
&& d->peers[i].info.addr == addr
|
|
&& strcmp(d->peers[i].info.name, name) == 0) {
|
|
return i;
|
|
}
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
static int find_slot(struct Discovery *d) {
|
|
for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) {
|
|
if (!d->peers[i].active) { return i; }
|
|
}
|
|
return -1;
|
|
}
|
|
|
|
/* -- send ------------------------------------------------------------------ */
|
|
|
|
static void send_announcement(struct Discovery *d) {
|
|
size_t name_len = strlen(d->config.name);
|
|
if (name_len > DISCOVERY_MAX_NAME_LEN) { name_len = DISCOVERY_MAX_NAME_LEN; }
|
|
|
|
uint32_t payload_len = (uint32_t)(ANN_FIXED_SIZE + name_len);
|
|
size_t total = TRANSPORT_FRAME_HEADER_SIZE + payload_len;
|
|
uint8_t buf[TRANSPORT_FRAME_HEADER_SIZE + ANN_FIXED_SIZE + DISCOVERY_MAX_NAME_LEN];
|
|
|
|
/* frame header */
|
|
put_u16(buf, 0, 0x0010); /* message_type: DISCOVERY_ANNOUNCE */
|
|
put_u32(buf, 2, payload_len);
|
|
|
|
/* announcement payload */
|
|
uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE;
|
|
put_u8 (p, ANN_PROTOCOL_VERSION, DISCOVERY_PROTOCOL_VERSION);
|
|
put_u16(p, ANN_SITE_ID, d->config.site_id);
|
|
put_u16(p, ANN_TCP_PORT, d->config.tcp_port);
|
|
put_u16(p, ANN_FUNCTION_FLAGS, d->config.function_flags);
|
|
put_u8 (p, ANN_NAME_LEN, (uint8_t)name_len);
|
|
memcpy(p + ANN_NAME, d->config.name, name_len);
|
|
|
|
sendto(d->sock, buf, total, 0,
|
|
(struct sockaddr *)&d->mcast_addr, sizeof(d->mcast_addr));
|
|
}
|
|
|
|
/* -- timeout check --------------------------------------------------------- */
|
|
|
|
static void check_timeouts(struct Discovery *d) {
|
|
uint64_t ts = now_ms();
|
|
uint64_t timeout = (uint64_t)d->config.interval_ms
|
|
* (uint64_t)d->config.timeout_intervals;
|
|
|
|
for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) {
|
|
struct Discovery_Peer peer_copy;
|
|
int expired = 0;
|
|
|
|
pthread_mutex_lock(&d->peers_mutex);
|
|
if (d->peers[i].active && (ts - d->peers[i].last_seen_ms) > timeout) {
|
|
peer_copy = d->peers[i].info;
|
|
d->peers[i].active = 0;
|
|
expired = 1;
|
|
}
|
|
pthread_mutex_unlock(&d->peers_mutex);
|
|
|
|
if (expired && d->config.on_peer_lost) {
|
|
d->config.on_peer_lost(&peer_copy, d->config.userdata);
|
|
}
|
|
}
|
|
}
|
|
|
|
/* -- announce thread ------------------------------------------------------- */
|
|
|
|
static void *announce_thread_fn(void *arg) {
|
|
struct Discovery *d = arg;
|
|
|
|
send_announcement(d);
|
|
|
|
pthread_mutex_lock(&d->announce_mutex);
|
|
while (atomic_load(&d->running)) {
|
|
struct timespec abs;
|
|
clock_gettime(CLOCK_REALTIME, &abs);
|
|
uint32_t ms = d->config.interval_ms;
|
|
abs.tv_sec += ms / 1000u;
|
|
abs.tv_nsec += (long)(ms % 1000u) * 1000000L;
|
|
if (abs.tv_nsec >= 1000000000L) {
|
|
abs.tv_sec++;
|
|
abs.tv_nsec -= 1000000000L;
|
|
}
|
|
/* blocks until signaled (new peer / shutdown) or interval elapses */
|
|
pthread_cond_timedwait(&d->announce_cond, &d->announce_mutex, &abs);
|
|
|
|
if (!atomic_load(&d->running)) { break; }
|
|
|
|
send_announcement(d);
|
|
check_timeouts(d);
|
|
}
|
|
pthread_mutex_unlock(&d->announce_mutex);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* -- receive thread -------------------------------------------------------- */
|
|
|
|
static void *receive_thread_fn(void *arg) {
|
|
struct Discovery *d = arg;
|
|
uint8_t buf[RECV_BUF_SIZE];
|
|
struct sockaddr_in src;
|
|
socklen_t src_len;
|
|
|
|
while (atomic_load(&d->running)) {
|
|
src_len = sizeof(src);
|
|
ssize_t n = recvfrom(d->sock, buf, sizeof(buf), 0,
|
|
(struct sockaddr *)&src, &src_len);
|
|
|
|
if (n < 0) {
|
|
/* SO_RCVTIMEO fires as EAGAIN — just loop and re-check running */
|
|
continue;
|
|
}
|
|
|
|
/* validate frame header */
|
|
if (n < (ssize_t)TRANSPORT_FRAME_HEADER_SIZE) { continue; }
|
|
uint16_t msg_type = get_u16(buf, 0);
|
|
uint32_t payload_len = get_u32(buf, 2);
|
|
if (msg_type != 0x0010) { continue; }
|
|
if ((ssize_t)(TRANSPORT_FRAME_HEADER_SIZE + payload_len) > n) { continue; }
|
|
if (payload_len < ANN_FIXED_SIZE) { continue; }
|
|
|
|
/* parse announcement payload */
|
|
uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE;
|
|
uint16_t site_id = get_u16(p, ANN_SITE_ID);
|
|
uint16_t tcp_port = get_u16(p, ANN_TCP_PORT);
|
|
uint16_t flags = get_u16(p, ANN_FUNCTION_FLAGS);
|
|
uint8_t name_len = get_u8 (p, ANN_NAME_LEN);
|
|
|
|
if (payload_len < (uint32_t)(ANN_FIXED_SIZE + name_len)) { continue; }
|
|
|
|
char name[DISCOVERY_MAX_NAME_LEN + 1];
|
|
memcpy(name, p + ANN_NAME, name_len);
|
|
name[name_len] = '\0';
|
|
|
|
/* skip our own announcements */
|
|
if (site_id == d->config.site_id
|
|
&& strcmp(name, d->config.name) == 0) {
|
|
continue;
|
|
}
|
|
|
|
uint32_t addr = src.sin_addr.s_addr;
|
|
uint64_t ts = now_ms();
|
|
int is_new = 0;
|
|
struct Discovery_Peer peer_copy;
|
|
|
|
pthread_mutex_lock(&d->peers_mutex);
|
|
|
|
int idx = find_peer(d, addr, name);
|
|
if (idx >= 0) {
|
|
d->peers[idx].last_seen_ms = ts;
|
|
d->peers[idx].info.site_id = site_id;
|
|
d->peers[idx].info.tcp_port = tcp_port;
|
|
d->peers[idx].info.function_flags = flags;
|
|
} else {
|
|
idx = find_slot(d);
|
|
if (idx >= 0) {
|
|
d->peers[idx].active = 1;
|
|
d->peers[idx].last_seen_ms = ts;
|
|
d->peers[idx].info.addr = addr;
|
|
d->peers[idx].info.site_id = site_id;
|
|
d->peers[idx].info.tcp_port = tcp_port;
|
|
d->peers[idx].info.function_flags = flags;
|
|
strncpy(d->peers[idx].info.name, name, DISCOVERY_MAX_NAME_LEN);
|
|
d->peers[idx].info.name[DISCOVERY_MAX_NAME_LEN] = '\0';
|
|
peer_copy = d->peers[idx].info;
|
|
is_new = 1;
|
|
}
|
|
}
|
|
|
|
pthread_mutex_unlock(&d->peers_mutex);
|
|
|
|
/* respond to every announcement — the sender may be a fresh instance
|
|
* that doesn't know about us yet even if we already have it in our table */
|
|
pthread_mutex_lock(&d->announce_mutex);
|
|
pthread_cond_signal(&d->announce_cond);
|
|
pthread_mutex_unlock(&d->announce_mutex);
|
|
|
|
if (is_new && d->config.on_peer_found) {
|
|
d->config.on_peer_found(&peer_copy, d->config.userdata);
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* -- public API ------------------------------------------------------------ */
|
|
|
|
struct App_Error discovery_create(struct Discovery **out,
|
|
struct Discovery_Config *config)
|
|
{
|
|
struct Discovery *d = calloc(1, sizeof(*d));
|
|
if (!d) { return APP_SYSCALL_ERROR(); }
|
|
|
|
d->config = *config;
|
|
d->sock = -1;
|
|
|
|
if (d->config.interval_ms == 0) {
|
|
d->config.interval_ms = DEFAULT_INTERVAL_MS;
|
|
}
|
|
if (d->config.timeout_intervals == 0) {
|
|
d->config.timeout_intervals = DEFAULT_TIMEOUT_INTERVALS;
|
|
}
|
|
|
|
d->mcast_addr.sin_family = AF_INET;
|
|
d->mcast_addr.sin_port = htons(DISCOVERY_PORT);
|
|
inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &d->mcast_addr.sin_addr);
|
|
|
|
atomic_init(&d->running, 0);
|
|
pthread_mutex_init(&d->announce_mutex, NULL);
|
|
pthread_cond_init(&d->announce_cond, NULL);
|
|
pthread_mutex_init(&d->peers_mutex, NULL);
|
|
|
|
*out = d;
|
|
return APP_OK;
|
|
}
|
|
|
|
struct App_Error discovery_start(struct Discovery *d) {
|
|
int sock = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0);
|
|
if (sock < 0) { return APP_SYSCALL_ERROR(); }
|
|
|
|
int opt = 1;
|
|
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
|
setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
|
|
|
|
struct sockaddr_in bind_addr = {0};
|
|
bind_addr.sin_family = AF_INET;
|
|
bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
bind_addr.sin_port = htons(DISCOVERY_PORT);
|
|
|
|
if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) {
|
|
close(sock);
|
|
return APP_SYSCALL_ERROR();
|
|
}
|
|
|
|
struct ip_mreq mreq = {0};
|
|
inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &mreq.imr_multiaddr);
|
|
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
|
|
if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
|
|
&mreq, sizeof(mreq)) < 0) {
|
|
close(sock);
|
|
return APP_SYSCALL_ERROR();
|
|
}
|
|
|
|
uint8_t ttl = 1;
|
|
setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
|
|
|
|
/* 1 second receive timeout so the receive thread can check running */
|
|
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
|
|
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
|
|
|
|
d->sock = sock;
|
|
atomic_store(&d->running, 1);
|
|
|
|
if (pthread_create(&d->receive_thread, NULL, receive_thread_fn, d) != 0) {
|
|
atomic_store(&d->running, 0);
|
|
close(sock);
|
|
d->sock = -1;
|
|
return APP_SYSCALL_ERROR();
|
|
}
|
|
|
|
if (pthread_create(&d->announce_thread, NULL, announce_thread_fn, d) != 0) {
|
|
atomic_store(&d->running, 0);
|
|
close(sock);
|
|
d->sock = -1;
|
|
pthread_join(d->receive_thread, NULL);
|
|
return APP_SYSCALL_ERROR();
|
|
}
|
|
|
|
return APP_OK;
|
|
}
|
|
|
|
void discovery_destroy(struct Discovery *d) {
|
|
atomic_store(&d->running, 0);
|
|
/* wake announce thread so it exits without waiting for the full interval */
|
|
pthread_mutex_lock(&d->announce_mutex);
|
|
pthread_cond_signal(&d->announce_cond);
|
|
pthread_mutex_unlock(&d->announce_mutex);
|
|
close(d->sock);
|
|
pthread_join(d->announce_thread, NULL);
|
|
pthread_join(d->receive_thread, NULL);
|
|
pthread_cond_destroy(&d->announce_cond);
|
|
pthread_mutex_destroy(&d->announce_mutex);
|
|
pthread_mutex_destroy(&d->peers_mutex);
|
|
free(d);
|
|
}
|