feat: node-to-node MJPEG streaming CLIs and shared V4L2 format header
Add stream_send_cli (V4L2 capture → TCP → VIDEO_FRAME) and stream_recv_cli (TCP → threaded frame slot → GLFW display) to exercise end-to-end streaming between two nodes on the same machine or across the network. Add include/stream_stats.h (header-only rolling-window fps/Mbps tracker) and include/v4l2_fmt.h (header-only V4L2 format enumeration shared between v4l2_view_cli and stream_send_cli). Refactor v4l2_view_cli to use the shared header. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
336
dev/cli/stream_recv_cli.c
Normal file
336
dev/cli/stream_recv_cli.c
Normal file
@@ -0,0 +1,336 @@
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <time.h>
|
||||
|
||||
#include "xorg.h"
|
||||
#include "transport.h"
|
||||
#include "protocol.h"
|
||||
#include "stream_stats.h"
|
||||
#include "error.h"
|
||||
|
||||
#define DEFAULT_PORT 7700
|
||||
#define DEFAULT_WIN_W 1280
|
||||
#define DEFAULT_WIN_H 720
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Frame slot — single-frame handoff from transport thread to GL thread */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
typedef struct {
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
uint8_t *payload; /* transport-malloc'd; we own */
|
||||
const uint8_t *data; /* points into payload */
|
||||
uint32_t data_len;
|
||||
uint16_t stream_id;
|
||||
int ready; /* 1 = new frame available */
|
||||
int done; /* 1 = sender disconnected */
|
||||
} Frame_Slot;
|
||||
|
||||
static void frame_slot_init(Frame_Slot *s)
|
||||
{
|
||||
pthread_mutex_init(&s->mutex, NULL);
|
||||
pthread_cond_init(&s->cond, NULL);
|
||||
s->payload = NULL;
|
||||
s->data = NULL;
|
||||
s->data_len = 0;
|
||||
s->ready = 0;
|
||||
s->done = 0;
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Receiver state */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
typedef struct {
|
||||
Frame_Slot *slot;
|
||||
int stream_id_filter; /* 0 = accept any */
|
||||
pthread_mutex_t conn_mutex;
|
||||
struct Transport_Conn *conn; /* current sender connection */
|
||||
} Recv_State;
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Transport callbacks */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
static void on_connect(struct Transport_Conn *conn, void *userdata)
|
||||
{
|
||||
Recv_State *rs = userdata;
|
||||
fprintf(stderr, "stream_recv_cli: sender connected\n");
|
||||
pthread_mutex_lock(&rs->conn_mutex);
|
||||
rs->conn = conn;
|
||||
pthread_mutex_unlock(&rs->conn_mutex);
|
||||
}
|
||||
|
||||
static void on_disconnect(struct Transport_Conn *conn, void *userdata)
|
||||
{
|
||||
(void)conn;
|
||||
Recv_State *rs = userdata;
|
||||
fprintf(stderr, "stream_recv_cli: sender disconnected\n");
|
||||
|
||||
pthread_mutex_lock(&rs->conn_mutex);
|
||||
rs->conn = NULL;
|
||||
pthread_mutex_unlock(&rs->conn_mutex);
|
||||
|
||||
Frame_Slot *slot = rs->slot;
|
||||
pthread_mutex_lock(&slot->mutex);
|
||||
slot->done = 1;
|
||||
pthread_cond_signal(&slot->cond);
|
||||
pthread_mutex_unlock(&slot->mutex);
|
||||
}
|
||||
|
||||
static void on_frame(struct Transport_Conn *conn,
|
||||
struct Transport_Frame *frame,
|
||||
void *userdata)
|
||||
{
|
||||
Recv_State *rs = userdata;
|
||||
|
||||
if (frame->message_type == PROTO_MSG_VIDEO_FRAME) {
|
||||
struct Proto_Video_Frame vf;
|
||||
struct App_Error err = proto_read_video_frame(
|
||||
frame->payload, frame->payload_length, &vf);
|
||||
if (!APP_IS_OK(err)) {
|
||||
free(frame->payload);
|
||||
return;
|
||||
}
|
||||
if (rs->stream_id_filter && vf.stream_id != (uint16_t)rs->stream_id_filter) {
|
||||
free(frame->payload);
|
||||
return;
|
||||
}
|
||||
|
||||
Frame_Slot *slot = rs->slot;
|
||||
pthread_mutex_lock(&slot->mutex);
|
||||
if (slot->ready) {
|
||||
free(slot->payload); /* drop stale frame — main thread is behind */
|
||||
}
|
||||
slot->payload = frame->payload; /* take ownership */
|
||||
slot->data = vf.data;
|
||||
slot->data_len = vf.data_len;
|
||||
slot->stream_id = vf.stream_id;
|
||||
slot->ready = 1;
|
||||
pthread_cond_signal(&slot->cond);
|
||||
pthread_mutex_unlock(&slot->mutex);
|
||||
/* frame->payload is now owned by slot; do not free here */
|
||||
|
||||
} else if (frame->message_type == PROTO_MSG_CONTROL_REQUEST) {
|
||||
struct Proto_Request_Header hdr;
|
||||
struct App_Error err = proto_read_request_header(
|
||||
frame->payload, frame->payload_length, &hdr);
|
||||
if (APP_IS_OK(err) && hdr.command == PROTO_CMD_STREAM_OPEN) {
|
||||
struct Proto_Stream_Open so;
|
||||
err = proto_read_stream_open(
|
||||
frame->payload, frame->payload_length, &so);
|
||||
if (APP_IS_OK(err)) {
|
||||
fprintf(stderr,
|
||||
"stream_recv_cli: STREAM_OPEN stream_id=%u format=%u origin=%u\n",
|
||||
so.stream_id, so.format, so.origin);
|
||||
proto_write_control_response(conn, hdr.request_id,
|
||||
PROTO_STATUS_OK, NULL, 0);
|
||||
}
|
||||
}
|
||||
free(frame->payload);
|
||||
|
||||
} else {
|
||||
free(frame->payload);
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Usage */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
static void usage(void)
|
||||
{
|
||||
fprintf(stderr,
|
||||
"usage: stream_recv_cli [--port PORT] [--stream-id N]\n"
|
||||
" [--scale stretch|fit|fill|1:1]\n"
|
||||
" [--anchor center|topleft]\n"
|
||||
" [--x N] [--y N]\n"
|
||||
"\n"
|
||||
"Listens for an incoming TCP stream and displays VIDEO_FRAME messages.\n"
|
||||
"Accepts MJPEG streams. Shows per-stream fps and Mbps as an overlay.\n"
|
||||
"\n"
|
||||
"defaults: port=7700 stream-id=0 (any) fit center x=0 y=0\n");
|
||||
}
|
||||
|
||||
/* ------------------------------------------------------------------ */
|
||||
/* Main */
|
||||
/* ------------------------------------------------------------------ */
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
uint16_t port = DEFAULT_PORT;
|
||||
int stream_id_filter = 0;
|
||||
int win_x = 0;
|
||||
int win_y = 0;
|
||||
Xorg_Scale scale = XORG_SCALE_FIT;
|
||||
Xorg_Anchor anchor = XORG_ANCHOR_CENTER;
|
||||
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) {
|
||||
port = (uint16_t)atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "--stream-id") == 0 && i + 1 < argc) {
|
||||
stream_id_filter = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "--x") == 0 && i + 1 < argc) {
|
||||
win_x = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "--y") == 0 && i + 1 < argc) {
|
||||
win_y = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "--scale") == 0 && i + 1 < argc) {
|
||||
i++;
|
||||
if (strcmp(argv[i], "stretch") == 0) { scale = XORG_SCALE_STRETCH; }
|
||||
else if (strcmp(argv[i], "fit") == 0) { scale = XORG_SCALE_FIT; }
|
||||
else if (strcmp(argv[i], "fill") == 0) { scale = XORG_SCALE_FILL; }
|
||||
else if (strcmp(argv[i], "1:1") == 0) { scale = XORG_SCALE_1_1; }
|
||||
else { fprintf(stderr, "unknown scale: %s\n", argv[i]); usage(); return 1; }
|
||||
} else if (strcmp(argv[i], "--anchor") == 0 && i + 1 < argc) {
|
||||
i++;
|
||||
if (strcmp(argv[i], "center") == 0) { anchor = XORG_ANCHOR_CENTER; }
|
||||
else if (strcmp(argv[i], "topleft") == 0) { anchor = XORG_ANCHOR_TOP_LEFT; }
|
||||
else { fprintf(stderr, "unknown anchor: %s\n", argv[i]); usage(); return 1; }
|
||||
} else {
|
||||
usage(); return 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!xorg_available()) {
|
||||
fprintf(stderr, "stream_recv_cli: built without HAVE_GLFW — viewer not available\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Frame slot and receiver state */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
Frame_Slot slot;
|
||||
frame_slot_init(&slot);
|
||||
|
||||
Recv_State rs = {0};
|
||||
rs.slot = &slot;
|
||||
rs.stream_id_filter = stream_id_filter;
|
||||
pthread_mutex_init(&rs.conn_mutex, NULL);
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Transport server */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
struct Transport_Server_Config cfg = {
|
||||
.port = port,
|
||||
.max_connections = 1,
|
||||
.max_payload = TRANSPORT_DEFAULT_MAX_PAYLOAD,
|
||||
.on_frame = on_frame,
|
||||
.on_connect = on_connect,
|
||||
.on_disconnect = on_disconnect,
|
||||
.userdata = &rs,
|
||||
};
|
||||
|
||||
struct Transport_Server *server = NULL;
|
||||
struct App_Error err = transport_server_create(&server, &cfg);
|
||||
if (!APP_IS_OK(err)) { app_error_print(&err); return 1; }
|
||||
|
||||
err = transport_server_start(server);
|
||||
if (!APP_IS_OK(err)) {
|
||||
app_error_print(&err);
|
||||
transport_server_destroy(server);
|
||||
return 1;
|
||||
}
|
||||
|
||||
fprintf(stderr, "stream_recv_cli: listening on port %u\n", port);
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Open viewer */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
Xorg_Viewer *v = xorg_viewer_open(win_x, win_y,
|
||||
DEFAULT_WIN_W, DEFAULT_WIN_H,
|
||||
"stream_recv_cli");
|
||||
if (!v) {
|
||||
fprintf(stderr, "stream_recv_cli: failed to open viewer window\n");
|
||||
transport_server_destroy(server);
|
||||
return 1;
|
||||
}
|
||||
xorg_viewer_set_scale(v, scale);
|
||||
xorg_viewer_set_anchor(v, anchor);
|
||||
xorg_viewer_set_overlay_text(v, 0, 10, 10,
|
||||
"waiting for stream...", 1.0f, 1.0f, 0.8f);
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Render loop */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
Stream_Stats stats;
|
||||
stream_stats_init(&stats, (uint16_t)stream_id_filter);
|
||||
|
||||
while (xorg_viewer_handle_events(v)) {
|
||||
/*
|
||||
* Wait for the next frame with a 100ms deadline so we can poll
|
||||
* window events even when no frames arrive.
|
||||
*/
|
||||
struct timespec deadline;
|
||||
clock_gettime(CLOCK_REALTIME, &deadline);
|
||||
deadline.tv_nsec += 100000000LL;
|
||||
if (deadline.tv_nsec >= 1000000000LL) {
|
||||
deadline.tv_sec++;
|
||||
deadline.tv_nsec -= 1000000000LL;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&slot.mutex);
|
||||
while (!slot.ready && !slot.done) {
|
||||
int rc = pthread_cond_timedwait(&slot.cond, &slot.mutex, &deadline);
|
||||
if (rc != 0) { break; } /* timeout — poll events */
|
||||
}
|
||||
|
||||
if (!slot.ready) {
|
||||
int done = slot.done;
|
||||
pthread_mutex_unlock(&slot.mutex);
|
||||
if (done) { break; }
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Take ownership of the frame. */
|
||||
uint8_t *payload = slot.payload;
|
||||
const uint8_t *data = slot.data;
|
||||
uint32_t data_len = slot.data_len;
|
||||
uint16_t stream_id = slot.stream_id;
|
||||
slot.payload = NULL;
|
||||
slot.data = NULL;
|
||||
slot.ready = 0;
|
||||
pthread_mutex_unlock(&slot.mutex);
|
||||
|
||||
xorg_viewer_push_mjpeg(v, data, data_len);
|
||||
stream_stats_record_frame(&stats, data_len);
|
||||
free(payload);
|
||||
|
||||
if (stream_stats_tick(&stats)) {
|
||||
char info[64];
|
||||
snprintf(info, sizeof(info), "stream %u %.1f fps %.2f Mbps",
|
||||
stream_id, stats.fps, stats.mbps);
|
||||
xorg_viewer_set_overlay_text(v, 0, 10, 10, info, 1.0f, 1.0f, 0.8f);
|
||||
}
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------- */
|
||||
/* Cleanup */
|
||||
/* ---------------------------------------------------------------- */
|
||||
|
||||
/* Close the active connection if still open. */
|
||||
pthread_mutex_lock(&rs.conn_mutex);
|
||||
if (rs.conn) {
|
||||
transport_conn_close(rs.conn);
|
||||
rs.conn = NULL;
|
||||
}
|
||||
pthread_mutex_unlock(&rs.conn_mutex);
|
||||
|
||||
xorg_viewer_close(v);
|
||||
transport_server_destroy(server);
|
||||
|
||||
pthread_mutex_lock(&slot.mutex);
|
||||
free(slot.payload);
|
||||
pthread_mutex_unlock(&slot.mutex);
|
||||
pthread_mutex_destroy(&slot.mutex);
|
||||
pthread_cond_destroy(&slot.cond);
|
||||
pthread_mutex_destroy(&rs.conn_mutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user