#include #include #include #include #include #include #include #include #include #include "transport.h" #include "protocol.h" #include "discovery.h" #include "error.h" /* ------------------------------------------------------------------------- * Discovery peer table * ------------------------------------------------------------------------- */ #define MAX_PEERS 16 struct Peer_Entry { char host[64]; uint16_t port; char name[DISCOVERY_MAX_NAME_LEN + 1]; }; static struct Peer_Entry peer_table[MAX_PEERS]; static int peer_count = 0; static pthread_mutex_t peer_mutex = PTHREAD_MUTEX_INITIALIZER; static void on_peer_found(const struct Discovery_Peer *peer, void *ud) { (void)ud; pthread_mutex_lock(&peer_mutex); if (peer_count < MAX_PEERS) { struct in_addr a; a.s_addr = peer->addr; inet_ntop(AF_INET, &a, peer_table[peer_count].host, sizeof(peer_table[0].host)); peer_table[peer_count].port = peer->tcp_port; strncpy(peer_table[peer_count].name, peer->name, DISCOVERY_MAX_NAME_LEN); peer_table[peer_count].name[DISCOVERY_MAX_NAME_LEN] = '\0'; peer_count++; /* Print inline — readline will redraw the prompt */ printf("\n[discovered %d] %s %s:%u\n", peer_count - 1, peer_table[peer_count - 1].name, peer_table[peer_count - 1].host, peer->tcp_port); rl_on_new_line(); rl_redisplay(); } pthread_mutex_unlock(&peer_mutex); } static void on_peer_lost(const struct Discovery_Peer *peer, void *ud) { (void)ud; struct in_addr a; a.s_addr = peer->addr; char host[64]; inet_ntop(AF_INET, &a, host, sizeof(host)); pthread_mutex_lock(&peer_mutex); for (int i = 0; i < peer_count; i++) { if (strcmp(peer_table[i].host, host) == 0 && peer_table[i].port == peer->tcp_port) { printf("\n[lost] %s %s:%u\n", peer_table[i].name, host, peer->tcp_port); rl_on_new_line(); rl_redisplay(); memmove(&peer_table[i], &peer_table[i + 1], (size_t)(peer_count - i - 1) * sizeof(peer_table[0])); peer_count--; break; } } pthread_mutex_unlock(&peer_mutex); } /* ------------------------------------------------------------------------- * Shared state between REPL and transport read thread * ------------------------------------------------------------------------- */ struct Ctrl_State { sem_t sem; uint16_t pending_cmd; uint16_t last_status; int32_t last_value; /* GET_CONTROL response */ }; /* ------------------------------------------------------------------------- * Response display helpers — reused across commands * ------------------------------------------------------------------------- */ static void caps_str(uint32_t caps, char *buf, size_t len) { static const struct { uint32_t bit; const char *name; } flags[] = { { 0x00000001u, "video-capture" }, { 0x00000002u, "video-output" }, { 0x00800000u, "meta-capture" }, { 0x04000000u, "streaming" }, }; buf[0] = '\0'; size_t pos = 0; for (size_t i = 0; i < sizeof(flags)/sizeof(flags[0]); i++) { if (!(caps & flags[i].bit)) { continue; } int n = snprintf(buf + pos, len - pos, "%s%s", pos ? "," : "", flags[i].name); if (n < 0 || (size_t)n >= len - pos) { break; } pos += (size_t)n; } } static void on_media_device( const char *path, uint8_t path_len, const char *driver, uint8_t driver_len, const char *model, uint8_t model_len, const char *bus_info, uint8_t bus_info_len, uint8_t vcount, void *ud) { (void)ud; printf(" media %.*s driver=%.*s model=%.*s bus=%.*s (%u video node(s))\n", (int)path_len, path, (int)driver_len, driver, (int)model_len, model, (int)bus_info_len, bus_info, (unsigned)vcount); } static void on_video_node( const char *path, uint8_t path_len, const char *ename, uint8_t ename_len, uint32_t etype, uint32_t eflags, uint32_t dcaps, uint8_t pflags, uint8_t is_capture, void *ud) { (void)eflags; (void)pflags; int *idx = ud; char caps[128]; caps_str(dcaps, caps, sizeof(caps)); printf(" [%d] video %.*s entity=%.*s type=0x%08x caps=[%s]%s\n", *idx, (int)path_len, path, (int)ename_len, ename, etype, caps, is_capture ? " [capture]" : ""); (*idx)++; } static void on_standalone( const char *path, uint8_t path_len, const char *name, uint8_t name_len, void *ud) { int *idx = ud; printf(" [%d] standalone %.*s card=%.*s\n", *idx, (int)path_len, path, (int)name_len, name); (*idx)++; } static const char *scale_mode_name(uint8_t s) { switch (s) { case 0: return "stretch"; case 1: return "fit"; case 2: return "fill"; case 3: return "1:1"; default: return "?"; } } static void on_display( uint16_t device_id, uint16_t stream_id, int16_t win_x, int16_t win_y, uint16_t win_w, uint16_t win_h, uint8_t scale_mode, uint8_t anchor, void *ud) { (void)ud; printf(" [%u] display stream=%u pos=%d,%d size=%ux%u scale_mode=%s anchor=%s\n", device_id, stream_id, win_x, win_y, win_w, win_h, scale_mode_name(scale_mode), anchor == 0 ? "center" : "topleft"); } static void on_control( uint32_t id, uint8_t type, uint32_t flags, const char *name, uint8_t name_len, int32_t min, int32_t max, int32_t step, int32_t default_val, int32_t current_val, uint8_t menu_count, void *ud) { (void)flags; (void)ud; printf(" ctrl id=0x%08x type=%u %.*s" " min=%d max=%d step=%d default=%d current=%d", id, type, (int)name_len, name, min, max, step, default_val, current_val); if (menu_count) { printf(" (%u menu items)", (unsigned)menu_count); } printf("\n"); } static void on_menu_item( uint32_t index, const char *name, uint8_t name_len, int64_t int_value, void *ud) { (void)ud; printf(" menu %u %.*s val=%lld\n", index, (int)name_len, name, (long long)int_value); } /* ------------------------------------------------------------------------- * Transport callbacks * ------------------------------------------------------------------------- */ static void on_frame(struct Transport_Conn *conn, struct Transport_Frame *frame, void *userdata) { (void)conn; struct Ctrl_State *cs = userdata; if (frame->message_type != PROTO_MSG_CONTROL_RESPONSE) { free(frame->payload); return; } switch (cs->pending_cmd) { case PROTO_CMD_ENUM_DEVICES: { struct Proto_Response_Header hdr; int dev_idx = 0; struct App_Error e = proto_read_enum_devices_response( frame->payload, frame->payload_length, &hdr, on_media_device, on_video_node, on_standalone, on_display, &dev_idx); if (!APP_IS_OK(e)) { app_error_print(&e); } else if (hdr.status != PROTO_STATUS_OK) { fprintf(stderr, "ENUM_DEVICES: status=%u\n", hdr.status); } cs->last_status = hdr.status; break; } case PROTO_CMD_ENUM_CONTROLS: { struct Proto_Response_Header hdr; struct App_Error e = proto_read_enum_controls_response( frame->payload, frame->payload_length, &hdr, on_control, on_menu_item, NULL); if (!APP_IS_OK(e)) { app_error_print(&e); } else if (hdr.status != PROTO_STATUS_OK) { fprintf(stderr, "ENUM_CONTROLS: status=%u\n", hdr.status); } cs->last_status = hdr.status; break; } case PROTO_CMD_GET_CONTROL: { struct Proto_Get_Control_Resp resp; struct App_Error e = proto_read_get_control_response( frame->payload, frame->payload_length, &resp); if (!APP_IS_OK(e)) { app_error_print(&e); } else if (resp.status == PROTO_STATUS_OK) { printf(" value = %d\n", resp.value); } else { fprintf(stderr, "GET_CONTROL: status=%u\n", resp.status); } cs->last_status = resp.status; break; } default: { /* Generic response: just read request_id + status */ struct Proto_Response_Header hdr; struct App_Error e = proto_read_response_header( frame->payload, frame->payload_length, &hdr); if (!APP_IS_OK(e)) { app_error_print(&e); } else if (hdr.status != PROTO_STATUS_OK) { fprintf(stderr, "command 0x%04x: status=%u\n", cs->pending_cmd, hdr.status); } else { printf(" ok\n"); } cs->last_status = APP_IS_OK(e) ? hdr.status : PROTO_STATUS_ERROR; break; } } free(frame->payload); sem_post(&cs->sem); } static void on_disconnect(struct Transport_Conn *conn, void *userdata) { (void)conn; (void)userdata; printf("\ndisconnected from node\n"); rl_on_new_line(); rl_redisplay(); } /* ------------------------------------------------------------------------- * Request helpers * ------------------------------------------------------------------------- */ static uint16_t next_req_id(uint16_t *counter) { return ++(*counter); } /* Send a request, set pending_cmd, wait for response */ #define SEND_AND_WAIT(cs, cmd, send_expr) do { \ (cs)->pending_cmd = (cmd); \ struct App_Error _e = (send_expr); \ if (!APP_IS_OK(_e)) { app_error_print(&_e); break; } \ sem_wait(&(cs)->sem); \ } while (0) /* ------------------------------------------------------------------------- * REPL command implementations * ------------------------------------------------------------------------- */ static void cmd_enum_devices(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req) { printf("devices:\n"); SEND_AND_WAIT(cs, PROTO_CMD_ENUM_DEVICES, proto_write_enum_devices(conn, next_req_id(req))); } static void cmd_enum_controls(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, const char *idx_str) { int idx = atoi(idx_str); printf("controls for device %d:\n", idx); SEND_AND_WAIT(cs, PROTO_CMD_ENUM_CONTROLS, proto_write_enum_controls(conn, next_req_id(req), (uint16_t)idx)); } static void cmd_get_control(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, const char *idx_str, const char *id_str) { int idx = atoi(idx_str); uint32_t id = (uint32_t)strtoul(id_str, NULL, 0); printf("get control 0x%08x on device %d:\n", id, idx); SEND_AND_WAIT(cs, PROTO_CMD_GET_CONTROL, proto_write_get_control(conn, next_req_id(req), (uint16_t)idx, id)); } static void cmd_set_control(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, const char *idx_str, const char *id_str, const char *val_str) { int idx = atoi(idx_str); uint32_t id = (uint32_t)strtoul(id_str, NULL, 0); int32_t val = (int32_t)atoi(val_str); SEND_AND_WAIT(cs, PROTO_CMD_SET_CONTROL, proto_write_set_control(conn, next_req_id(req), (uint16_t)idx, id, val)); } static void cmd_start_ingest(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, int ntok, char *tokens[]) { /* Required: stream_id device dest_host dest_port * Optional: format width height fps_n fps_d */ if (ntok < 5) { printf("usage: start-ingest " " [format] [width] [height] [fps_n] [fps_d]\n" " format: 0=auto 1=mjpeg (default 0)\n"); return; } uint16_t stream_id = (uint16_t)atoi(tokens[1]); const char *device = tokens[2]; const char *host = tokens[3]; uint16_t port = (uint16_t)atoi(tokens[4]); uint16_t format = ntok > 5 ? (uint16_t)atoi(tokens[5]) : 0; uint16_t width = ntok > 6 ? (uint16_t)atoi(tokens[6]) : 0; uint16_t height = ntok > 7 ? (uint16_t)atoi(tokens[7]) : 0; uint16_t fps_n = ntok > 8 ? (uint16_t)atoi(tokens[8]) : 0; uint16_t fps_d = ntok > 9 ? (uint16_t)atoi(tokens[9]) : 1; printf("start-ingest: stream=%u device=%s dest=%s:%u" " format=%u %ux%u fps=%u/%u\n", stream_id, device, host, port, format, width, height, fps_n, fps_d); SEND_AND_WAIT(cs, PROTO_CMD_START_INGEST, proto_write_start_ingest(conn, next_req_id(req), stream_id, format, width, height, fps_n, fps_d, PROTO_TRANSPORT_ENCAPSULATED, device, host, port)); } static void cmd_stop_ingest(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, const char *sid_str) { uint16_t stream_id = (uint16_t)atoi(sid_str); printf("stop-ingest: stream=%u\n", stream_id); SEND_AND_WAIT(cs, PROTO_CMD_STOP_INGEST, proto_write_stop_ingest(conn, next_req_id(req), stream_id)); } static void cmd_start_display(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, int ntok, char *tokens[]) { /* Required: stream_id * Optional: win_x win_y win_w win_h no_signal_fps */ if (ntok < 2) { printf("usage: start-display [win_x] [win_y] [win_w] [win_h] [no_signal_fps]\n"); return; } uint16_t stream_id = (uint16_t)atoi(tokens[1]); int16_t win_x = ntok > 2 ? (int16_t)atoi(tokens[2]) : 0; int16_t win_y = ntok > 3 ? (int16_t)atoi(tokens[3]) : 0; uint16_t win_w = ntok > 4 ? (uint16_t)atoi(tokens[4]) : 0; uint16_t win_h = ntok > 5 ? (uint16_t)atoi(tokens[5]) : 0; uint8_t no_signal_fps = ntok > 6 ? (uint8_t)atoi(tokens[6]) : 0; printf("start-display: stream=%u pos=%d,%d size=%ux%u no_signal_fps=%u\n", stream_id, win_x, win_y, win_w, win_h, no_signal_fps > 0 ? no_signal_fps : 15); SEND_AND_WAIT(cs, PROTO_CMD_START_DISPLAY, proto_write_start_display(conn, next_req_id(req), stream_id, win_x, win_y, win_w, win_h, PROTO_DISPLAY_SCALE_FIT, PROTO_DISPLAY_ANCHOR_CENTER, no_signal_fps)); } static void cmd_stop_display(struct Transport_Conn *conn, struct Ctrl_State *cs, uint16_t *req, const char *sid_str) { uint16_t stream_id = (uint16_t)atoi(sid_str); printf("stop-display: stream=%u\n", stream_id); SEND_AND_WAIT(cs, PROTO_CMD_STOP_DISPLAY, proto_write_stop_display(conn, next_req_id(req), stream_id)); } static void cmd_help(void) { printf("commands:\n" " peers list discovered nodes\n" " connect [idx|host:port] connect to peer (no arg = first discovered)\n" " enum-devices\n" " enum-controls \n" " get-control \n" " set-control \n" " start-ingest " " [format] [width] [height] [fps_n] [fps_d]\n" " stop-ingest \n" " start-display [win_x] [win_y] [win_w] [win_h] [no_signal_fps]\n" " stop-display \n" " help\n" " quit / exit\n"); } /* ------------------------------------------------------------------------- * Entry point * ------------------------------------------------------------------------- */ static void usage(void) { fprintf(stderr, "usage: controller_cli [--host HOST] [--port PORT]\n" "\n" " Interactive controller for a video node.\n" " --host HOST connect directly on startup\n" " --port PORT TCP port (default 8000; used with --host)\n" "\n" " Without --host: starts discovery and waits for nodes.\n" " Use 'connect' in the REPL to connect to a discovered node.\n"); } /* Attempt to connect/reconnect; prints result. Returns new conn or NULL. */ static struct Transport_Conn *do_connect(struct Ctrl_State *cs, const char *host, uint16_t port, struct Transport_Conn *old_conn) { if (old_conn) { transport_conn_close(old_conn); } /* Reset state — drain stale semaphore posts from the old connection */ cs->pending_cmd = 0; while (sem_trywait(&cs->sem) == 0) { /* drain */ } struct Transport_Conn *conn; struct App_Error e = transport_connect(&conn, host, port, TRANSPORT_DEFAULT_MAX_PAYLOAD, on_frame, on_disconnect, cs); if (!APP_IS_OK(e)) { app_error_print(&e); return NULL; } printf("connected to %s:%u\n", host, port); return conn; } int main(int argc, char **argv) { const char *init_host = NULL; uint16_t init_port = 8000; for (int i = 1; i < argc; i++) { if (strcmp(argv[i], "--host") == 0 && i + 1 < argc) { init_host = argv[++i]; } else if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) { init_port = (uint16_t)atoi(argv[++i]); } else { usage(); return 1; } } /* Start discovery (always — useful even when --host given, for 'peers') */ struct Discovery *disc = NULL; struct Discovery_Config dcfg = {0}; dcfg.site_id = 0; dcfg.tcp_port = 0; dcfg.function_flags = DISCOVERY_FLAG_CONTROLLER; dcfg.name = "controller_cli"; dcfg.on_peer_found = on_peer_found; dcfg.on_peer_lost = on_peer_lost; if (!APP_IS_OK(discovery_create(&disc, &dcfg)) || !APP_IS_OK(discovery_start(disc))) { fprintf(stderr, "warning: discovery failed to start\n"); disc = NULL; } struct Ctrl_State cs; memset(&cs, 0, sizeof(cs)); sem_init(&cs.sem, 0, 0); struct Transport_Conn *conn = NULL; if (init_host) { conn = do_connect(&cs, init_host, init_port, NULL); if (!conn) { return 1; } } else { printf("listening for nodes — type 'peers' to list, 'connect' to connect\n"); } cmd_help(); printf("\n"); /* REPL */ uint16_t req_id = 0; char line[512]; while (1) { char *rl_line = readline(conn ? "> " : "(no node) > "); if (!rl_line) { break; } if (*rl_line) { add_history(rl_line); } strncpy(line, rl_line, sizeof(line) - 1); line[sizeof(line) - 1] = '\0'; free(rl_line); /* Tokenise (up to 12 tokens) */ char *tokens[12]; int ntok = 0; char *p = line; while (*p && ntok < 12) { while (*p == ' ' || *p == '\t') { p++; } if (!*p) { break; } tokens[ntok++] = p; while (*p && *p != ' ' && *p != '\t') { p++; } if (*p) { *p++ = '\0'; } } if (ntok == 0) { continue; } const char *cmd = tokens[0]; if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) { break; } else if (strcmp(cmd, "help") == 0) { cmd_help(); } else if (strcmp(cmd, "peers") == 0) { pthread_mutex_lock(&peer_mutex); if (peer_count == 0) { printf("no peers discovered yet\n"); } else { for (int i = 0; i < peer_count; i++) { printf(" [%d] %s %s:%u\n", i, peer_table[i].name, peer_table[i].host, peer_table[i].port); } } pthread_mutex_unlock(&peer_mutex); } else if (strcmp(cmd, "connect") == 0) { char host[64]; uint16_t port = 8000; if (ntok < 2) { /* No argument — connect to first discovered peer */ pthread_mutex_lock(&peer_mutex); int ok = peer_count > 0; if (ok) { strncpy(host, peer_table[0].host, sizeof(host) - 1); host[sizeof(host) - 1] = '\0'; port = peer_table[0].port; } pthread_mutex_unlock(&peer_mutex); if (!ok) { printf("no peers discovered yet — try 'peers'\n"); continue; } } else if (strchr(tokens[1], ':')) { /* host:port */ char *colon = strchr(tokens[1], ':'); size_t hlen = (size_t)(colon - tokens[1]); if (hlen >= sizeof(host)) { hlen = sizeof(host) - 1; } memcpy(host, tokens[1], hlen); host[hlen] = '\0'; port = (uint16_t)atoi(colon + 1); } else { /* numeric index into peer table */ int idx = atoi(tokens[1]); pthread_mutex_lock(&peer_mutex); int ok = idx >= 0 && idx < peer_count; if (ok) { strncpy(host, peer_table[idx].host, sizeof(host) - 1); host[sizeof(host) - 1] = '\0'; port = peer_table[idx].port; } pthread_mutex_unlock(&peer_mutex); if (!ok) { printf("index %d out of range — try 'peers'\n", idx); continue; } } conn = do_connect(&cs, host, port, conn); } else if (!conn) { printf("not connected — use 'connect' to connect to a node\n"); } else if (strcmp(cmd, "enum-devices") == 0) { cmd_enum_devices(conn, &cs, &req_id); } else if (strcmp(cmd, "enum-controls") == 0) { if (ntok < 2) { printf("usage: enum-controls \n"); } else { cmd_enum_controls(conn, &cs, &req_id, tokens[1]); } } else if (strcmp(cmd, "get-control") == 0) { if (ntok < 3) { printf("usage: get-control \n"); } else { cmd_get_control(conn, &cs, &req_id, tokens[1], tokens[2]); } } else if (strcmp(cmd, "set-control") == 0) { if (ntok < 4) { printf("usage: set-control \n"); } else { cmd_set_control(conn, &cs, &req_id, tokens[1], tokens[2], tokens[3]); } } else if (strcmp(cmd, "start-ingest") == 0) { cmd_start_ingest(conn, &cs, &req_id, ntok, tokens); } else if (strcmp(cmd, "stop-ingest") == 0) { if (ntok < 2) { printf("usage: stop-ingest \n"); } else { cmd_stop_ingest(conn, &cs, &req_id, tokens[1]); } } else if (strcmp(cmd, "start-display") == 0) { cmd_start_display(conn, &cs, &req_id, ntok, tokens); } else if (strcmp(cmd, "stop-display") == 0) { if (ntok < 2) { printf("usage: stop-display \n"); } else { cmd_stop_display(conn, &cs, &req_id, tokens[1]); } } else { printf("unknown command: %s (type 'help' for commands)\n", cmd); } } if (conn) { transport_conn_close(conn); } if (disc) { discovery_destroy(disc); } sem_destroy(&cs.sem); return 0; }