controller_cli: readline, discovery integration, peers/connect commands

- readline replaces fgets — line editing and command history
- Discovery runs at startup (always); discovered peers print inline as they appear
- --host is now optional; without it, starts in discovery-only mode
- New REPL commands:
    peers              list discovered nodes with index
    connect            connect to first discovered peer
    connect <idx>      connect to peer by index
    connect <host:port> connect directly
- connect switching closes the old connection before opening the new one
- Commands that require a connection print "not connected" when conn is NULL
- Makefile: add $(DISCOVERY_OBJ) and -lreadline to controller_cli link

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-29 19:26:38 +00:00
parent 54d48c9c8e
commit ba2c3cb6cd
2 changed files with 199 additions and 29 deletions

View File

@@ -3,11 +3,82 @@
#include <string.h>
#include <ctype.h>
#include <semaphore.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <readline/readline.h>
#include <readline/history.h>
#include "transport.h"
#include "protocol.h"
#include "discovery.h"
#include "error.h"
/* -------------------------------------------------------------------------
* Discovery peer table
* ------------------------------------------------------------------------- */
#define MAX_PEERS 16
struct Peer_Entry {
char host[64];
uint16_t port;
char name[DISCOVERY_MAX_NAME_LEN + 1];
};
static struct Peer_Entry peer_table[MAX_PEERS];
static int peer_count = 0;
static pthread_mutex_t peer_mutex = PTHREAD_MUTEX_INITIALIZER;
static void on_peer_found(const struct Discovery_Peer *peer, void *ud)
{
(void)ud;
pthread_mutex_lock(&peer_mutex);
if (peer_count < MAX_PEERS) {
struct in_addr a;
a.s_addr = peer->addr;
inet_ntop(AF_INET, &a, peer_table[peer_count].host,
sizeof(peer_table[0].host));
peer_table[peer_count].port = peer->tcp_port;
strncpy(peer_table[peer_count].name, peer->name, DISCOVERY_MAX_NAME_LEN);
peer_table[peer_count].name[DISCOVERY_MAX_NAME_LEN] = '\0';
peer_count++;
/* Print inline — readline will redraw the prompt */
printf("\n[discovered %d] %s %s:%u\n",
peer_count - 1,
peer_table[peer_count - 1].name,
peer_table[peer_count - 1].host,
peer->tcp_port);
rl_on_new_line();
rl_redisplay();
}
pthread_mutex_unlock(&peer_mutex);
}
static void on_peer_lost(const struct Discovery_Peer *peer, void *ud)
{
(void)ud;
struct in_addr a;
a.s_addr = peer->addr;
char host[64];
inet_ntop(AF_INET, &a, host, sizeof(host));
pthread_mutex_lock(&peer_mutex);
for (int i = 0; i < peer_count; i++) {
if (strcmp(peer_table[i].host, host) == 0 &&
peer_table[i].port == peer->tcp_port) {
printf("\n[lost] %s %s:%u\n",
peer_table[i].name, host, peer->tcp_port);
rl_on_new_line();
rl_redisplay();
memmove(&peer_table[i], &peer_table[i + 1],
(size_t)(peer_count - i - 1) * sizeof(peer_table[0]));
peer_count--;
break;
}
}
pthread_mutex_unlock(&peer_mutex);
}
/* -------------------------------------------------------------------------
* Shared state between REPL and transport read thread
* ------------------------------------------------------------------------- */
@@ -194,7 +265,9 @@ static void on_frame(struct Transport_Conn *conn,
static void on_disconnect(struct Transport_Conn *conn, void *userdata)
{
(void)conn; (void)userdata;
printf("disconnected from node\n");
printf("\ndisconnected from node\n");
rl_on_new_line();
rl_redisplay();
}
/* -------------------------------------------------------------------------
@@ -341,6 +414,8 @@ static void cmd_stop_display(struct Transport_Conn *conn,
static void cmd_help(void)
{
printf("commands:\n"
" peers list discovered nodes\n"
" connect [idx|host:port] connect to peer (no arg = first discovered)\n"
" enum-devices\n"
" enum-controls <device_index>\n"
" get-control <device_index> <control_id_hex>\n"
@@ -361,40 +436,76 @@ static void cmd_help(void)
static void usage(void)
{
fprintf(stderr,
"usage: controller_cli --host HOST [--port PORT]\n"
"usage: controller_cli [--host HOST] [--port PORT]\n"
"\n"
" Interactive controller for a video node.\n"
" --host HOST node hostname or IP (required)\n"
" --port PORT node TCP port (default 8000)\n");
" --host HOST connect directly on startup\n"
" --port PORT TCP port (default 8000; used with --host)\n"
"\n"
" Without --host: starts discovery and waits for nodes.\n"
" Use 'connect' in the REPL to connect to a discovered node.\n");
}
/* Attempt to connect/reconnect; prints result. Returns new conn or NULL. */
static struct Transport_Conn *do_connect(struct Ctrl_State *cs,
const char *host, uint16_t port,
struct Transport_Conn *old_conn)
{
if (old_conn) { transport_conn_close(old_conn); }
struct Transport_Conn *conn;
struct App_Error e = transport_connect(&conn, host, port,
TRANSPORT_DEFAULT_MAX_PAYLOAD, on_frame, on_disconnect, cs);
if (!APP_IS_OK(e)) {
app_error_print(&e);
return NULL;
}
printf("connected to %s:%u\n", host, port);
return conn;
}
int main(int argc, char **argv)
{
const char *host = NULL;
uint16_t port = 8000;
const char *init_host = NULL;
uint16_t init_port = 8000;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--host") == 0 && i + 1 < argc) {
host = argv[++i];
init_host = argv[++i];
} else if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) {
port = (uint16_t)atoi(argv[++i]);
init_port = (uint16_t)atoi(argv[++i]);
} else {
usage(); return 1;
}
}
if (!host) { usage(); return 1; }
/* Connect */
/* Start discovery (always — useful even when --host given, for 'peers') */
struct Discovery *disc = NULL;
struct Discovery_Config dcfg = {0};
dcfg.site_id = 0;
dcfg.tcp_port = 0;
dcfg.function_flags = DISCOVERY_FLAG_CONTROLLER;
dcfg.name = "controller_cli";
dcfg.on_peer_found = on_peer_found;
dcfg.on_peer_lost = on_peer_lost;
if (!APP_IS_OK(discovery_create(&disc, &dcfg)) ||
!APP_IS_OK(discovery_start(disc))) {
fprintf(stderr, "warning: discovery failed to start\n");
disc = NULL;
}
struct Ctrl_State cs;
memset(&cs, 0, sizeof(cs));
sem_init(&cs.sem, 0, 0);
struct Transport_Conn *conn;
struct App_Error e = transport_connect(&conn, host, port,
TRANSPORT_DEFAULT_MAX_PAYLOAD, on_frame, on_disconnect, &cs);
if (!APP_IS_OK(e)) { app_error_print(&e); return 1; }
struct Transport_Conn *conn = NULL;
if (init_host) {
conn = do_connect(&cs, init_host, init_port, NULL);
if (!conn) { return 1; }
} else {
printf("listening for nodes — type 'peers' to list, 'connect' to connect\n");
}
printf("connected to %s:%u\n\n", host, port);
cmd_help();
printf("\n");
@@ -403,22 +514,17 @@ int main(int argc, char **argv)
char line[512];
while (1) {
printf("> ");
fflush(stdout);
if (fgets(line, sizeof(line), stdin) == NULL) { break; }
/* Strip trailing newline */
size_t len = strlen(line);
while (len > 0 && (line[len-1] == '\n' || line[len-1] == '\r')) {
line[--len] = '\0';
}
char *rl_line = readline(conn ? "> " : "(no node) > ");
if (!rl_line) { break; }
if (*rl_line) { add_history(rl_line); }
strncpy(line, rl_line, sizeof(line) - 1);
line[sizeof(line) - 1] = '\0';
free(rl_line);
/* Tokenise (up to 12 tokens) */
char *tokens[12];
int ntok = 0;
char *p = line;
while (*p && ntok < 12) {
while (*p == ' ' || *p == '\t') { p++; }
if (!*p) { break; }
@@ -432,8 +538,71 @@ int main(int argc, char **argv)
if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
break;
} else if (strcmp(cmd, "help") == 0) {
cmd_help();
} else if (strcmp(cmd, "peers") == 0) {
pthread_mutex_lock(&peer_mutex);
if (peer_count == 0) {
printf("no peers discovered yet\n");
} else {
for (int i = 0; i < peer_count; i++) {
printf(" [%d] %s %s:%u\n", i,
peer_table[i].name,
peer_table[i].host,
peer_table[i].port);
}
}
pthread_mutex_unlock(&peer_mutex);
} else if (strcmp(cmd, "connect") == 0) {
char host[64];
uint16_t port = 8000;
if (ntok < 2) {
/* No argument — connect to first discovered peer */
pthread_mutex_lock(&peer_mutex);
int ok = peer_count > 0;
if (ok) {
strncpy(host, peer_table[0].host, sizeof(host) - 1);
host[sizeof(host) - 1] = '\0';
port = peer_table[0].port;
}
pthread_mutex_unlock(&peer_mutex);
if (!ok) {
printf("no peers discovered yet — try 'peers'\n");
continue;
}
} else if (strchr(tokens[1], ':')) {
/* host:port */
char *colon = strchr(tokens[1], ':');
size_t hlen = (size_t)(colon - tokens[1]);
if (hlen >= sizeof(host)) { hlen = sizeof(host) - 1; }
memcpy(host, tokens[1], hlen);
host[hlen] = '\0';
port = (uint16_t)atoi(colon + 1);
} else {
/* numeric index into peer table */
int idx = atoi(tokens[1]);
pthread_mutex_lock(&peer_mutex);
int ok = idx >= 0 && idx < peer_count;
if (ok) {
strncpy(host, peer_table[idx].host, sizeof(host) - 1);
host[sizeof(host) - 1] = '\0';
port = peer_table[idx].port;
}
pthread_mutex_unlock(&peer_mutex);
if (!ok) {
printf("index %d out of range — try 'peers'\n", idx);
continue;
}
}
conn = do_connect(&cs, host, port, conn);
} else if (!conn) {
printf("not connected — use 'connect' to connect to a node\n");
} else if (strcmp(cmd, "enum-devices") == 0) {
cmd_enum_devices(conn, &cs, &req_id);
} else if (strcmp(cmd, "enum-controls") == 0) {
@@ -460,7 +629,8 @@ int main(int argc, char **argv)
}
}
transport_conn_close(conn);
if (conn) { transport_conn_close(conn); }
if (disc) { discovery_destroy(disc); }
sem_destroy(&cs.sem);
return 0;
}