From fe350e531e1c7c5080cb8c01c13e81659099f420 Mon Sep 17 00:00:00 2001 From: mikael-lovqvists-claude-agent Date: Thu, 26 Mar 2026 22:19:56 +0000 Subject: [PATCH] Add discovery module: UDP multicast announcements and peer tracking MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sends 6-byte framed announcements to 224.0.0.251:5353 on startup and every interval_ms (default 5s). Receive thread maintains a peer table (max 64 entries); fires on_peer_found for new peers, on_peer_lost when a peer misses timeout_intervals (default 3) consecutive intervals. Own announcements are filtered by name+site_id. SO_REUSEADDR+REUSEPORT allows multiple processes on the same host for testing. discovery_cli: announce [flags] — prints found/lost events. Also notes future config module in planning.md. Co-Authored-By: Claude Sonnet 4.6 --- dev/cli/Makefile | 23 ++- dev/cli/discovery_cli.c | 108 ++++++++++ include/discovery.h | 50 +++++ planning.md | 20 +- src/modules/discovery/Makefile | 17 ++ src/modules/discovery/discovery.c | 326 ++++++++++++++++++++++++++++++ 6 files changed, 527 insertions(+), 17 deletions(-) create mode 100644 dev/cli/discovery_cli.c create mode 100644 include/discovery.h create mode 100644 src/modules/discovery/Makefile create mode 100644 src/modules/discovery/discovery.c diff --git a/dev/cli/Makefile b/dev/cli/Makefile index e5b9203..5bb8cbf 100644 --- a/dev/cli/Makefile +++ b/dev/cli/Makefile @@ -1,19 +1,21 @@ ROOT := $(abspath ../..) include $(ROOT)/common.mk -CLI_BUILD = $(BUILD)/cli -COMMON_OBJ = $(BUILD)/common/error.o -MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o -V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o -SERIAL_OBJ = $(BUILD)/serial/serial.o -TRANSPORT_OBJ = $(BUILD)/transport/transport.o +CLI_BUILD = $(BUILD)/cli +COMMON_OBJ = $(BUILD)/common/error.o +MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o +V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o +SERIAL_OBJ = $(BUILD)/serial/serial.o +TRANSPORT_OBJ = $(BUILD)/transport/transport.o +DISCOVERY_OBJ = $(BUILD)/discovery/discovery.o .PHONY: all clean modules all: modules \ $(CLI_BUILD)/media_ctrl_cli \ $(CLI_BUILD)/v4l2_ctrl_cli \ - $(CLI_BUILD)/transport_cli + $(CLI_BUILD)/transport_cli \ + $(CLI_BUILD)/discovery_cli modules: $(MAKE) -C $(ROOT)/src/modules/common @@ -21,6 +23,7 @@ modules: $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl $(MAKE) -C $(ROOT)/src/modules/serial $(MAKE) -C $(ROOT)/src/modules/transport + $(MAKE) -C $(ROOT)/src/modules/discovery $(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD) $(CC) $(CFLAGS) -o $@ $^ @@ -31,6 +34,9 @@ $(CLI_BUILD)/v4l2_ctrl_cli: v4l2_ctrl_cli.c $(COMMON_OBJ) $(V4L2_CTRL_OBJ) | $(C $(CLI_BUILD)/transport_cli: transport_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) | $(CLI_BUILD) $(CC) $(CFLAGS) -o $@ $^ -lpthread +$(CLI_BUILD)/discovery_cli: discovery_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(DISCOVERY_OBJ) | $(CLI_BUILD) + $(CC) $(CFLAGS) -o $@ $^ -lpthread + $(CLI_BUILD): mkdir -p $@ @@ -38,4 +44,5 @@ clean: rm -f \ $(CLI_BUILD)/media_ctrl_cli \ $(CLI_BUILD)/v4l2_ctrl_cli \ - $(CLI_BUILD)/transport_cli + $(CLI_BUILD)/transport_cli \ + $(CLI_BUILD)/discovery_cli diff --git a/dev/cli/discovery_cli.c b/dev/cli/discovery_cli.c new file mode 100644 index 0000000..b4420aa --- /dev/null +++ b/dev/cli/discovery_cli.c @@ -0,0 +1,108 @@ +#include +#include +#include +#include + +#include "discovery.h" +#include "error.h" + +static void flags_str(uint16_t flags, char *buf, size_t len) { + buf[0] = '\0'; + if (flags & DISCOVERY_FLAG_SOURCE) { strncat(buf, "source ", len - strlen(buf) - 1); } + if (flags & DISCOVERY_FLAG_RELAY) { strncat(buf, "relay ", len - strlen(buf) - 1); } + if (flags & DISCOVERY_FLAG_SINK) { strncat(buf, "sink ", len - strlen(buf) - 1); } + if (flags & DISCOVERY_FLAG_CONTROLLER) { strncat(buf, "controller ", len - strlen(buf) - 1); } + /* trim trailing space */ + size_t l = strlen(buf); + if (l > 0 && buf[l - 1] == ' ') { buf[l - 1] = '\0'; } +} + +static void on_peer_found(const struct Discovery_Peer *peer, void *userdata) { + (void)userdata; + char addr[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &peer->addr, addr, sizeof(addr)); + char flags[64]; + flags_str(peer->function_flags, flags, sizeof(flags)); + printf("found %-30s %s:%u site=%u [%s]\n", + peer->name, addr, peer->tcp_port, peer->site_id, flags); +} + +static void on_peer_lost(const struct Discovery_Peer *peer, void *userdata) { + (void)userdata; + char addr[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &peer->addr, addr, sizeof(addr)); + printf("lost %-30s %s:%u\n", peer->name, addr, peer->tcp_port); +} + +static void usage(void) { + fprintf(stderr, + "usage: discovery_cli [flags]\n" + "\n" + " name node name in namespace:instance form, e.g. v4l2:microscope\n" + " tcp_port port this node listens on for transport connections\n" + " flags comma-separated roles: source,relay,sink,controller\n" + " default: source\n" + "\n" + "Announces this node on the multicast group and prints peers as they\n" + "appear and disappear. Press Ctrl-C to exit.\n"); +} + +static uint16_t parse_flags(const char *s) { + uint16_t f = 0; + if (strstr(s, "source")) { f |= DISCOVERY_FLAG_SOURCE; } + if (strstr(s, "relay")) { f |= DISCOVERY_FLAG_RELAY; } + if (strstr(s, "sink")) { f |= DISCOVERY_FLAG_SINK; } + if (strstr(s, "controller")) { f |= DISCOVERY_FLAG_CONTROLLER; } + return f; +} + +int main(int argc, char **argv) { + if (argc < 3) { + usage(); + return 1; + } + + const char *name = argv[1]; + uint16_t tcp_port = (uint16_t)atoi(argv[2]); + uint16_t flags = argc >= 4 ? parse_flags(argv[3]) : DISCOVERY_FLAG_SOURCE; + + if (strlen(name) == 0 || strlen(name) > DISCOVERY_MAX_NAME_LEN) { + fprintf(stderr, "name must be 1–%d characters\n", DISCOVERY_MAX_NAME_LEN); + return 1; + } + + struct Discovery_Config config = { + .site_id = 0, + .tcp_port = tcp_port, + .function_flags = flags, + .name = name, + .interval_ms = 0, /* use default */ + .timeout_intervals = 0, /* use default */ + .on_peer_found = on_peer_found, + .on_peer_lost = on_peer_lost, + .userdata = NULL, + }; + + struct Discovery *d; + struct App_Error err = discovery_create(&d, &config); + if (!APP_IS_OK(err)) { + fprintf(stderr, "discovery_create: errno %d\n", err.detail.syscall.err_no); + return 1; + } + + err = discovery_start(d); + if (!APP_IS_OK(err)) { + fprintf(stderr, "discovery_start: errno %d\n", err.detail.syscall.err_no); + return 1; + } + + char flags_buf[64]; + flags_str(flags, flags_buf, sizeof(flags_buf)); + printf("announcing %-30s port=%u [%s]\n", name, tcp_port, flags_buf); + printf("listening on %s:%d\n\n", DISCOVERY_MULTICAST_GROUP, DISCOVERY_PORT); + + pause(); + + discovery_destroy(d); + return 0; +} diff --git a/include/discovery.h b/include/discovery.h new file mode 100644 index 0000000..4e1e487 --- /dev/null +++ b/include/discovery.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include "error.h" + +#define DISCOVERY_MULTICAST_GROUP "224.0.0.251" +#define DISCOVERY_PORT 5353 +#define DISCOVERY_PROTOCOL_VERSION 1 +#define DISCOVERY_MAX_NAME_LEN 255 +#define DISCOVERY_MAX_PEERS 64 + +/* function_flags bits — matches protocol doc */ +#define DISCOVERY_FLAG_SOURCE (1u << 0) +#define DISCOVERY_FLAG_RELAY (1u << 1) +#define DISCOVERY_FLAG_SINK (1u << 2) +#define DISCOVERY_FLAG_CONTROLLER (1u << 3) + +struct Discovery_Peer { + uint32_t addr; /* source IPv4, network byte order */ + uint16_t site_id; + uint16_t tcp_port; + uint16_t function_flags; + char name[DISCOVERY_MAX_NAME_LEN + 1]; +}; + +typedef void (*Discovery_Peer_Found_Cb)(const struct Discovery_Peer *peer, void *userdata); +typedef void (*Discovery_Peer_Lost_Cb)(const struct Discovery_Peer *peer, void *userdata); + +struct Discovery_Config { + /* our own identity */ + uint16_t site_id; + uint16_t tcp_port; + uint16_t function_flags; + const char *name; + + /* timing */ + uint32_t interval_ms; /* announcement interval; 0 = use default (5000) */ + uint32_t timeout_intervals; /* missed intervals before peer_lost; 0 = use default (3) */ + + /* callbacks */ + Discovery_Peer_Found_Cb on_peer_found; /* required */ + Discovery_Peer_Lost_Cb on_peer_lost; /* optional */ + void *userdata; +}; + +struct Discovery; + +struct App_Error discovery_create(struct Discovery **out, struct Discovery_Config *config); +struct App_Error discovery_start(struct Discovery *d); +void discovery_destroy(struct Discovery *d); diff --git a/planning.md b/planning.md index b981d7b..3727b3a 100644 --- a/planning.md +++ b/planning.md @@ -46,16 +46,18 @@ Modules are listed in intended build order. Each depends only on modules above i | 2 | `media_ctrl` | done | Media Controller API — device and topology enumeration, pad format config | | 3 | `v4l2_ctrl` | done | V4L2 controls — enumerate, get, set camera parameters | | 4 | `serial` | done | `put`/`get` primitives for little-endian binary serialization into byte buffers | -| 5 | `transport` | not started | Encapsulated transport — frame header, TCP stream abstraction, single-write send | -| 6 | `protocol` | not started | Typed `write_*`/`read_*` functions for all message types; builds on serial + transport | -| 7 | `frame_alloc` | not started | Per-frame allocation with bookkeeping (byte budget, ref counting) | -| 8 | `relay` | not started | Input dispatch to output queues (low-latency and completeness modes) | -| 9 | `ingest` | not started | V4L2 capture loop — dequeue buffers, emit one encapsulated frame per buffer | -| 10 | `archive` | not started | Write frames to disk, control messages to binary log | -| 11 | `codec` | not started | Per-frame encode/decode — MJPEG (libjpeg-turbo), QOI, ZSTD-raw, VA-API H.264 intra; used by screen grab source and archive | -| 12 | `xorg` | not started | X11 screen geometry queries (XRandR), screen grab source (calls codec), frame viewer sink — see architecture.md | -| 13 | `web node` | not started | Node.js/Express peer — speaks binary protocol on socket side, HTTP/WebSocket to browser; `protocol.mjs` mirrors C protocol module | +| 5 | `transport` | done | Encapsulated transport — frame header, TCP stream abstraction, single-write send | +| 6 | `discovery` | done | UDP multicast announcements, peer table, found/lost callbacks | +| 8 | `protocol` | not started | Typed `write_*`/`read_*` functions for all message types; builds on serial + transport | +| 9 | `frame_alloc` | not started | Per-frame allocation with bookkeeping (byte budget, ref counting) | +| 10 | `relay` | not started | Input dispatch to output queues (low-latency and completeness modes) | +| 11 | `ingest` | not started | V4L2 capture loop — dequeue buffers, emit one encapsulated frame per buffer | +| 12 | `archive` | not started | Write frames to disk, control messages to binary log | +| 13 | `codec` | not started | Per-frame encode/decode — MJPEG (libjpeg-turbo), QOI, ZSTD-raw, VA-API H.264 intra; used by screen grab source and archive | +| 14 | `xorg` | not started | X11 screen geometry queries (XRandR), screen grab source (calls codec), frame viewer sink — see architecture.md | +| 15 | `web node` | not started | Node.js/Express peer — speaks binary protocol on socket side, HTTP/WebSocket to browser; `protocol.mjs` mirrors C protocol module | | — | `mjpeg_scan` | future | EOI marker scanner for misbehaving hardware that does not deliver clean per-buffer frames; not part of the primary pipeline | +| — | `config` | future | Unified config file reader; nodes currently configured via CLI args — needed before production deployment | --- diff --git a/src/modules/discovery/Makefile b/src/modules/discovery/Makefile new file mode 100644 index 0000000..3202cf1 --- /dev/null +++ b/src/modules/discovery/Makefile @@ -0,0 +1,17 @@ +ROOT := $(abspath ../../..) +include $(ROOT)/common.mk + +MODULE_BUILD = $(BUILD)/discovery + +.PHONY: all clean + +all: $(MODULE_BUILD)/discovery.o + +$(MODULE_BUILD)/discovery.o: discovery.c $(ROOT)/include/discovery.h | $(MODULE_BUILD) + $(CC) $(CFLAGS) -c -o $@ $< + +$(MODULE_BUILD): + mkdir -p $@ + +clean: + rm -f $(MODULE_BUILD)/discovery.o diff --git a/src/modules/discovery/discovery.c b/src/modules/discovery/discovery.c new file mode 100644 index 0000000..f0f3319 --- /dev/null +++ b/src/modules/discovery/discovery.c @@ -0,0 +1,326 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "discovery.h" +#include "serial.h" +#include "transport.h" /* TRANSPORT_FRAME_HEADER_SIZE */ + +#define DEFAULT_INTERVAL_MS 5000u +#define DEFAULT_TIMEOUT_INTERVALS 3u +#define RECV_BUF_SIZE 512u + +/* announcement payload layout (offsets relative to payload start) */ +#define ANN_PROTOCOL_VERSION 0 /* u8 */ +#define ANN_SITE_ID 1 /* u16 */ +#define ANN_TCP_PORT 3 /* u16 */ +#define ANN_FUNCTION_FLAGS 5 /* u16 */ +#define ANN_NAME_LEN 7 /* u8 */ +#define ANN_NAME 8 /* bytes */ +#define ANN_FIXED_SIZE 8u + +struct Peer_Entry { + struct Discovery_Peer info; + uint64_t last_seen_ms; + int active; +}; + +struct Discovery { + int sock; + struct sockaddr_in mcast_addr; + struct Discovery_Config config; + + pthread_t announce_thread; + pthread_t receive_thread; + atomic_int running; + + pthread_mutex_t peers_mutex; + struct Peer_Entry peers[DISCOVERY_MAX_PEERS]; +}; + +/* -- helpers --------------------------------------------------------------- */ + +static uint64_t now_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u; +} + +static int find_peer(struct Discovery *d, uint32_t addr, const char *name) { + for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) { + if (d->peers[i].active + && d->peers[i].info.addr == addr + && strcmp(d->peers[i].info.name, name) == 0) { + return i; + } + } + return -1; +} + +static int find_slot(struct Discovery *d) { + for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) { + if (!d->peers[i].active) { return i; } + } + return -1; +} + +/* -- send ------------------------------------------------------------------ */ + +static void send_announcement(struct Discovery *d) { + size_t name_len = strlen(d->config.name); + if (name_len > DISCOVERY_MAX_NAME_LEN) { name_len = DISCOVERY_MAX_NAME_LEN; } + + uint32_t payload_len = (uint32_t)(ANN_FIXED_SIZE + name_len); + size_t total = TRANSPORT_FRAME_HEADER_SIZE + payload_len; + uint8_t buf[TRANSPORT_FRAME_HEADER_SIZE + ANN_FIXED_SIZE + DISCOVERY_MAX_NAME_LEN]; + + /* frame header */ + put_u16(buf, 0, 0x0010); /* message_type: DISCOVERY_ANNOUNCE */ + put_u32(buf, 2, payload_len); + + /* announcement payload */ + uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE; + put_u8 (p, ANN_PROTOCOL_VERSION, DISCOVERY_PROTOCOL_VERSION); + put_u16(p, ANN_SITE_ID, d->config.site_id); + put_u16(p, ANN_TCP_PORT, d->config.tcp_port); + put_u16(p, ANN_FUNCTION_FLAGS, d->config.function_flags); + put_u8 (p, ANN_NAME_LEN, (uint8_t)name_len); + memcpy(p + ANN_NAME, d->config.name, name_len); + + sendto(d->sock, buf, total, 0, + (struct sockaddr *)&d->mcast_addr, sizeof(d->mcast_addr)); +} + +/* -- timeout check --------------------------------------------------------- */ + +static void check_timeouts(struct Discovery *d) { + uint64_t ts = now_ms(); + uint64_t timeout = (uint64_t)d->config.interval_ms + * (uint64_t)d->config.timeout_intervals; + + for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) { + struct Discovery_Peer peer_copy; + int expired = 0; + + pthread_mutex_lock(&d->peers_mutex); + if (d->peers[i].active && (ts - d->peers[i].last_seen_ms) > timeout) { + peer_copy = d->peers[i].info; + d->peers[i].active = 0; + expired = 1; + } + pthread_mutex_unlock(&d->peers_mutex); + + if (expired && d->config.on_peer_lost) { + d->config.on_peer_lost(&peer_copy, d->config.userdata); + } + } +} + +/* -- announce thread ------------------------------------------------------- */ + +static void *announce_thread_fn(void *arg) { + struct Discovery *d = arg; + + send_announcement(d); + + while (atomic_load(&d->running)) { + /* sleep in 100 ms increments so destroy doesn't wait a full interval */ + uint32_t elapsed = 0; + while (atomic_load(&d->running) && elapsed < d->config.interval_ms) { + struct timespec ts = { .tv_sec = 0, .tv_nsec = 100 * 1000000L }; + nanosleep(&ts, NULL); + elapsed += 100; + } + + if (!atomic_load(&d->running)) { break; } + + send_announcement(d); + check_timeouts(d); + } + + return NULL; +} + +/* -- receive thread -------------------------------------------------------- */ + +static void *receive_thread_fn(void *arg) { + struct Discovery *d = arg; + uint8_t buf[RECV_BUF_SIZE]; + struct sockaddr_in src; + socklen_t src_len; + + while (atomic_load(&d->running)) { + src_len = sizeof(src); + ssize_t n = recvfrom(d->sock, buf, sizeof(buf), 0, + (struct sockaddr *)&src, &src_len); + + if (n < 0) { + /* SO_RCVTIMEO fires as EAGAIN — just loop and re-check running */ + continue; + } + + /* validate frame header */ + if (n < (ssize_t)TRANSPORT_FRAME_HEADER_SIZE) { continue; } + uint16_t msg_type = get_u16(buf, 0); + uint32_t payload_len = get_u32(buf, 2); + if (msg_type != 0x0010) { continue; } + if ((ssize_t)(TRANSPORT_FRAME_HEADER_SIZE + payload_len) > n) { continue; } + if (payload_len < ANN_FIXED_SIZE) { continue; } + + /* parse announcement payload */ + uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE; + uint16_t site_id = get_u16(p, ANN_SITE_ID); + uint16_t tcp_port = get_u16(p, ANN_TCP_PORT); + uint16_t flags = get_u16(p, ANN_FUNCTION_FLAGS); + uint8_t name_len = get_u8 (p, ANN_NAME_LEN); + + if (payload_len < (uint32_t)(ANN_FIXED_SIZE + name_len)) { continue; } + + char name[DISCOVERY_MAX_NAME_LEN + 1]; + memcpy(name, p + ANN_NAME, name_len); + name[name_len] = '\0'; + + /* skip our own announcements */ + if (site_id == d->config.site_id + && strcmp(name, d->config.name) == 0) { + continue; + } + + uint32_t addr = src.sin_addr.s_addr; + uint64_t ts = now_ms(); + int is_new = 0; + struct Discovery_Peer peer_copy; + + pthread_mutex_lock(&d->peers_mutex); + + int idx = find_peer(d, addr, name); + if (idx >= 0) { + d->peers[idx].last_seen_ms = ts; + d->peers[idx].info.site_id = site_id; + d->peers[idx].info.tcp_port = tcp_port; + d->peers[idx].info.function_flags = flags; + } else { + idx = find_slot(d); + if (idx >= 0) { + d->peers[idx].active = 1; + d->peers[idx].last_seen_ms = ts; + d->peers[idx].info.addr = addr; + d->peers[idx].info.site_id = site_id; + d->peers[idx].info.tcp_port = tcp_port; + d->peers[idx].info.function_flags = flags; + strncpy(d->peers[idx].info.name, name, DISCOVERY_MAX_NAME_LEN); + d->peers[idx].info.name[DISCOVERY_MAX_NAME_LEN] = '\0'; + peer_copy = d->peers[idx].info; + is_new = 1; + } + } + + pthread_mutex_unlock(&d->peers_mutex); + + if (is_new && d->config.on_peer_found) { + d->config.on_peer_found(&peer_copy, d->config.userdata); + } + } + + return NULL; +} + +/* -- public API ------------------------------------------------------------ */ + +struct App_Error discovery_create(struct Discovery **out, + struct Discovery_Config *config) +{ + struct Discovery *d = calloc(1, sizeof(*d)); + if (!d) { return APP_SYSCALL_ERROR(); } + + d->config = *config; + d->sock = -1; + + if (d->config.interval_ms == 0) { + d->config.interval_ms = DEFAULT_INTERVAL_MS; + } + if (d->config.timeout_intervals == 0) { + d->config.timeout_intervals = DEFAULT_TIMEOUT_INTERVALS; + } + + d->mcast_addr.sin_family = AF_INET; + d->mcast_addr.sin_port = htons(DISCOVERY_PORT); + inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &d->mcast_addr.sin_addr); + + atomic_init(&d->running, 0); + pthread_mutex_init(&d->peers_mutex, NULL); + + *out = d; + return APP_OK; +} + +struct App_Error discovery_start(struct Discovery *d) { + int sock = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0); + if (sock < 0) { return APP_SYSCALL_ERROR(); } + + int opt = 1; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); + + struct sockaddr_in bind_addr = {0}; + bind_addr.sin_family = AF_INET; + bind_addr.sin_addr.s_addr = htonl(INADDR_ANY); + bind_addr.sin_port = htons(DISCOVERY_PORT); + + if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) { + close(sock); + return APP_SYSCALL_ERROR(); + } + + struct ip_mreq mreq = {0}; + inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &mreq.imr_multiaddr); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof(mreq)) < 0) { + close(sock); + return APP_SYSCALL_ERROR(); + } + + uint8_t ttl = 1; + setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); + + /* 1 second receive timeout so the receive thread can check running */ + struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + + d->sock = sock; + atomic_store(&d->running, 1); + + if (pthread_create(&d->receive_thread, NULL, receive_thread_fn, d) != 0) { + atomic_store(&d->running, 0); + close(sock); + d->sock = -1; + return APP_SYSCALL_ERROR(); + } + + if (pthread_create(&d->announce_thread, NULL, announce_thread_fn, d) != 0) { + atomic_store(&d->running, 0); + close(sock); + d->sock = -1; + pthread_join(d->receive_thread, NULL); + return APP_SYSCALL_ERROR(); + } + + return APP_OK; +} + +void discovery_destroy(struct Discovery *d) { + atomic_store(&d->running, 0); + close(d->sock); + pthread_join(d->announce_thread, NULL); + pthread_join(d->receive_thread, NULL); + pthread_mutex_destroy(&d->peers_mutex); + free(d); +}