Add discovery module: UDP multicast announcements and peer tracking
Sends 6-byte framed announcements to 224.0.0.251:5353 on startup and every interval_ms (default 5s). Receive thread maintains a peer table (max 64 entries); fires on_peer_found for new peers, on_peer_lost when a peer misses timeout_intervals (default 3) consecutive intervals. Own announcements are filtered by name+site_id. SO_REUSEADDR+REUSEPORT allows multiple processes on the same host for testing. discovery_cli: announce <name> <tcp_port> [flags] — prints found/lost events. Also notes future config module in planning.md. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -7,13 +7,15 @@ MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o
|
|||||||
V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o
|
V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o
|
||||||
SERIAL_OBJ = $(BUILD)/serial/serial.o
|
SERIAL_OBJ = $(BUILD)/serial/serial.o
|
||||||
TRANSPORT_OBJ = $(BUILD)/transport/transport.o
|
TRANSPORT_OBJ = $(BUILD)/transport/transport.o
|
||||||
|
DISCOVERY_OBJ = $(BUILD)/discovery/discovery.o
|
||||||
|
|
||||||
.PHONY: all clean modules
|
.PHONY: all clean modules
|
||||||
|
|
||||||
all: modules \
|
all: modules \
|
||||||
$(CLI_BUILD)/media_ctrl_cli \
|
$(CLI_BUILD)/media_ctrl_cli \
|
||||||
$(CLI_BUILD)/v4l2_ctrl_cli \
|
$(CLI_BUILD)/v4l2_ctrl_cli \
|
||||||
$(CLI_BUILD)/transport_cli
|
$(CLI_BUILD)/transport_cli \
|
||||||
|
$(CLI_BUILD)/discovery_cli
|
||||||
|
|
||||||
modules:
|
modules:
|
||||||
$(MAKE) -C $(ROOT)/src/modules/common
|
$(MAKE) -C $(ROOT)/src/modules/common
|
||||||
@@ -21,6 +23,7 @@ modules:
|
|||||||
$(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
$(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||||
$(MAKE) -C $(ROOT)/src/modules/serial
|
$(MAKE) -C $(ROOT)/src/modules/serial
|
||||||
$(MAKE) -C $(ROOT)/src/modules/transport
|
$(MAKE) -C $(ROOT)/src/modules/transport
|
||||||
|
$(MAKE) -C $(ROOT)/src/modules/discovery
|
||||||
|
|
||||||
$(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD)
|
$(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD)
|
||||||
$(CC) $(CFLAGS) -o $@ $^
|
$(CC) $(CFLAGS) -o $@ $^
|
||||||
@@ -31,6 +34,9 @@ $(CLI_BUILD)/v4l2_ctrl_cli: v4l2_ctrl_cli.c $(COMMON_OBJ) $(V4L2_CTRL_OBJ) | $(C
|
|||||||
$(CLI_BUILD)/transport_cli: transport_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) | $(CLI_BUILD)
|
$(CLI_BUILD)/transport_cli: transport_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) | $(CLI_BUILD)
|
||||||
$(CC) $(CFLAGS) -o $@ $^ -lpthread
|
$(CC) $(CFLAGS) -o $@ $^ -lpthread
|
||||||
|
|
||||||
|
$(CLI_BUILD)/discovery_cli: discovery_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(DISCOVERY_OBJ) | $(CLI_BUILD)
|
||||||
|
$(CC) $(CFLAGS) -o $@ $^ -lpthread
|
||||||
|
|
||||||
$(CLI_BUILD):
|
$(CLI_BUILD):
|
||||||
mkdir -p $@
|
mkdir -p $@
|
||||||
|
|
||||||
@@ -38,4 +44,5 @@ clean:
|
|||||||
rm -f \
|
rm -f \
|
||||||
$(CLI_BUILD)/media_ctrl_cli \
|
$(CLI_BUILD)/media_ctrl_cli \
|
||||||
$(CLI_BUILD)/v4l2_ctrl_cli \
|
$(CLI_BUILD)/v4l2_ctrl_cli \
|
||||||
$(CLI_BUILD)/transport_cli
|
$(CLI_BUILD)/transport_cli \
|
||||||
|
$(CLI_BUILD)/discovery_cli
|
||||||
|
|||||||
108
dev/cli/discovery_cli.c
Normal file
108
dev/cli/discovery_cli.c
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
|
||||||
|
#include "discovery.h"
|
||||||
|
#include "error.h"
|
||||||
|
|
||||||
|
static void flags_str(uint16_t flags, char *buf, size_t len) {
|
||||||
|
buf[0] = '\0';
|
||||||
|
if (flags & DISCOVERY_FLAG_SOURCE) { strncat(buf, "source ", len - strlen(buf) - 1); }
|
||||||
|
if (flags & DISCOVERY_FLAG_RELAY) { strncat(buf, "relay ", len - strlen(buf) - 1); }
|
||||||
|
if (flags & DISCOVERY_FLAG_SINK) { strncat(buf, "sink ", len - strlen(buf) - 1); }
|
||||||
|
if (flags & DISCOVERY_FLAG_CONTROLLER) { strncat(buf, "controller ", len - strlen(buf) - 1); }
|
||||||
|
/* trim trailing space */
|
||||||
|
size_t l = strlen(buf);
|
||||||
|
if (l > 0 && buf[l - 1] == ' ') { buf[l - 1] = '\0'; }
|
||||||
|
}
|
||||||
|
|
||||||
|
static void on_peer_found(const struct Discovery_Peer *peer, void *userdata) {
|
||||||
|
(void)userdata;
|
||||||
|
char addr[INET_ADDRSTRLEN];
|
||||||
|
inet_ntop(AF_INET, &peer->addr, addr, sizeof(addr));
|
||||||
|
char flags[64];
|
||||||
|
flags_str(peer->function_flags, flags, sizeof(flags));
|
||||||
|
printf("found %-30s %s:%u site=%u [%s]\n",
|
||||||
|
peer->name, addr, peer->tcp_port, peer->site_id, flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void on_peer_lost(const struct Discovery_Peer *peer, void *userdata) {
|
||||||
|
(void)userdata;
|
||||||
|
char addr[INET_ADDRSTRLEN];
|
||||||
|
inet_ntop(AF_INET, &peer->addr, addr, sizeof(addr));
|
||||||
|
printf("lost %-30s %s:%u\n", peer->name, addr, peer->tcp_port);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void usage(void) {
|
||||||
|
fprintf(stderr,
|
||||||
|
"usage: discovery_cli <name> <tcp_port> [flags]\n"
|
||||||
|
"\n"
|
||||||
|
" name node name in namespace:instance form, e.g. v4l2:microscope\n"
|
||||||
|
" tcp_port port this node listens on for transport connections\n"
|
||||||
|
" flags comma-separated roles: source,relay,sink,controller\n"
|
||||||
|
" default: source\n"
|
||||||
|
"\n"
|
||||||
|
"Announces this node on the multicast group and prints peers as they\n"
|
||||||
|
"appear and disappear. Press Ctrl-C to exit.\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
static uint16_t parse_flags(const char *s) {
|
||||||
|
uint16_t f = 0;
|
||||||
|
if (strstr(s, "source")) { f |= DISCOVERY_FLAG_SOURCE; }
|
||||||
|
if (strstr(s, "relay")) { f |= DISCOVERY_FLAG_RELAY; }
|
||||||
|
if (strstr(s, "sink")) { f |= DISCOVERY_FLAG_SINK; }
|
||||||
|
if (strstr(s, "controller")) { f |= DISCOVERY_FLAG_CONTROLLER; }
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
if (argc < 3) {
|
||||||
|
usage();
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *name = argv[1];
|
||||||
|
uint16_t tcp_port = (uint16_t)atoi(argv[2]);
|
||||||
|
uint16_t flags = argc >= 4 ? parse_flags(argv[3]) : DISCOVERY_FLAG_SOURCE;
|
||||||
|
|
||||||
|
if (strlen(name) == 0 || strlen(name) > DISCOVERY_MAX_NAME_LEN) {
|
||||||
|
fprintf(stderr, "name must be 1–%d characters\n", DISCOVERY_MAX_NAME_LEN);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Discovery_Config config = {
|
||||||
|
.site_id = 0,
|
||||||
|
.tcp_port = tcp_port,
|
||||||
|
.function_flags = flags,
|
||||||
|
.name = name,
|
||||||
|
.interval_ms = 0, /* use default */
|
||||||
|
.timeout_intervals = 0, /* use default */
|
||||||
|
.on_peer_found = on_peer_found,
|
||||||
|
.on_peer_lost = on_peer_lost,
|
||||||
|
.userdata = NULL,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Discovery *d;
|
||||||
|
struct App_Error err = discovery_create(&d, &config);
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "discovery_create: errno %d\n", err.detail.syscall.err_no);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
err = discovery_start(d);
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "discovery_start: errno %d\n", err.detail.syscall.err_no);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char flags_buf[64];
|
||||||
|
flags_str(flags, flags_buf, sizeof(flags_buf));
|
||||||
|
printf("announcing %-30s port=%u [%s]\n", name, tcp_port, flags_buf);
|
||||||
|
printf("listening on %s:%d\n\n", DISCOVERY_MULTICAST_GROUP, DISCOVERY_PORT);
|
||||||
|
|
||||||
|
pause();
|
||||||
|
|
||||||
|
discovery_destroy(d);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
50
include/discovery.h
Normal file
50
include/discovery.h
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "error.h"
|
||||||
|
|
||||||
|
#define DISCOVERY_MULTICAST_GROUP "224.0.0.251"
|
||||||
|
#define DISCOVERY_PORT 5353
|
||||||
|
#define DISCOVERY_PROTOCOL_VERSION 1
|
||||||
|
#define DISCOVERY_MAX_NAME_LEN 255
|
||||||
|
#define DISCOVERY_MAX_PEERS 64
|
||||||
|
|
||||||
|
/* function_flags bits — matches protocol doc */
|
||||||
|
#define DISCOVERY_FLAG_SOURCE (1u << 0)
|
||||||
|
#define DISCOVERY_FLAG_RELAY (1u << 1)
|
||||||
|
#define DISCOVERY_FLAG_SINK (1u << 2)
|
||||||
|
#define DISCOVERY_FLAG_CONTROLLER (1u << 3)
|
||||||
|
|
||||||
|
struct Discovery_Peer {
|
||||||
|
uint32_t addr; /* source IPv4, network byte order */
|
||||||
|
uint16_t site_id;
|
||||||
|
uint16_t tcp_port;
|
||||||
|
uint16_t function_flags;
|
||||||
|
char name[DISCOVERY_MAX_NAME_LEN + 1];
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef void (*Discovery_Peer_Found_Cb)(const struct Discovery_Peer *peer, void *userdata);
|
||||||
|
typedef void (*Discovery_Peer_Lost_Cb)(const struct Discovery_Peer *peer, void *userdata);
|
||||||
|
|
||||||
|
struct Discovery_Config {
|
||||||
|
/* our own identity */
|
||||||
|
uint16_t site_id;
|
||||||
|
uint16_t tcp_port;
|
||||||
|
uint16_t function_flags;
|
||||||
|
const char *name;
|
||||||
|
|
||||||
|
/* timing */
|
||||||
|
uint32_t interval_ms; /* announcement interval; 0 = use default (5000) */
|
||||||
|
uint32_t timeout_intervals; /* missed intervals before peer_lost; 0 = use default (3) */
|
||||||
|
|
||||||
|
/* callbacks */
|
||||||
|
Discovery_Peer_Found_Cb on_peer_found; /* required */
|
||||||
|
Discovery_Peer_Lost_Cb on_peer_lost; /* optional */
|
||||||
|
void *userdata;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Discovery;
|
||||||
|
|
||||||
|
struct App_Error discovery_create(struct Discovery **out, struct Discovery_Config *config);
|
||||||
|
struct App_Error discovery_start(struct Discovery *d);
|
||||||
|
void discovery_destroy(struct Discovery *d);
|
||||||
20
planning.md
20
planning.md
@@ -46,16 +46,18 @@ Modules are listed in intended build order. Each depends only on modules above i
|
|||||||
| 2 | `media_ctrl` | done | Media Controller API — device and topology enumeration, pad format config |
|
| 2 | `media_ctrl` | done | Media Controller API — device and topology enumeration, pad format config |
|
||||||
| 3 | `v4l2_ctrl` | done | V4L2 controls — enumerate, get, set camera parameters |
|
| 3 | `v4l2_ctrl` | done | V4L2 controls — enumerate, get, set camera parameters |
|
||||||
| 4 | `serial` | done | `put`/`get` primitives for little-endian binary serialization into byte buffers |
|
| 4 | `serial` | done | `put`/`get` primitives for little-endian binary serialization into byte buffers |
|
||||||
| 5 | `transport` | not started | Encapsulated transport — frame header, TCP stream abstraction, single-write send |
|
| 5 | `transport` | done | Encapsulated transport — frame header, TCP stream abstraction, single-write send |
|
||||||
| 6 | `protocol` | not started | Typed `write_*`/`read_*` functions for all message types; builds on serial + transport |
|
| 6 | `discovery` | done | UDP multicast announcements, peer table, found/lost callbacks |
|
||||||
| 7 | `frame_alloc` | not started | Per-frame allocation with bookkeeping (byte budget, ref counting) |
|
| 8 | `protocol` | not started | Typed `write_*`/`read_*` functions for all message types; builds on serial + transport |
|
||||||
| 8 | `relay` | not started | Input dispatch to output queues (low-latency and completeness modes) |
|
| 9 | `frame_alloc` | not started | Per-frame allocation with bookkeeping (byte budget, ref counting) |
|
||||||
| 9 | `ingest` | not started | V4L2 capture loop — dequeue buffers, emit one encapsulated frame per buffer |
|
| 10 | `relay` | not started | Input dispatch to output queues (low-latency and completeness modes) |
|
||||||
| 10 | `archive` | not started | Write frames to disk, control messages to binary log |
|
| 11 | `ingest` | not started | V4L2 capture loop — dequeue buffers, emit one encapsulated frame per buffer |
|
||||||
| 11 | `codec` | not started | Per-frame encode/decode — MJPEG (libjpeg-turbo), QOI, ZSTD-raw, VA-API H.264 intra; used by screen grab source and archive |
|
| 12 | `archive` | not started | Write frames to disk, control messages to binary log |
|
||||||
| 12 | `xorg` | not started | X11 screen geometry queries (XRandR), screen grab source (calls codec), frame viewer sink — see architecture.md |
|
| 13 | `codec` | not started | Per-frame encode/decode — MJPEG (libjpeg-turbo), QOI, ZSTD-raw, VA-API H.264 intra; used by screen grab source and archive |
|
||||||
| 13 | `web node` | not started | Node.js/Express peer — speaks binary protocol on socket side, HTTP/WebSocket to browser; `protocol.mjs` mirrors C protocol module |
|
| 14 | `xorg` | not started | X11 screen geometry queries (XRandR), screen grab source (calls codec), frame viewer sink — see architecture.md |
|
||||||
|
| 15 | `web node` | not started | Node.js/Express peer — speaks binary protocol on socket side, HTTP/WebSocket to browser; `protocol.mjs` mirrors C protocol module |
|
||||||
| — | `mjpeg_scan` | future | EOI marker scanner for misbehaving hardware that does not deliver clean per-buffer frames; not part of the primary pipeline |
|
| — | `mjpeg_scan` | future | EOI marker scanner for misbehaving hardware that does not deliver clean per-buffer frames; not part of the primary pipeline |
|
||||||
|
| — | `config` | future | Unified config file reader; nodes currently configured via CLI args — needed before production deployment |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
|||||||
17
src/modules/discovery/Makefile
Normal file
17
src/modules/discovery/Makefile
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
ROOT := $(abspath ../../..)
|
||||||
|
include $(ROOT)/common.mk
|
||||||
|
|
||||||
|
MODULE_BUILD = $(BUILD)/discovery
|
||||||
|
|
||||||
|
.PHONY: all clean
|
||||||
|
|
||||||
|
all: $(MODULE_BUILD)/discovery.o
|
||||||
|
|
||||||
|
$(MODULE_BUILD)/discovery.o: discovery.c $(ROOT)/include/discovery.h | $(MODULE_BUILD)
|
||||||
|
$(CC) $(CFLAGS) -c -o $@ $<
|
||||||
|
|
||||||
|
$(MODULE_BUILD):
|
||||||
|
mkdir -p $@
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -f $(MODULE_BUILD)/discovery.o
|
||||||
326
src/modules/discovery/discovery.c
Normal file
326
src/modules/discovery/discovery.c
Normal file
@@ -0,0 +1,326 @@
|
|||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <stdatomic.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
#include "discovery.h"
|
||||||
|
#include "serial.h"
|
||||||
|
#include "transport.h" /* TRANSPORT_FRAME_HEADER_SIZE */
|
||||||
|
|
||||||
|
#define DEFAULT_INTERVAL_MS 5000u
|
||||||
|
#define DEFAULT_TIMEOUT_INTERVALS 3u
|
||||||
|
#define RECV_BUF_SIZE 512u
|
||||||
|
|
||||||
|
/* announcement payload layout (offsets relative to payload start) */
|
||||||
|
#define ANN_PROTOCOL_VERSION 0 /* u8 */
|
||||||
|
#define ANN_SITE_ID 1 /* u16 */
|
||||||
|
#define ANN_TCP_PORT 3 /* u16 */
|
||||||
|
#define ANN_FUNCTION_FLAGS 5 /* u16 */
|
||||||
|
#define ANN_NAME_LEN 7 /* u8 */
|
||||||
|
#define ANN_NAME 8 /* bytes */
|
||||||
|
#define ANN_FIXED_SIZE 8u
|
||||||
|
|
||||||
|
struct Peer_Entry {
|
||||||
|
struct Discovery_Peer info;
|
||||||
|
uint64_t last_seen_ms;
|
||||||
|
int active;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Discovery {
|
||||||
|
int sock;
|
||||||
|
struct sockaddr_in mcast_addr;
|
||||||
|
struct Discovery_Config config;
|
||||||
|
|
||||||
|
pthread_t announce_thread;
|
||||||
|
pthread_t receive_thread;
|
||||||
|
atomic_int running;
|
||||||
|
|
||||||
|
pthread_mutex_t peers_mutex;
|
||||||
|
struct Peer_Entry peers[DISCOVERY_MAX_PEERS];
|
||||||
|
};
|
||||||
|
|
||||||
|
/* -- helpers --------------------------------------------------------------- */
|
||||||
|
|
||||||
|
static uint64_t now_ms(void) {
|
||||||
|
struct timespec ts;
|
||||||
|
clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||||
|
return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int find_peer(struct Discovery *d, uint32_t addr, const char *name) {
|
||||||
|
for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) {
|
||||||
|
if (d->peers[i].active
|
||||||
|
&& d->peers[i].info.addr == addr
|
||||||
|
&& strcmp(d->peers[i].info.name, name) == 0) {
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int find_slot(struct Discovery *d) {
|
||||||
|
for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) {
|
||||||
|
if (!d->peers[i].active) { return i; }
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- send ------------------------------------------------------------------ */
|
||||||
|
|
||||||
|
static void send_announcement(struct Discovery *d) {
|
||||||
|
size_t name_len = strlen(d->config.name);
|
||||||
|
if (name_len > DISCOVERY_MAX_NAME_LEN) { name_len = DISCOVERY_MAX_NAME_LEN; }
|
||||||
|
|
||||||
|
uint32_t payload_len = (uint32_t)(ANN_FIXED_SIZE + name_len);
|
||||||
|
size_t total = TRANSPORT_FRAME_HEADER_SIZE + payload_len;
|
||||||
|
uint8_t buf[TRANSPORT_FRAME_HEADER_SIZE + ANN_FIXED_SIZE + DISCOVERY_MAX_NAME_LEN];
|
||||||
|
|
||||||
|
/* frame header */
|
||||||
|
put_u16(buf, 0, 0x0010); /* message_type: DISCOVERY_ANNOUNCE */
|
||||||
|
put_u32(buf, 2, payload_len);
|
||||||
|
|
||||||
|
/* announcement payload */
|
||||||
|
uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE;
|
||||||
|
put_u8 (p, ANN_PROTOCOL_VERSION, DISCOVERY_PROTOCOL_VERSION);
|
||||||
|
put_u16(p, ANN_SITE_ID, d->config.site_id);
|
||||||
|
put_u16(p, ANN_TCP_PORT, d->config.tcp_port);
|
||||||
|
put_u16(p, ANN_FUNCTION_FLAGS, d->config.function_flags);
|
||||||
|
put_u8 (p, ANN_NAME_LEN, (uint8_t)name_len);
|
||||||
|
memcpy(p + ANN_NAME, d->config.name, name_len);
|
||||||
|
|
||||||
|
sendto(d->sock, buf, total, 0,
|
||||||
|
(struct sockaddr *)&d->mcast_addr, sizeof(d->mcast_addr));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- timeout check --------------------------------------------------------- */
|
||||||
|
|
||||||
|
static void check_timeouts(struct Discovery *d) {
|
||||||
|
uint64_t ts = now_ms();
|
||||||
|
uint64_t timeout = (uint64_t)d->config.interval_ms
|
||||||
|
* (uint64_t)d->config.timeout_intervals;
|
||||||
|
|
||||||
|
for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) {
|
||||||
|
struct Discovery_Peer peer_copy;
|
||||||
|
int expired = 0;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&d->peers_mutex);
|
||||||
|
if (d->peers[i].active && (ts - d->peers[i].last_seen_ms) > timeout) {
|
||||||
|
peer_copy = d->peers[i].info;
|
||||||
|
d->peers[i].active = 0;
|
||||||
|
expired = 1;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&d->peers_mutex);
|
||||||
|
|
||||||
|
if (expired && d->config.on_peer_lost) {
|
||||||
|
d->config.on_peer_lost(&peer_copy, d->config.userdata);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- announce thread ------------------------------------------------------- */
|
||||||
|
|
||||||
|
static void *announce_thread_fn(void *arg) {
|
||||||
|
struct Discovery *d = arg;
|
||||||
|
|
||||||
|
send_announcement(d);
|
||||||
|
|
||||||
|
while (atomic_load(&d->running)) {
|
||||||
|
/* sleep in 100 ms increments so destroy doesn't wait a full interval */
|
||||||
|
uint32_t elapsed = 0;
|
||||||
|
while (atomic_load(&d->running) && elapsed < d->config.interval_ms) {
|
||||||
|
struct timespec ts = { .tv_sec = 0, .tv_nsec = 100 * 1000000L };
|
||||||
|
nanosleep(&ts, NULL);
|
||||||
|
elapsed += 100;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!atomic_load(&d->running)) { break; }
|
||||||
|
|
||||||
|
send_announcement(d);
|
||||||
|
check_timeouts(d);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- receive thread -------------------------------------------------------- */
|
||||||
|
|
||||||
|
static void *receive_thread_fn(void *arg) {
|
||||||
|
struct Discovery *d = arg;
|
||||||
|
uint8_t buf[RECV_BUF_SIZE];
|
||||||
|
struct sockaddr_in src;
|
||||||
|
socklen_t src_len;
|
||||||
|
|
||||||
|
while (atomic_load(&d->running)) {
|
||||||
|
src_len = sizeof(src);
|
||||||
|
ssize_t n = recvfrom(d->sock, buf, sizeof(buf), 0,
|
||||||
|
(struct sockaddr *)&src, &src_len);
|
||||||
|
|
||||||
|
if (n < 0) {
|
||||||
|
/* SO_RCVTIMEO fires as EAGAIN — just loop and re-check running */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* validate frame header */
|
||||||
|
if (n < (ssize_t)TRANSPORT_FRAME_HEADER_SIZE) { continue; }
|
||||||
|
uint16_t msg_type = get_u16(buf, 0);
|
||||||
|
uint32_t payload_len = get_u32(buf, 2);
|
||||||
|
if (msg_type != 0x0010) { continue; }
|
||||||
|
if ((ssize_t)(TRANSPORT_FRAME_HEADER_SIZE + payload_len) > n) { continue; }
|
||||||
|
if (payload_len < ANN_FIXED_SIZE) { continue; }
|
||||||
|
|
||||||
|
/* parse announcement payload */
|
||||||
|
uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE;
|
||||||
|
uint16_t site_id = get_u16(p, ANN_SITE_ID);
|
||||||
|
uint16_t tcp_port = get_u16(p, ANN_TCP_PORT);
|
||||||
|
uint16_t flags = get_u16(p, ANN_FUNCTION_FLAGS);
|
||||||
|
uint8_t name_len = get_u8 (p, ANN_NAME_LEN);
|
||||||
|
|
||||||
|
if (payload_len < (uint32_t)(ANN_FIXED_SIZE + name_len)) { continue; }
|
||||||
|
|
||||||
|
char name[DISCOVERY_MAX_NAME_LEN + 1];
|
||||||
|
memcpy(name, p + ANN_NAME, name_len);
|
||||||
|
name[name_len] = '\0';
|
||||||
|
|
||||||
|
/* skip our own announcements */
|
||||||
|
if (site_id == d->config.site_id
|
||||||
|
&& strcmp(name, d->config.name) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t addr = src.sin_addr.s_addr;
|
||||||
|
uint64_t ts = now_ms();
|
||||||
|
int is_new = 0;
|
||||||
|
struct Discovery_Peer peer_copy;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&d->peers_mutex);
|
||||||
|
|
||||||
|
int idx = find_peer(d, addr, name);
|
||||||
|
if (idx >= 0) {
|
||||||
|
d->peers[idx].last_seen_ms = ts;
|
||||||
|
d->peers[idx].info.site_id = site_id;
|
||||||
|
d->peers[idx].info.tcp_port = tcp_port;
|
||||||
|
d->peers[idx].info.function_flags = flags;
|
||||||
|
} else {
|
||||||
|
idx = find_slot(d);
|
||||||
|
if (idx >= 0) {
|
||||||
|
d->peers[idx].active = 1;
|
||||||
|
d->peers[idx].last_seen_ms = ts;
|
||||||
|
d->peers[idx].info.addr = addr;
|
||||||
|
d->peers[idx].info.site_id = site_id;
|
||||||
|
d->peers[idx].info.tcp_port = tcp_port;
|
||||||
|
d->peers[idx].info.function_flags = flags;
|
||||||
|
strncpy(d->peers[idx].info.name, name, DISCOVERY_MAX_NAME_LEN);
|
||||||
|
d->peers[idx].info.name[DISCOVERY_MAX_NAME_LEN] = '\0';
|
||||||
|
peer_copy = d->peers[idx].info;
|
||||||
|
is_new = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&d->peers_mutex);
|
||||||
|
|
||||||
|
if (is_new && d->config.on_peer_found) {
|
||||||
|
d->config.on_peer_found(&peer_copy, d->config.userdata);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- public API ------------------------------------------------------------ */
|
||||||
|
|
||||||
|
struct App_Error discovery_create(struct Discovery **out,
|
||||||
|
struct Discovery_Config *config)
|
||||||
|
{
|
||||||
|
struct Discovery *d = calloc(1, sizeof(*d));
|
||||||
|
if (!d) { return APP_SYSCALL_ERROR(); }
|
||||||
|
|
||||||
|
d->config = *config;
|
||||||
|
d->sock = -1;
|
||||||
|
|
||||||
|
if (d->config.interval_ms == 0) {
|
||||||
|
d->config.interval_ms = DEFAULT_INTERVAL_MS;
|
||||||
|
}
|
||||||
|
if (d->config.timeout_intervals == 0) {
|
||||||
|
d->config.timeout_intervals = DEFAULT_TIMEOUT_INTERVALS;
|
||||||
|
}
|
||||||
|
|
||||||
|
d->mcast_addr.sin_family = AF_INET;
|
||||||
|
d->mcast_addr.sin_port = htons(DISCOVERY_PORT);
|
||||||
|
inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &d->mcast_addr.sin_addr);
|
||||||
|
|
||||||
|
atomic_init(&d->running, 0);
|
||||||
|
pthread_mutex_init(&d->peers_mutex, NULL);
|
||||||
|
|
||||||
|
*out = d;
|
||||||
|
return APP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct App_Error discovery_start(struct Discovery *d) {
|
||||||
|
int sock = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0);
|
||||||
|
if (sock < 0) { return APP_SYSCALL_ERROR(); }
|
||||||
|
|
||||||
|
int opt = 1;
|
||||||
|
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
||||||
|
setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
|
||||||
|
|
||||||
|
struct sockaddr_in bind_addr = {0};
|
||||||
|
bind_addr.sin_family = AF_INET;
|
||||||
|
bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||||
|
bind_addr.sin_port = htons(DISCOVERY_PORT);
|
||||||
|
|
||||||
|
if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) {
|
||||||
|
close(sock);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ip_mreq mreq = {0};
|
||||||
|
inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &mreq.imr_multiaddr);
|
||||||
|
mreq.imr_interface.s_addr = htonl(INADDR_ANY);
|
||||||
|
if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
|
||||||
|
&mreq, sizeof(mreq)) < 0) {
|
||||||
|
close(sock);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t ttl = 1;
|
||||||
|
setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));
|
||||||
|
|
||||||
|
/* 1 second receive timeout so the receive thread can check running */
|
||||||
|
struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
|
||||||
|
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
|
||||||
|
|
||||||
|
d->sock = sock;
|
||||||
|
atomic_store(&d->running, 1);
|
||||||
|
|
||||||
|
if (pthread_create(&d->receive_thread, NULL, receive_thread_fn, d) != 0) {
|
||||||
|
atomic_store(&d->running, 0);
|
||||||
|
close(sock);
|
||||||
|
d->sock = -1;
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pthread_create(&d->announce_thread, NULL, announce_thread_fn, d) != 0) {
|
||||||
|
atomic_store(&d->running, 0);
|
||||||
|
close(sock);
|
||||||
|
d->sock = -1;
|
||||||
|
pthread_join(d->receive_thread, NULL);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
return APP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void discovery_destroy(struct Discovery *d) {
|
||||||
|
atomic_store(&d->running, 0);
|
||||||
|
close(d->sock);
|
||||||
|
pthread_join(d->announce_thread, NULL);
|
||||||
|
pthread_join(d->receive_thread, NULL);
|
||||||
|
pthread_mutex_destroy(&d->peers_mutex);
|
||||||
|
free(d);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user