From ff48559b120b9299b51c5164939950eb1530d8b2 Mon Sep 17 00:00:00 2001 From: mikael-lovqvists-claude-agent Date: Thu, 26 Mar 2026 21:31:55 +0000 Subject: [PATCH] Add transport module: TCP framing, thread-per-connection, inbound limit transport_server_create/start: binds TCP, spawns accept thread, closes excess inbound connections when max_connections is reached. transport_connect: outbound TCP, spawns read thread before returning. transport_send_frame: packs 8-byte header with serial put_*, then writes header + payload under a per-connection mutex (thread-safe). Read thread: reads header, validates payload_length <= max_payload, mallocs payload, calls on_frame (callback owns and must free payload). On error or disconnect calls on_disconnect then frees conn. transport_cli: server mode echoes received frames; client mode sends 3 test frames and prints echoes. Co-Authored-By: Claude Sonnet 4.6 --- dev/cli/Makefile | 25 ++- dev/cli/transport_cli.c | 159 +++++++++++++++ include/transport.h | 89 +++++++++ src/modules/transport/Makefile | 17 ++ src/modules/transport/transport.c | 311 ++++++++++++++++++++++++++++++ 5 files changed, 595 insertions(+), 6 deletions(-) create mode 100644 dev/cli/transport_cli.c create mode 100644 include/transport.h create mode 100644 src/modules/transport/Makefile create mode 100644 src/modules/transport/transport.c diff --git a/dev/cli/Makefile b/dev/cli/Makefile index ad25b27..e5b9203 100644 --- a/dev/cli/Makefile +++ b/dev/cli/Makefile @@ -1,19 +1,26 @@ ROOT := $(abspath ../..) include $(ROOT)/common.mk -CLI_BUILD = $(BUILD)/cli -COMMON_OBJ = $(BUILD)/common/error.o -MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o -V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o +CLI_BUILD = $(BUILD)/cli +COMMON_OBJ = $(BUILD)/common/error.o +MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o +V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o +SERIAL_OBJ = $(BUILD)/serial/serial.o +TRANSPORT_OBJ = $(BUILD)/transport/transport.o .PHONY: all clean modules -all: modules $(CLI_BUILD)/media_ctrl_cli $(CLI_BUILD)/v4l2_ctrl_cli +all: modules \ + $(CLI_BUILD)/media_ctrl_cli \ + $(CLI_BUILD)/v4l2_ctrl_cli \ + $(CLI_BUILD)/transport_cli modules: $(MAKE) -C $(ROOT)/src/modules/common $(MAKE) -C $(ROOT)/src/modules/media_ctrl $(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl + $(MAKE) -C $(ROOT)/src/modules/serial + $(MAKE) -C $(ROOT)/src/modules/transport $(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD) $(CC) $(CFLAGS) -o $@ $^ @@ -21,8 +28,14 @@ $(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD)/v4l2_ctrl_cli: v4l2_ctrl_cli.c $(COMMON_OBJ) $(V4L2_CTRL_OBJ) | $(CLI_BUILD) $(CC) $(CFLAGS) -o $@ $^ +$(CLI_BUILD)/transport_cli: transport_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) | $(CLI_BUILD) + $(CC) $(CFLAGS) -o $@ $^ -lpthread + $(CLI_BUILD): mkdir -p $@ clean: - rm -f $(CLI_BUILD)/media_ctrl_cli $(CLI_BUILD)/v4l2_ctrl_cli + rm -f \ + $(CLI_BUILD)/media_ctrl_cli \ + $(CLI_BUILD)/v4l2_ctrl_cli \ + $(CLI_BUILD)/transport_cli diff --git a/dev/cli/transport_cli.c b/dev/cli/transport_cli.c new file mode 100644 index 0000000..18490b2 --- /dev/null +++ b/dev/cli/transport_cli.c @@ -0,0 +1,159 @@ +#include +#include +#include +#include + +#include "transport.h" +#include "error.h" + +/* -- server ---------------------------------------------------------------- */ + +static void server_on_frame(struct Transport_Conn *conn, + struct Transport_Frame *frame, void *userdata) +{ + (void)userdata; + + printf("recv type=0x%04x channel=%u length=%u", + frame->message_type, frame->channel_id, frame->payload_length); + + if (frame->payload_length > 0) { + uint32_t show = frame->payload_length < 8 ? frame->payload_length : 8; + printf(" ["); + for (uint32_t i = 0; i < show; i++) { + printf("%02x", frame->payload[i]); + } + if (frame->payload_length > 8) { printf("..."); } + printf("]"); + } + printf("\n"); + + struct App_Error err = transport_send_frame(conn, + frame->message_type, frame->channel_id, + frame->payload, frame->payload_length); + + if (!APP_IS_OK(err)) { + fprintf(stderr, "echo failed (errno %d)\n", err.detail.syscall.err_no); + } + + free(frame->payload); +} + +static void server_on_connect(struct Transport_Conn *conn, void *userdata) { + (void)conn; (void)userdata; + printf("client connected\n"); +} + +static void server_on_disconnect(struct Transport_Conn *conn, void *userdata) { + (void)conn; (void)userdata; + printf("client disconnected\n"); +} + +static void cmd_server(int argc, char **argv) { + if (argc < 1) { + fprintf(stderr, "usage: transport_cli server [max_connections]\n"); + exit(1); + } + + uint16_t port = (uint16_t)atoi(argv[0]); + int max_conn = argc >= 2 ? atoi(argv[1]) : 8; + + struct Transport_Server_Config config = { + .port = port, + .max_connections = max_conn, + .max_payload = TRANSPORT_DEFAULT_MAX_PAYLOAD, + .on_frame = server_on_frame, + .on_connect = server_on_connect, + .on_disconnect = server_on_disconnect, + .userdata = NULL, + }; + + struct Transport_Server *server; + struct App_Error err = transport_server_create(&server, &config); + if (!APP_IS_OK(err)) { + fprintf(stderr, "transport_server_create: errno %d\n", err.detail.syscall.err_no); + exit(1); + } + + err = transport_server_start(server); + if (!APP_IS_OK(err)) { + fprintf(stderr, "transport_server_start: errno %d\n", err.detail.syscall.err_no); + exit(1); + } + + printf("listening on port %u max_connections=%d\n", port, max_conn); + pause(); +} + +/* -- client ---------------------------------------------------------------- */ + +static void client_on_frame(struct Transport_Conn *conn, + struct Transport_Frame *frame, void *userdata) +{ + (void)conn; (void)userdata; + printf("echo type=0x%04x channel=%u length=%u\n", + frame->message_type, frame->channel_id, frame->payload_length); + free(frame->payload); +} + +static void client_on_disconnect(struct Transport_Conn *conn, void *userdata) { + (void)conn; (void)userdata; + printf("disconnected\n"); +} + +static void cmd_client(int argc, char **argv) { + if (argc < 2) { + fprintf(stderr, "usage: transport_cli client \n"); + exit(1); + } + + const char *host = argv[0]; + uint16_t port = (uint16_t)atoi(argv[1]); + + struct Transport_Conn *conn; + struct App_Error err = transport_connect(&conn, host, port, + TRANSPORT_DEFAULT_MAX_PAYLOAD, + client_on_frame, + client_on_disconnect, + NULL); + + if (!APP_IS_OK(err)) { + fprintf(stderr, "transport_connect: errno %d\n", err.detail.syscall.err_no); + exit(1); + } + + printf("connected to %s:%u\n", host, port); + + for (uint16_t i = 0; i < 3; i++) { + uint8_t payload[] = { 0xde, 0xad, 0xbe, 0xef, (uint8_t)i }; + err = transport_send_frame(conn, 0x0001, i, payload, sizeof(payload)); + if (!APP_IS_OK(err)) { + fprintf(stderr, "send failed on frame %u (errno %d)\n", + i, err.detail.syscall.err_no); + break; + } + printf("sent type=0x0001 channel=%u length=5\n", i); + } + + sleep(1); + transport_conn_close(conn); +} + +/* -- main ------------------------------------------------------------------ */ + +int main(int argc, char **argv) { + if (argc < 2) { + fprintf(stderr, "usage: transport_cli ...\n"); + return 1; + } + + if (strcmp(argv[1], "server") == 0) { + cmd_server(argc - 2, argv + 2); + } else if (strcmp(argv[1], "client") == 0) { + cmd_client(argc - 2, argv + 2); + } else { + fprintf(stderr, "unknown command: %s\n", argv[1]); + return 1; + } + + return 0; +} diff --git a/include/transport.h b/include/transport.h new file mode 100644 index 0000000..376505f --- /dev/null +++ b/include/transport.h @@ -0,0 +1,89 @@ +#pragma once + +#include +#include "error.h" + +#define TRANSPORT_FRAME_HEADER_SIZE 8u +#define TRANSPORT_DEFAULT_MAX_PAYLOAD (16u * 1024u * 1024u) + +struct Transport_Conn; +struct Transport_Server; + +/* + * A received frame. payload is malloc'd by the transport layer; + * the on_frame callback takes ownership and must free it. + * payload is NULL when payload_length is 0. + */ +struct Transport_Frame { + uint16_t message_type; + uint16_t channel_id; + uint32_t payload_length; + uint8_t *payload; +}; + +typedef void (*Transport_Frame_Cb)( + struct Transport_Conn *conn, + struct Transport_Frame *frame, + void *userdata); + +typedef void (*Transport_Connect_Cb)( + struct Transport_Conn *conn, + void *userdata); + +typedef void (*Transport_Disconnect_Cb)( + struct Transport_Conn *conn, + void *userdata); + +struct Transport_Server_Config { + uint16_t port; + int max_connections; /* inbound limit; excess connections are closed immediately */ + uint32_t max_payload; /* max payload bytes; frames exceeding this disconnect the peer */ + Transport_Frame_Cb on_frame; /* required */ + Transport_Connect_Cb on_connect; /* optional; called before read loop starts */ + Transport_Disconnect_Cb on_disconnect; /* optional; called from read thread before conn is freed */ + void *userdata; +}; + +/* Create a server (does not start listening yet). */ +struct App_Error transport_server_create(struct Transport_Server **out, + struct Transport_Server_Config *config); + +/* Bind, listen, and spawn the accept thread. */ +struct App_Error transport_server_start(struct Transport_Server *server); + +/* + * Stop accepting new connections and free the server. + * Active connections continue until they disconnect naturally. + */ +void transport_server_destroy(struct Transport_Server *server); + +/* + * Connect outbound to host:port and spawn a read thread. + * on_disconnect is optional. on_frame is required. + * The conn is freed when the read thread exits. + * Do not use conn after on_disconnect has been called. + */ +struct App_Error transport_connect(struct Transport_Conn **out, + const char *host, + uint16_t port, + uint32_t max_payload, + Transport_Frame_Cb on_frame, + Transport_Disconnect_Cb on_disconnect, + void *userdata); + +/* + * Send a frame. Thread-safe. + * payload may be NULL when length is 0. + */ +struct App_Error transport_send_frame(struct Transport_Conn *conn, + uint16_t message_type, + uint16_t channel_id, + const uint8_t *payload, + uint32_t length); + +/* + * Close the connection fd. The read thread will detect the error, + * call on_disconnect, then free the conn. + * Do not use conn after calling this. + */ +void transport_conn_close(struct Transport_Conn *conn); diff --git a/src/modules/transport/Makefile b/src/modules/transport/Makefile new file mode 100644 index 0000000..0e53f17 --- /dev/null +++ b/src/modules/transport/Makefile @@ -0,0 +1,17 @@ +ROOT := $(abspath ../../..) +include $(ROOT)/common.mk + +MODULE_BUILD = $(BUILD)/transport + +.PHONY: all clean + +all: $(MODULE_BUILD)/transport.o + +$(MODULE_BUILD)/transport.o: transport.c $(ROOT)/include/transport.h | $(MODULE_BUILD) + $(CC) $(CFLAGS) -c -o $@ $< + +$(MODULE_BUILD): + mkdir -p $@ + +clean: + rm -f $(MODULE_BUILD)/transport.o diff --git a/src/modules/transport/transport.c b/src/modules/transport/transport.c new file mode 100644 index 0000000..e57e601 --- /dev/null +++ b/src/modules/transport/transport.c @@ -0,0 +1,311 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "transport.h" +#include "serial.h" + +struct Transport_Conn { + int fd; + pthread_mutex_t write_mutex; + uint32_t max_payload; + Transport_Frame_Cb on_frame; + Transport_Disconnect_Cb on_disconnect; + void *userdata; + struct Transport_Server *server; /* NULL for outbound connections */ +}; + +struct Transport_Server { + int listen_fd; + struct Transport_Server_Config config; + pthread_t accept_thread; + pthread_mutex_t count_mutex; + int connection_count; + atomic_int running; +}; + +static int write_exact(int fd, const uint8_t *buf, size_t n) { + while (n > 0) { + ssize_t r = write(fd, buf, n); + if (r <= 0) { return -1; } + buf += r; + n -= (size_t)r; + } + return 0; +} + +static int read_exact(int fd, uint8_t *buf, size_t n) { + while (n > 0) { + ssize_t r = read(fd, buf, n); + if (r <= 0) { return -1; } + buf += r; + n -= (size_t)r; + } + return 0; +} + +static void *conn_read_thread_fn(void *arg) { + struct Transport_Conn *conn = arg; + uint8_t header_buf[TRANSPORT_FRAME_HEADER_SIZE]; + + while (1) { + if (read_exact(conn->fd, header_buf, TRANSPORT_FRAME_HEADER_SIZE) != 0) { + break; + } + + struct Transport_Frame frame; + frame.message_type = get_u16(header_buf, 0); + frame.channel_id = get_u16(header_buf, 2); + frame.payload_length = get_u32(header_buf, 4); + + if (frame.payload_length > conn->max_payload) { + break; + } + + if (frame.payload_length > 0) { + frame.payload = malloc(frame.payload_length); + if (!frame.payload) { break; } + if (read_exact(conn->fd, frame.payload, frame.payload_length) != 0) { + free(frame.payload); + break; + } + } else { + frame.payload = NULL; + } + + conn->on_frame(conn, &frame, conn->userdata); + } + + if (conn->on_disconnect) { + conn->on_disconnect(conn, conn->userdata); + } + + if (conn->server) { + pthread_mutex_lock(&conn->server->count_mutex); + conn->server->connection_count--; + pthread_mutex_unlock(&conn->server->count_mutex); + } + + close(conn->fd); + pthread_mutex_destroy(&conn->write_mutex); + free(conn); + return NULL; +} + +static struct Transport_Conn *conn_create(int fd, uint32_t max_payload, + Transport_Frame_Cb on_frame, Transport_Disconnect_Cb on_disconnect, + void *userdata, struct Transport_Server *server) +{ + struct Transport_Conn *conn = malloc(sizeof(*conn)); + if (!conn) { return NULL; } + + conn->fd = fd; + conn->max_payload = max_payload; + conn->on_frame = on_frame; + conn->on_disconnect = on_disconnect; + conn->userdata = userdata; + conn->server = server; + pthread_mutex_init(&conn->write_mutex, NULL); + + return conn; +} + +static int conn_start_thread(struct Transport_Conn *conn) { + pthread_t thread; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + int r = pthread_create(&thread, &attr, conn_read_thread_fn, conn); + pthread_attr_destroy(&attr); + return r; +} + +static void *accept_thread_fn(void *arg) { + struct Transport_Server *server = arg; + + while (atomic_load(&server->running)) { + int fd = accept(server->listen_fd, NULL, NULL); + if (fd < 0) { + if (!atomic_load(&server->running)) { break; } + continue; + } + + pthread_mutex_lock(&server->count_mutex); + if (server->connection_count >= server->config.max_connections) { + pthread_mutex_unlock(&server->count_mutex); + close(fd); + continue; + } + server->connection_count++; + pthread_mutex_unlock(&server->count_mutex); + + struct Transport_Conn *conn = conn_create(fd, + server->config.max_payload, + server->config.on_frame, + server->config.on_disconnect, + server->config.userdata, + server); + + if (!conn) { + pthread_mutex_lock(&server->count_mutex); + server->connection_count--; + pthread_mutex_unlock(&server->count_mutex); + close(fd); + continue; + } + + if (server->config.on_connect) { + server->config.on_connect(conn, server->config.userdata); + } + + if (conn_start_thread(conn) != 0) { + pthread_mutex_lock(&server->count_mutex); + server->connection_count--; + pthread_mutex_unlock(&server->count_mutex); + close(conn->fd); + pthread_mutex_destroy(&conn->write_mutex); + free(conn); + } + } + + return NULL; +} + +struct App_Error transport_server_create(struct Transport_Server **out, + struct Transport_Server_Config *config) +{ + struct Transport_Server *server = malloc(sizeof(*server)); + if (!server) { return APP_SYSCALL_ERROR(); } + + server->config = *config; + server->connection_count = 0; + server->listen_fd = -1; + atomic_init(&server->running, 0); + pthread_mutex_init(&server->count_mutex, NULL); + + *out = server; + return APP_OK; +} + +struct App_Error transport_server_start(struct Transport_Server *server) { + int fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0); + if (fd < 0) { return APP_SYSCALL_ERROR(); } + + int opt = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + + struct sockaddr_in addr = {0}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(server->config.port); + + if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + close(fd); + return APP_SYSCALL_ERROR(); + } + + if (listen(fd, SOMAXCONN) < 0) { + close(fd); + return APP_SYSCALL_ERROR(); + } + + server->listen_fd = fd; + atomic_store(&server->running, 1); + + if (pthread_create(&server->accept_thread, NULL, accept_thread_fn, server) != 0) { + atomic_store(&server->running, 0); + close(fd); + server->listen_fd = -1; + return APP_SYSCALL_ERROR(); + } + + return APP_OK; +} + +void transport_server_destroy(struct Transport_Server *server) { + atomic_store(&server->running, 0); + close(server->listen_fd); + pthread_join(server->accept_thread, NULL); + pthread_mutex_destroy(&server->count_mutex); + free(server); +} + +struct App_Error transport_connect(struct Transport_Conn **out, + const char *host, uint16_t port, + uint32_t max_payload, + Transport_Frame_Cb on_frame, + Transport_Disconnect_Cb on_disconnect, + void *userdata) +{ + char port_str[8]; + snprintf(port_str, sizeof(port_str), "%u", port); + + struct addrinfo hints = {0}; + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + struct addrinfo *res; + if (getaddrinfo(host, port_str, &hints, &res) != 0) { + return APP_SYSCALL_ERROR(); + } + + int fd = -1; + for (struct addrinfo *ai = res; ai; ai = ai->ai_next) { + fd = socket(ai->ai_family, ai->ai_socktype | SOCK_CLOEXEC, ai->ai_protocol); + if (fd < 0) { continue; } + if (connect(fd, ai->ai_addr, ai->ai_addrlen) == 0) { break; } + close(fd); + fd = -1; + } + freeaddrinfo(res); + + if (fd < 0) { return APP_SYSCALL_ERROR(); } + + struct Transport_Conn *conn = conn_create(fd, max_payload, + on_frame, on_disconnect, userdata, NULL); + + if (!conn) { + close(fd); + return APP_SYSCALL_ERROR(); + } + + if (conn_start_thread(conn) != 0) { + close(fd); + pthread_mutex_destroy(&conn->write_mutex); + free(conn); + return APP_SYSCALL_ERROR(); + } + + *out = conn; + return APP_OK; +} + +struct App_Error transport_send_frame(struct Transport_Conn *conn, + uint16_t message_type, uint16_t channel_id, + const uint8_t *payload, uint32_t length) +{ + uint8_t header[TRANSPORT_FRAME_HEADER_SIZE]; + put_u16(header, 0, message_type); + put_u16(header, 2, channel_id); + put_u32(header, 4, length); + + pthread_mutex_lock(&conn->write_mutex); + int ok = write_exact(conn->fd, header, TRANSPORT_FRAME_HEADER_SIZE); + if (ok == 0 && length > 0) { + ok = write_exact(conn->fd, payload, length); + } + pthread_mutex_unlock(&conn->write_mutex); + + if (ok != 0) { return APP_SYSCALL_ERROR(); } + return APP_OK; +} + +void transport_conn_close(struct Transport_Conn *conn) { + close(conn->fd); +}