#include #include #include #include #include #include #include #include #include #include #include "discovery.h" #include "serial.h" #include "transport.h" /* TRANSPORT_FRAME_HEADER_SIZE */ #define DEFAULT_INTERVAL_MS 5000u #define DEFAULT_TIMEOUT_INTERVALS 3u #define RECV_BUF_SIZE 512u /* announcement payload layout (offsets relative to payload start) */ #define ANN_PROTOCOL_VERSION 0 /* u8 */ #define ANN_SITE_ID 1 /* u16 */ #define ANN_TCP_PORT 3 /* u16 */ #define ANN_FUNCTION_FLAGS 5 /* u16 */ #define ANN_NAME_LEN 7 /* u8 */ #define ANN_NAME 8 /* bytes */ #define ANN_FIXED_SIZE 8u struct Peer_Entry { struct Discovery_Peer info; uint64_t last_seen_ms; int active; }; struct Discovery { int sock; struct sockaddr_in mcast_addr; struct Discovery_Config config; pthread_t announce_thread; pthread_t receive_thread; atomic_int running; pthread_mutex_t announce_mutex; pthread_cond_t announce_cond; /* signaled to wake announce thread early */ pthread_mutex_t peers_mutex; struct Peer_Entry peers[DISCOVERY_MAX_PEERS]; }; /* -- helpers --------------------------------------------------------------- */ static uint64_t now_ms(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (uint64_t)ts.tv_sec * 1000u + (uint64_t)ts.tv_nsec / 1000000u; } static int find_peer(struct Discovery *d, uint32_t addr, uint16_t tcp_port) { for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) { if (d->peers[i].active && d->peers[i].info.addr == addr && d->peers[i].info.tcp_port == tcp_port) { return i; } } return -1; } static int find_slot(struct Discovery *d) { for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) { if (!d->peers[i].active) { return i; } } return -1; } /* -- send ------------------------------------------------------------------ */ static void send_announcement(struct Discovery *d) { size_t name_len = strlen(d->config.name); if (name_len > DISCOVERY_MAX_NAME_LEN) { name_len = DISCOVERY_MAX_NAME_LEN; } uint32_t payload_len = (uint32_t)(ANN_FIXED_SIZE + name_len); size_t total = TRANSPORT_FRAME_HEADER_SIZE + payload_len; uint8_t buf[TRANSPORT_FRAME_HEADER_SIZE + ANN_FIXED_SIZE + DISCOVERY_MAX_NAME_LEN]; /* frame header */ put_u16(buf, 0, 0x0010); /* message_type: DISCOVERY_ANNOUNCE */ put_u32(buf, 2, payload_len); /* announcement payload */ uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE; put_u8 (p, ANN_PROTOCOL_VERSION, DISCOVERY_PROTOCOL_VERSION); put_u16(p, ANN_SITE_ID, d->config.site_id); put_u16(p, ANN_TCP_PORT, d->config.tcp_port); put_u16(p, ANN_FUNCTION_FLAGS, d->config.function_flags); put_u8 (p, ANN_NAME_LEN, (uint8_t)name_len); memcpy(p + ANN_NAME, d->config.name, name_len); sendto(d->sock, buf, total, 0, (struct sockaddr *)&d->mcast_addr, sizeof(d->mcast_addr)); } /* -- timeout check --------------------------------------------------------- */ static void check_timeouts(struct Discovery *d) { uint64_t ts = now_ms(); uint64_t timeout = (uint64_t)d->config.interval_ms * (uint64_t)d->config.timeout_intervals; for (int i = 0; i < DISCOVERY_MAX_PEERS; i++) { struct Discovery_Peer peer_copy; int expired = 0; pthread_mutex_lock(&d->peers_mutex); if (d->peers[i].active && (ts - d->peers[i].last_seen_ms) > timeout) { peer_copy = d->peers[i].info; d->peers[i].active = 0; expired = 1; } pthread_mutex_unlock(&d->peers_mutex); if (expired && d->config.on_peer_lost) { d->config.on_peer_lost(&peer_copy, d->config.userdata); } } } /* -- announce thread ------------------------------------------------------- */ static void *announce_thread_fn(void *arg) { struct Discovery *d = arg; send_announcement(d); pthread_mutex_lock(&d->announce_mutex); while (atomic_load(&d->running)) { struct timespec abs; clock_gettime(CLOCK_REALTIME, &abs); uint32_t ms = d->config.interval_ms; abs.tv_sec += ms / 1000u; abs.tv_nsec += (long)(ms % 1000u) * 1000000L; if (abs.tv_nsec >= 1000000000L) { abs.tv_sec++; abs.tv_nsec -= 1000000000L; } /* blocks until signaled (new peer / shutdown) or interval elapses */ pthread_cond_timedwait(&d->announce_cond, &d->announce_mutex, &abs); if (!atomic_load(&d->running)) { break; } send_announcement(d); check_timeouts(d); } pthread_mutex_unlock(&d->announce_mutex); return NULL; } /* -- receive thread -------------------------------------------------------- */ static void *receive_thread_fn(void *arg) { struct Discovery *d = arg; uint8_t buf[RECV_BUF_SIZE]; struct sockaddr_in src; socklen_t src_len; while (atomic_load(&d->running)) { src_len = sizeof(src); ssize_t n = recvfrom(d->sock, buf, sizeof(buf), 0, (struct sockaddr *)&src, &src_len); if (n < 0) { /* SO_RCVTIMEO fires as EAGAIN — just loop and re-check running */ continue; } /* validate frame header */ if (n < (ssize_t)TRANSPORT_FRAME_HEADER_SIZE) { continue; } uint16_t msg_type = get_u16(buf, 0); uint32_t payload_len = get_u32(buf, 2); if (msg_type != 0x0010) { continue; } if ((ssize_t)(TRANSPORT_FRAME_HEADER_SIZE + payload_len) > n) { continue; } if (payload_len < ANN_FIXED_SIZE) { continue; } /* parse announcement payload */ uint8_t *p = buf + TRANSPORT_FRAME_HEADER_SIZE; uint16_t site_id = get_u16(p, ANN_SITE_ID); uint16_t tcp_port = get_u16(p, ANN_TCP_PORT); uint16_t flags = get_u16(p, ANN_FUNCTION_FLAGS); uint8_t name_len = get_u8 (p, ANN_NAME_LEN); if (payload_len < (uint32_t)(ANN_FIXED_SIZE + name_len)) { continue; } char name[DISCOVERY_MAX_NAME_LEN + 1]; memcpy(name, p + ANN_NAME, name_len); name[name_len] = '\0'; /* skip our own announcements */ if (site_id == d->config.site_id && tcp_port == d->config.tcp_port) { continue; } uint32_t addr = src.sin_addr.s_addr; uint64_t ts = now_ms(); int is_new = 0; struct Discovery_Peer peer_copy; pthread_mutex_lock(&d->peers_mutex); int idx = find_peer(d, addr, tcp_port); if (idx >= 0) { d->peers[idx].last_seen_ms = ts; d->peers[idx].info.site_id = site_id; d->peers[idx].info.tcp_port = tcp_port; d->peers[idx].info.function_flags = flags; } else { idx = find_slot(d); if (idx >= 0) { d->peers[idx].active = 1; d->peers[idx].last_seen_ms = ts; d->peers[idx].info.addr = addr; d->peers[idx].info.site_id = site_id; d->peers[idx].info.tcp_port = tcp_port; d->peers[idx].info.function_flags = flags; strncpy(d->peers[idx].info.name, name, DISCOVERY_MAX_NAME_LEN); d->peers[idx].info.name[DISCOVERY_MAX_NAME_LEN] = '\0'; peer_copy = d->peers[idx].info; is_new = 1; } } pthread_mutex_unlock(&d->peers_mutex); if (is_new) { /* announce ourselves immediately so the new peer learns about us * without waiting up to interval_ms */ pthread_mutex_lock(&d->announce_mutex); pthread_cond_signal(&d->announce_cond); pthread_mutex_unlock(&d->announce_mutex); } if (is_new && d->config.on_peer_found) { d->config.on_peer_found(&peer_copy, d->config.userdata); } } return NULL; } /* -- public API ------------------------------------------------------------ */ struct App_Error discovery_create(struct Discovery **out, struct Discovery_Config *config) { struct Discovery *d = calloc(1, sizeof(*d)); if (!d) { return APP_SYSCALL_ERROR(); } d->config = *config; d->sock = -1; if (d->config.interval_ms == 0) { d->config.interval_ms = DEFAULT_INTERVAL_MS; } if (d->config.timeout_intervals == 0) { d->config.timeout_intervals = DEFAULT_TIMEOUT_INTERVALS; } d->mcast_addr.sin_family = AF_INET; d->mcast_addr.sin_port = htons(DISCOVERY_PORT); inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &d->mcast_addr.sin_addr); atomic_init(&d->running, 0); pthread_mutex_init(&d->announce_mutex, NULL); pthread_cond_init(&d->announce_cond, NULL); pthread_mutex_init(&d->peers_mutex, NULL); *out = d; return APP_OK; } struct App_Error discovery_start(struct Discovery *d) { int sock = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC, 0); if (sock < 0) { return APP_SYSCALL_ERROR(); } int opt = 1; setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)); struct sockaddr_in bind_addr = {0}; bind_addr.sin_family = AF_INET; bind_addr.sin_addr.s_addr = htonl(INADDR_ANY); bind_addr.sin_port = htons(DISCOVERY_PORT); if (bind(sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr)) < 0) { close(sock); return APP_SYSCALL_ERROR(); } struct ip_mreq mreq = {0}; inet_pton(AF_INET, DISCOVERY_MULTICAST_GROUP, &mreq.imr_multiaddr); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { close(sock); return APP_SYSCALL_ERROR(); } uint8_t ttl = 1; setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); /* 1 second receive timeout so the receive thread can check running */ struct timeval tv = { .tv_sec = 1, .tv_usec = 0 }; setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); d->sock = sock; atomic_store(&d->running, 1); if (pthread_create(&d->receive_thread, NULL, receive_thread_fn, d) != 0) { atomic_store(&d->running, 0); close(sock); d->sock = -1; return APP_SYSCALL_ERROR(); } if (pthread_create(&d->announce_thread, NULL, announce_thread_fn, d) != 0) { atomic_store(&d->running, 0); close(sock); d->sock = -1; pthread_join(d->receive_thread, NULL); return APP_SYSCALL_ERROR(); } return APP_OK; } void discovery_destroy(struct Discovery *d) { atomic_store(&d->running, 0); /* wake announce thread so it exits without waiting for the full interval */ pthread_mutex_lock(&d->announce_mutex); pthread_cond_signal(&d->announce_cond); pthread_mutex_unlock(&d->announce_mutex); close(d->sock); pthread_join(d->announce_thread, NULL); pthread_join(d->receive_thread, NULL); pthread_cond_destroy(&d->announce_cond); pthread_mutex_destroy(&d->announce_mutex); pthread_mutex_destroy(&d->peers_mutex); free(d); }