Add transport module: TCP framing, thread-per-connection, inbound limit
transport_server_create/start: binds TCP, spawns accept thread, closes excess inbound connections when max_connections is reached. transport_connect: outbound TCP, spawns read thread before returning. transport_send_frame: packs 8-byte header with serial put_*, then writes header + payload under a per-connection mutex (thread-safe). Read thread: reads header, validates payload_length <= max_payload, mallocs payload, calls on_frame (callback owns and must free payload). On error or disconnect calls on_disconnect then frees conn. transport_cli: server mode echoes received frames; client mode sends 3 test frames and prints echoes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,19 +1,26 @@
|
|||||||
ROOT := $(abspath ../..)
|
ROOT := $(abspath ../..)
|
||||||
include $(ROOT)/common.mk
|
include $(ROOT)/common.mk
|
||||||
|
|
||||||
CLI_BUILD = $(BUILD)/cli
|
CLI_BUILD = $(BUILD)/cli
|
||||||
COMMON_OBJ = $(BUILD)/common/error.o
|
COMMON_OBJ = $(BUILD)/common/error.o
|
||||||
MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o
|
MEDIA_CTRL_OBJ = $(BUILD)/media_ctrl/media_ctrl.o
|
||||||
V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o
|
V4L2_CTRL_OBJ = $(BUILD)/v4l2_ctrl/v4l2_ctrl.o
|
||||||
|
SERIAL_OBJ = $(BUILD)/serial/serial.o
|
||||||
|
TRANSPORT_OBJ = $(BUILD)/transport/transport.o
|
||||||
|
|
||||||
.PHONY: all clean modules
|
.PHONY: all clean modules
|
||||||
|
|
||||||
all: modules $(CLI_BUILD)/media_ctrl_cli $(CLI_BUILD)/v4l2_ctrl_cli
|
all: modules \
|
||||||
|
$(CLI_BUILD)/media_ctrl_cli \
|
||||||
|
$(CLI_BUILD)/v4l2_ctrl_cli \
|
||||||
|
$(CLI_BUILD)/transport_cli
|
||||||
|
|
||||||
modules:
|
modules:
|
||||||
$(MAKE) -C $(ROOT)/src/modules/common
|
$(MAKE) -C $(ROOT)/src/modules/common
|
||||||
$(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
$(MAKE) -C $(ROOT)/src/modules/media_ctrl
|
||||||
$(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
$(MAKE) -C $(ROOT)/src/modules/v4l2_ctrl
|
||||||
|
$(MAKE) -C $(ROOT)/src/modules/serial
|
||||||
|
$(MAKE) -C $(ROOT)/src/modules/transport
|
||||||
|
|
||||||
$(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD)
|
$(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) | $(CLI_BUILD)
|
||||||
$(CC) $(CFLAGS) -o $@ $^
|
$(CC) $(CFLAGS) -o $@ $^
|
||||||
@@ -21,8 +28,14 @@ $(CLI_BUILD)/media_ctrl_cli: media_ctrl_cli.c $(COMMON_OBJ) $(MEDIA_CTRL_OBJ) |
|
|||||||
$(CLI_BUILD)/v4l2_ctrl_cli: v4l2_ctrl_cli.c $(COMMON_OBJ) $(V4L2_CTRL_OBJ) | $(CLI_BUILD)
|
$(CLI_BUILD)/v4l2_ctrl_cli: v4l2_ctrl_cli.c $(COMMON_OBJ) $(V4L2_CTRL_OBJ) | $(CLI_BUILD)
|
||||||
$(CC) $(CFLAGS) -o $@ $^
|
$(CC) $(CFLAGS) -o $@ $^
|
||||||
|
|
||||||
|
$(CLI_BUILD)/transport_cli: transport_cli.c $(COMMON_OBJ) $(SERIAL_OBJ) $(TRANSPORT_OBJ) | $(CLI_BUILD)
|
||||||
|
$(CC) $(CFLAGS) -o $@ $^ -lpthread
|
||||||
|
|
||||||
$(CLI_BUILD):
|
$(CLI_BUILD):
|
||||||
mkdir -p $@
|
mkdir -p $@
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -f $(CLI_BUILD)/media_ctrl_cli $(CLI_BUILD)/v4l2_ctrl_cli
|
rm -f \
|
||||||
|
$(CLI_BUILD)/media_ctrl_cli \
|
||||||
|
$(CLI_BUILD)/v4l2_ctrl_cli \
|
||||||
|
$(CLI_BUILD)/transport_cli
|
||||||
|
|||||||
159
dev/cli/transport_cli.c
Normal file
159
dev/cli/transport_cli.c
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "transport.h"
|
||||||
|
#include "error.h"
|
||||||
|
|
||||||
|
/* -- server ---------------------------------------------------------------- */
|
||||||
|
|
||||||
|
static void server_on_frame(struct Transport_Conn *conn,
|
||||||
|
struct Transport_Frame *frame, void *userdata)
|
||||||
|
{
|
||||||
|
(void)userdata;
|
||||||
|
|
||||||
|
printf("recv type=0x%04x channel=%u length=%u",
|
||||||
|
frame->message_type, frame->channel_id, frame->payload_length);
|
||||||
|
|
||||||
|
if (frame->payload_length > 0) {
|
||||||
|
uint32_t show = frame->payload_length < 8 ? frame->payload_length : 8;
|
||||||
|
printf(" [");
|
||||||
|
for (uint32_t i = 0; i < show; i++) {
|
||||||
|
printf("%02x", frame->payload[i]);
|
||||||
|
}
|
||||||
|
if (frame->payload_length > 8) { printf("..."); }
|
||||||
|
printf("]");
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
|
||||||
|
struct App_Error err = transport_send_frame(conn,
|
||||||
|
frame->message_type, frame->channel_id,
|
||||||
|
frame->payload, frame->payload_length);
|
||||||
|
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "echo failed (errno %d)\n", err.detail.syscall.err_no);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(frame->payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void server_on_connect(struct Transport_Conn *conn, void *userdata) {
|
||||||
|
(void)conn; (void)userdata;
|
||||||
|
printf("client connected\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void server_on_disconnect(struct Transport_Conn *conn, void *userdata) {
|
||||||
|
(void)conn; (void)userdata;
|
||||||
|
printf("client disconnected\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cmd_server(int argc, char **argv) {
|
||||||
|
if (argc < 1) {
|
||||||
|
fprintf(stderr, "usage: transport_cli server <port> [max_connections]\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint16_t port = (uint16_t)atoi(argv[0]);
|
||||||
|
int max_conn = argc >= 2 ? atoi(argv[1]) : 8;
|
||||||
|
|
||||||
|
struct Transport_Server_Config config = {
|
||||||
|
.port = port,
|
||||||
|
.max_connections = max_conn,
|
||||||
|
.max_payload = TRANSPORT_DEFAULT_MAX_PAYLOAD,
|
||||||
|
.on_frame = server_on_frame,
|
||||||
|
.on_connect = server_on_connect,
|
||||||
|
.on_disconnect = server_on_disconnect,
|
||||||
|
.userdata = NULL,
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Transport_Server *server;
|
||||||
|
struct App_Error err = transport_server_create(&server, &config);
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "transport_server_create: errno %d\n", err.detail.syscall.err_no);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
err = transport_server_start(server);
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "transport_server_start: errno %d\n", err.detail.syscall.err_no);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("listening on port %u max_connections=%d\n", port, max_conn);
|
||||||
|
pause();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- client ---------------------------------------------------------------- */
|
||||||
|
|
||||||
|
static void client_on_frame(struct Transport_Conn *conn,
|
||||||
|
struct Transport_Frame *frame, void *userdata)
|
||||||
|
{
|
||||||
|
(void)conn; (void)userdata;
|
||||||
|
printf("echo type=0x%04x channel=%u length=%u\n",
|
||||||
|
frame->message_type, frame->channel_id, frame->payload_length);
|
||||||
|
free(frame->payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void client_on_disconnect(struct Transport_Conn *conn, void *userdata) {
|
||||||
|
(void)conn; (void)userdata;
|
||||||
|
printf("disconnected\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cmd_client(int argc, char **argv) {
|
||||||
|
if (argc < 2) {
|
||||||
|
fprintf(stderr, "usage: transport_cli client <host> <port>\n");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *host = argv[0];
|
||||||
|
uint16_t port = (uint16_t)atoi(argv[1]);
|
||||||
|
|
||||||
|
struct Transport_Conn *conn;
|
||||||
|
struct App_Error err = transport_connect(&conn, host, port,
|
||||||
|
TRANSPORT_DEFAULT_MAX_PAYLOAD,
|
||||||
|
client_on_frame,
|
||||||
|
client_on_disconnect,
|
||||||
|
NULL);
|
||||||
|
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "transport_connect: errno %d\n", err.detail.syscall.err_no);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("connected to %s:%u\n", host, port);
|
||||||
|
|
||||||
|
for (uint16_t i = 0; i < 3; i++) {
|
||||||
|
uint8_t payload[] = { 0xde, 0xad, 0xbe, 0xef, (uint8_t)i };
|
||||||
|
err = transport_send_frame(conn, 0x0001, i, payload, sizeof(payload));
|
||||||
|
if (!APP_IS_OK(err)) {
|
||||||
|
fprintf(stderr, "send failed on frame %u (errno %d)\n",
|
||||||
|
i, err.detail.syscall.err_no);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
printf("sent type=0x0001 channel=%u length=5\n", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(1);
|
||||||
|
transport_conn_close(conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* -- main ------------------------------------------------------------------ */
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
if (argc < 2) {
|
||||||
|
fprintf(stderr, "usage: transport_cli <server|client> ...\n");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strcmp(argv[1], "server") == 0) {
|
||||||
|
cmd_server(argc - 2, argv + 2);
|
||||||
|
} else if (strcmp(argv[1], "client") == 0) {
|
||||||
|
cmd_client(argc - 2, argv + 2);
|
||||||
|
} else {
|
||||||
|
fprintf(stderr, "unknown command: %s\n", argv[1]);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
89
include/transport.h
Normal file
89
include/transport.h
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "error.h"
|
||||||
|
|
||||||
|
#define TRANSPORT_FRAME_HEADER_SIZE 8u
|
||||||
|
#define TRANSPORT_DEFAULT_MAX_PAYLOAD (16u * 1024u * 1024u)
|
||||||
|
|
||||||
|
struct Transport_Conn;
|
||||||
|
struct Transport_Server;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* A received frame. payload is malloc'd by the transport layer;
|
||||||
|
* the on_frame callback takes ownership and must free it.
|
||||||
|
* payload is NULL when payload_length is 0.
|
||||||
|
*/
|
||||||
|
struct Transport_Frame {
|
||||||
|
uint16_t message_type;
|
||||||
|
uint16_t channel_id;
|
||||||
|
uint32_t payload_length;
|
||||||
|
uint8_t *payload;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef void (*Transport_Frame_Cb)(
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
struct Transport_Frame *frame,
|
||||||
|
void *userdata);
|
||||||
|
|
||||||
|
typedef void (*Transport_Connect_Cb)(
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
void *userdata);
|
||||||
|
|
||||||
|
typedef void (*Transport_Disconnect_Cb)(
|
||||||
|
struct Transport_Conn *conn,
|
||||||
|
void *userdata);
|
||||||
|
|
||||||
|
struct Transport_Server_Config {
|
||||||
|
uint16_t port;
|
||||||
|
int max_connections; /* inbound limit; excess connections are closed immediately */
|
||||||
|
uint32_t max_payload; /* max payload bytes; frames exceeding this disconnect the peer */
|
||||||
|
Transport_Frame_Cb on_frame; /* required */
|
||||||
|
Transport_Connect_Cb on_connect; /* optional; called before read loop starts */
|
||||||
|
Transport_Disconnect_Cb on_disconnect; /* optional; called from read thread before conn is freed */
|
||||||
|
void *userdata;
|
||||||
|
};
|
||||||
|
|
||||||
|
/* Create a server (does not start listening yet). */
|
||||||
|
struct App_Error transport_server_create(struct Transport_Server **out,
|
||||||
|
struct Transport_Server_Config *config);
|
||||||
|
|
||||||
|
/* Bind, listen, and spawn the accept thread. */
|
||||||
|
struct App_Error transport_server_start(struct Transport_Server *server);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Stop accepting new connections and free the server.
|
||||||
|
* Active connections continue until they disconnect naturally.
|
||||||
|
*/
|
||||||
|
void transport_server_destroy(struct Transport_Server *server);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Connect outbound to host:port and spawn a read thread.
|
||||||
|
* on_disconnect is optional. on_frame is required.
|
||||||
|
* The conn is freed when the read thread exits.
|
||||||
|
* Do not use conn after on_disconnect has been called.
|
||||||
|
*/
|
||||||
|
struct App_Error transport_connect(struct Transport_Conn **out,
|
||||||
|
const char *host,
|
||||||
|
uint16_t port,
|
||||||
|
uint32_t max_payload,
|
||||||
|
Transport_Frame_Cb on_frame,
|
||||||
|
Transport_Disconnect_Cb on_disconnect,
|
||||||
|
void *userdata);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Send a frame. Thread-safe.
|
||||||
|
* payload may be NULL when length is 0.
|
||||||
|
*/
|
||||||
|
struct App_Error transport_send_frame(struct Transport_Conn *conn,
|
||||||
|
uint16_t message_type,
|
||||||
|
uint16_t channel_id,
|
||||||
|
const uint8_t *payload,
|
||||||
|
uint32_t length);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Close the connection fd. The read thread will detect the error,
|
||||||
|
* call on_disconnect, then free the conn.
|
||||||
|
* Do not use conn after calling this.
|
||||||
|
*/
|
||||||
|
void transport_conn_close(struct Transport_Conn *conn);
|
||||||
17
src/modules/transport/Makefile
Normal file
17
src/modules/transport/Makefile
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
ROOT := $(abspath ../../..)
|
||||||
|
include $(ROOT)/common.mk
|
||||||
|
|
||||||
|
MODULE_BUILD = $(BUILD)/transport
|
||||||
|
|
||||||
|
.PHONY: all clean
|
||||||
|
|
||||||
|
all: $(MODULE_BUILD)/transport.o
|
||||||
|
|
||||||
|
$(MODULE_BUILD)/transport.o: transport.c $(ROOT)/include/transport.h | $(MODULE_BUILD)
|
||||||
|
$(CC) $(CFLAGS) -c -o $@ $<
|
||||||
|
|
||||||
|
$(MODULE_BUILD):
|
||||||
|
mkdir -p $@
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -f $(MODULE_BUILD)/transport.o
|
||||||
311
src/modules/transport/transport.c
Normal file
311
src/modules/transport/transport.c
Normal file
@@ -0,0 +1,311 @@
|
|||||||
|
#include <stdlib.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <stdatomic.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <netdb.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
|
||||||
|
#include "transport.h"
|
||||||
|
#include "serial.h"
|
||||||
|
|
||||||
|
struct Transport_Conn {
|
||||||
|
int fd;
|
||||||
|
pthread_mutex_t write_mutex;
|
||||||
|
uint32_t max_payload;
|
||||||
|
Transport_Frame_Cb on_frame;
|
||||||
|
Transport_Disconnect_Cb on_disconnect;
|
||||||
|
void *userdata;
|
||||||
|
struct Transport_Server *server; /* NULL for outbound connections */
|
||||||
|
};
|
||||||
|
|
||||||
|
struct Transport_Server {
|
||||||
|
int listen_fd;
|
||||||
|
struct Transport_Server_Config config;
|
||||||
|
pthread_t accept_thread;
|
||||||
|
pthread_mutex_t count_mutex;
|
||||||
|
int connection_count;
|
||||||
|
atomic_int running;
|
||||||
|
};
|
||||||
|
|
||||||
|
static int write_exact(int fd, const uint8_t *buf, size_t n) {
|
||||||
|
while (n > 0) {
|
||||||
|
ssize_t r = write(fd, buf, n);
|
||||||
|
if (r <= 0) { return -1; }
|
||||||
|
buf += r;
|
||||||
|
n -= (size_t)r;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int read_exact(int fd, uint8_t *buf, size_t n) {
|
||||||
|
while (n > 0) {
|
||||||
|
ssize_t r = read(fd, buf, n);
|
||||||
|
if (r <= 0) { return -1; }
|
||||||
|
buf += r;
|
||||||
|
n -= (size_t)r;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *conn_read_thread_fn(void *arg) {
|
||||||
|
struct Transport_Conn *conn = arg;
|
||||||
|
uint8_t header_buf[TRANSPORT_FRAME_HEADER_SIZE];
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (read_exact(conn->fd, header_buf, TRANSPORT_FRAME_HEADER_SIZE) != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Transport_Frame frame;
|
||||||
|
frame.message_type = get_u16(header_buf, 0);
|
||||||
|
frame.channel_id = get_u16(header_buf, 2);
|
||||||
|
frame.payload_length = get_u32(header_buf, 4);
|
||||||
|
|
||||||
|
if (frame.payload_length > conn->max_payload) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (frame.payload_length > 0) {
|
||||||
|
frame.payload = malloc(frame.payload_length);
|
||||||
|
if (!frame.payload) { break; }
|
||||||
|
if (read_exact(conn->fd, frame.payload, frame.payload_length) != 0) {
|
||||||
|
free(frame.payload);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
frame.payload = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
conn->on_frame(conn, &frame, conn->userdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conn->on_disconnect) {
|
||||||
|
conn->on_disconnect(conn, conn->userdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conn->server) {
|
||||||
|
pthread_mutex_lock(&conn->server->count_mutex);
|
||||||
|
conn->server->connection_count--;
|
||||||
|
pthread_mutex_unlock(&conn->server->count_mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
close(conn->fd);
|
||||||
|
pthread_mutex_destroy(&conn->write_mutex);
|
||||||
|
free(conn);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct Transport_Conn *conn_create(int fd, uint32_t max_payload,
|
||||||
|
Transport_Frame_Cb on_frame, Transport_Disconnect_Cb on_disconnect,
|
||||||
|
void *userdata, struct Transport_Server *server)
|
||||||
|
{
|
||||||
|
struct Transport_Conn *conn = malloc(sizeof(*conn));
|
||||||
|
if (!conn) { return NULL; }
|
||||||
|
|
||||||
|
conn->fd = fd;
|
||||||
|
conn->max_payload = max_payload;
|
||||||
|
conn->on_frame = on_frame;
|
||||||
|
conn->on_disconnect = on_disconnect;
|
||||||
|
conn->userdata = userdata;
|
||||||
|
conn->server = server;
|
||||||
|
pthread_mutex_init(&conn->write_mutex, NULL);
|
||||||
|
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int conn_start_thread(struct Transport_Conn *conn) {
|
||||||
|
pthread_t thread;
|
||||||
|
pthread_attr_t attr;
|
||||||
|
pthread_attr_init(&attr);
|
||||||
|
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
|
||||||
|
int r = pthread_create(&thread, &attr, conn_read_thread_fn, conn);
|
||||||
|
pthread_attr_destroy(&attr);
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *accept_thread_fn(void *arg) {
|
||||||
|
struct Transport_Server *server = arg;
|
||||||
|
|
||||||
|
while (atomic_load(&server->running)) {
|
||||||
|
int fd = accept(server->listen_fd, NULL, NULL);
|
||||||
|
if (fd < 0) {
|
||||||
|
if (!atomic_load(&server->running)) { break; }
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_lock(&server->count_mutex);
|
||||||
|
if (server->connection_count >= server->config.max_connections) {
|
||||||
|
pthread_mutex_unlock(&server->count_mutex);
|
||||||
|
close(fd);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
server->connection_count++;
|
||||||
|
pthread_mutex_unlock(&server->count_mutex);
|
||||||
|
|
||||||
|
struct Transport_Conn *conn = conn_create(fd,
|
||||||
|
server->config.max_payload,
|
||||||
|
server->config.on_frame,
|
||||||
|
server->config.on_disconnect,
|
||||||
|
server->config.userdata,
|
||||||
|
server);
|
||||||
|
|
||||||
|
if (!conn) {
|
||||||
|
pthread_mutex_lock(&server->count_mutex);
|
||||||
|
server->connection_count--;
|
||||||
|
pthread_mutex_unlock(&server->count_mutex);
|
||||||
|
close(fd);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (server->config.on_connect) {
|
||||||
|
server->config.on_connect(conn, server->config.userdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conn_start_thread(conn) != 0) {
|
||||||
|
pthread_mutex_lock(&server->count_mutex);
|
||||||
|
server->connection_count--;
|
||||||
|
pthread_mutex_unlock(&server->count_mutex);
|
||||||
|
close(conn->fd);
|
||||||
|
pthread_mutex_destroy(&conn->write_mutex);
|
||||||
|
free(conn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct App_Error transport_server_create(struct Transport_Server **out,
|
||||||
|
struct Transport_Server_Config *config)
|
||||||
|
{
|
||||||
|
struct Transport_Server *server = malloc(sizeof(*server));
|
||||||
|
if (!server) { return APP_SYSCALL_ERROR(); }
|
||||||
|
|
||||||
|
server->config = *config;
|
||||||
|
server->connection_count = 0;
|
||||||
|
server->listen_fd = -1;
|
||||||
|
atomic_init(&server->running, 0);
|
||||||
|
pthread_mutex_init(&server->count_mutex, NULL);
|
||||||
|
|
||||||
|
*out = server;
|
||||||
|
return APP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct App_Error transport_server_start(struct Transport_Server *server) {
|
||||||
|
int fd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
|
||||||
|
if (fd < 0) { return APP_SYSCALL_ERROR(); }
|
||||||
|
|
||||||
|
int opt = 1;
|
||||||
|
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
|
||||||
|
|
||||||
|
struct sockaddr_in addr = {0};
|
||||||
|
addr.sin_family = AF_INET;
|
||||||
|
addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
||||||
|
addr.sin_port = htons(server->config.port);
|
||||||
|
|
||||||
|
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
|
||||||
|
close(fd);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen(fd, SOMAXCONN) < 0) {
|
||||||
|
close(fd);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
server->listen_fd = fd;
|
||||||
|
atomic_store(&server->running, 1);
|
||||||
|
|
||||||
|
if (pthread_create(&server->accept_thread, NULL, accept_thread_fn, server) != 0) {
|
||||||
|
atomic_store(&server->running, 0);
|
||||||
|
close(fd);
|
||||||
|
server->listen_fd = -1;
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
return APP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void transport_server_destroy(struct Transport_Server *server) {
|
||||||
|
atomic_store(&server->running, 0);
|
||||||
|
close(server->listen_fd);
|
||||||
|
pthread_join(server->accept_thread, NULL);
|
||||||
|
pthread_mutex_destroy(&server->count_mutex);
|
||||||
|
free(server);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct App_Error transport_connect(struct Transport_Conn **out,
|
||||||
|
const char *host, uint16_t port,
|
||||||
|
uint32_t max_payload,
|
||||||
|
Transport_Frame_Cb on_frame,
|
||||||
|
Transport_Disconnect_Cb on_disconnect,
|
||||||
|
void *userdata)
|
||||||
|
{
|
||||||
|
char port_str[8];
|
||||||
|
snprintf(port_str, sizeof(port_str), "%u", port);
|
||||||
|
|
||||||
|
struct addrinfo hints = {0};
|
||||||
|
hints.ai_family = AF_UNSPEC;
|
||||||
|
hints.ai_socktype = SOCK_STREAM;
|
||||||
|
|
||||||
|
struct addrinfo *res;
|
||||||
|
if (getaddrinfo(host, port_str, &hints, &res) != 0) {
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
int fd = -1;
|
||||||
|
for (struct addrinfo *ai = res; ai; ai = ai->ai_next) {
|
||||||
|
fd = socket(ai->ai_family, ai->ai_socktype | SOCK_CLOEXEC, ai->ai_protocol);
|
||||||
|
if (fd < 0) { continue; }
|
||||||
|
if (connect(fd, ai->ai_addr, ai->ai_addrlen) == 0) { break; }
|
||||||
|
close(fd);
|
||||||
|
fd = -1;
|
||||||
|
}
|
||||||
|
freeaddrinfo(res);
|
||||||
|
|
||||||
|
if (fd < 0) { return APP_SYSCALL_ERROR(); }
|
||||||
|
|
||||||
|
struct Transport_Conn *conn = conn_create(fd, max_payload,
|
||||||
|
on_frame, on_disconnect, userdata, NULL);
|
||||||
|
|
||||||
|
if (!conn) {
|
||||||
|
close(fd);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conn_start_thread(conn) != 0) {
|
||||||
|
close(fd);
|
||||||
|
pthread_mutex_destroy(&conn->write_mutex);
|
||||||
|
free(conn);
|
||||||
|
return APP_SYSCALL_ERROR();
|
||||||
|
}
|
||||||
|
|
||||||
|
*out = conn;
|
||||||
|
return APP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct App_Error transport_send_frame(struct Transport_Conn *conn,
|
||||||
|
uint16_t message_type, uint16_t channel_id,
|
||||||
|
const uint8_t *payload, uint32_t length)
|
||||||
|
{
|
||||||
|
uint8_t header[TRANSPORT_FRAME_HEADER_SIZE];
|
||||||
|
put_u16(header, 0, message_type);
|
||||||
|
put_u16(header, 2, channel_id);
|
||||||
|
put_u32(header, 4, length);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&conn->write_mutex);
|
||||||
|
int ok = write_exact(conn->fd, header, TRANSPORT_FRAME_HEADER_SIZE);
|
||||||
|
if (ok == 0 && length > 0) {
|
||||||
|
ok = write_exact(conn->fd, payload, length);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&conn->write_mutex);
|
||||||
|
|
||||||
|
if (ok != 0) { return APP_SYSCALL_ERROR(); }
|
||||||
|
return APP_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
void transport_conn_close(struct Transport_Conn *conn) {
|
||||||
|
close(conn->fd);
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user