#include #include #include #include #include #include "xorg.h" #include "transport.h" #include "protocol.h" #include "stream_stats.h" #include "error.h" #define DEFAULT_PORT 7700 #define DEFAULT_WIN_W 1280 #define DEFAULT_WIN_H 720 /* ------------------------------------------------------------------ */ /* Frame slot — single-frame handoff from transport thread to GL thread */ /* ------------------------------------------------------------------ */ typedef struct { pthread_mutex_t mutex; pthread_cond_t cond; uint8_t *payload; /* transport-malloc'd; we own */ const uint8_t *data; /* points into payload */ uint32_t data_len; uint16_t stream_id; int ready; /* 1 = new frame available */ int done; /* 1 = sender disconnected */ } Frame_Slot; static void frame_slot_init(Frame_Slot *s) { pthread_mutex_init(&s->mutex, NULL); pthread_cond_init(&s->cond, NULL); s->payload = NULL; s->data = NULL; s->data_len = 0; s->ready = 0; s->done = 0; } /* ------------------------------------------------------------------ */ /* Receiver state */ /* ------------------------------------------------------------------ */ typedef struct { Frame_Slot *slot; int stream_id_filter; /* 0 = accept any */ pthread_mutex_t conn_mutex; struct Transport_Conn *conn; /* current sender connection */ } Recv_State; /* ------------------------------------------------------------------ */ /* Transport callbacks */ /* ------------------------------------------------------------------ */ static void on_connect(struct Transport_Conn *conn, void *userdata) { Recv_State *rs = userdata; fprintf(stderr, "stream_recv_cli: sender connected\n"); pthread_mutex_lock(&rs->conn_mutex); rs->conn = conn; pthread_mutex_unlock(&rs->conn_mutex); } static void on_disconnect(struct Transport_Conn *conn, void *userdata) { (void)conn; Recv_State *rs = userdata; fprintf(stderr, "stream_recv_cli: sender disconnected\n"); pthread_mutex_lock(&rs->conn_mutex); rs->conn = NULL; pthread_mutex_unlock(&rs->conn_mutex); Frame_Slot *slot = rs->slot; pthread_mutex_lock(&slot->mutex); slot->done = 1; pthread_cond_signal(&slot->cond); pthread_mutex_unlock(&slot->mutex); } static void on_frame(struct Transport_Conn *conn, struct Transport_Frame *frame, void *userdata) { Recv_State *rs = userdata; if (frame->message_type == PROTO_MSG_VIDEO_FRAME) { struct Proto_Video_Frame vf; struct App_Error err = proto_read_video_frame( frame->payload, frame->payload_length, &vf); if (!APP_IS_OK(err)) { free(frame->payload); return; } if (rs->stream_id_filter && vf.stream_id != (uint16_t)rs->stream_id_filter) { free(frame->payload); return; } Frame_Slot *slot = rs->slot; pthread_mutex_lock(&slot->mutex); if (slot->ready) { free(slot->payload); /* drop stale frame — main thread is behind */ } slot->payload = frame->payload; /* take ownership */ slot->data = vf.data; slot->data_len = vf.data_len; slot->stream_id = vf.stream_id; slot->ready = 1; pthread_cond_signal(&slot->cond); pthread_mutex_unlock(&slot->mutex); /* frame->payload is now owned by slot; do not free here */ } else if (frame->message_type == PROTO_MSG_CONTROL_REQUEST) { struct Proto_Request_Header hdr; struct App_Error err = proto_read_request_header( frame->payload, frame->payload_length, &hdr); if (APP_IS_OK(err) && hdr.command == PROTO_CMD_STREAM_OPEN) { struct Proto_Stream_Open so; err = proto_read_stream_open( frame->payload, frame->payload_length, &so); if (APP_IS_OK(err)) { fprintf(stderr, "stream_recv_cli: STREAM_OPEN stream_id=%u format=%u origin=%u\n", so.stream_id, so.format, so.origin); proto_write_control_response(conn, hdr.request_id, PROTO_STATUS_OK, NULL, 0); } } free(frame->payload); } else { free(frame->payload); } } /* ------------------------------------------------------------------ */ /* Usage */ /* ------------------------------------------------------------------ */ static void usage(void) { fprintf(stderr, "usage: stream_recv_cli [--port PORT] [--stream-id N]\n" " [--scale stretch|fit|fill|1:1]\n" " [--anchor center|topleft]\n" " [--x N] [--y N]\n" "\n" "Listens for an incoming TCP stream and displays VIDEO_FRAME messages.\n" "Accepts MJPEG streams. Shows per-stream fps and Mbps as an overlay.\n" "\n" "defaults: port=7700 stream-id=0 (any) fit center x=0 y=0\n"); } /* ------------------------------------------------------------------ */ /* Main */ /* ------------------------------------------------------------------ */ int main(int argc, char **argv) { uint16_t port = DEFAULT_PORT; int stream_id_filter = 0; int win_x = 0; int win_y = 0; Xorg_Scale scale = XORG_SCALE_FIT; Xorg_Anchor anchor = XORG_ANCHOR_CENTER; for (int i = 1; i < argc; i++) { if (strcmp(argv[i], "--port") == 0 && i + 1 < argc) { port = (uint16_t)atoi(argv[++i]); } else if (strcmp(argv[i], "--stream-id") == 0 && i + 1 < argc) { stream_id_filter = atoi(argv[++i]); } else if (strcmp(argv[i], "--x") == 0 && i + 1 < argc) { win_x = atoi(argv[++i]); } else if (strcmp(argv[i], "--y") == 0 && i + 1 < argc) { win_y = atoi(argv[++i]); } else if (strcmp(argv[i], "--scale") == 0 && i + 1 < argc) { i++; if (strcmp(argv[i], "stretch") == 0) { scale = XORG_SCALE_STRETCH; } else if (strcmp(argv[i], "fit") == 0) { scale = XORG_SCALE_FIT; } else if (strcmp(argv[i], "fill") == 0) { scale = XORG_SCALE_FILL; } else if (strcmp(argv[i], "1:1") == 0) { scale = XORG_SCALE_1_1; } else { fprintf(stderr, "unknown scale: %s\n", argv[i]); usage(); return 1; } } else if (strcmp(argv[i], "--anchor") == 0 && i + 1 < argc) { i++; if (strcmp(argv[i], "center") == 0) { anchor = XORG_ANCHOR_CENTER; } else if (strcmp(argv[i], "topleft") == 0) { anchor = XORG_ANCHOR_TOP_LEFT; } else { fprintf(stderr, "unknown anchor: %s\n", argv[i]); usage(); return 1; } } else { usage(); return 1; } } if (!xorg_available()) { fprintf(stderr, "stream_recv_cli: built without HAVE_GLFW — viewer not available\n"); return 1; } /* ---------------------------------------------------------------- */ /* Frame slot and receiver state */ /* ---------------------------------------------------------------- */ Frame_Slot slot; frame_slot_init(&slot); Recv_State rs = {0}; rs.slot = &slot; rs.stream_id_filter = stream_id_filter; pthread_mutex_init(&rs.conn_mutex, NULL); /* ---------------------------------------------------------------- */ /* Transport server */ /* ---------------------------------------------------------------- */ struct Transport_Server_Config cfg = { .port = port, .max_connections = 1, .max_payload = TRANSPORT_DEFAULT_MAX_PAYLOAD, .on_frame = on_frame, .on_connect = on_connect, .on_disconnect = on_disconnect, .userdata = &rs, }; struct Transport_Server *server = NULL; struct App_Error err = transport_server_create(&server, &cfg); if (!APP_IS_OK(err)) { app_error_print(&err); return 1; } err = transport_server_start(server); if (!APP_IS_OK(err)) { app_error_print(&err); transport_server_destroy(server); return 1; } fprintf(stderr, "stream_recv_cli: listening on port %u\n", port); /* ---------------------------------------------------------------- */ /* Open viewer */ /* ---------------------------------------------------------------- */ Xorg_Viewer *v = xorg_viewer_open(win_x, win_y, DEFAULT_WIN_W, DEFAULT_WIN_H, "stream_recv_cli"); if (!v) { fprintf(stderr, "stream_recv_cli: failed to open viewer window\n"); transport_server_destroy(server); return 1; } xorg_viewer_set_scale(v, scale); xorg_viewer_set_anchor(v, anchor); xorg_viewer_set_overlay_text(v, 0, 10, 10, "waiting for stream...", 1.0f, 1.0f, 0.8f); /* ---------------------------------------------------------------- */ /* Render loop */ /* ---------------------------------------------------------------- */ Stream_Stats stats; stream_stats_init(&stats, (uint16_t)stream_id_filter); while (xorg_viewer_handle_events(v)) { /* * Wait for the next frame with a 100ms deadline so we can poll * window events even when no frames arrive. */ struct timespec deadline; clock_gettime(CLOCK_REALTIME, &deadline); deadline.tv_nsec += 100000000LL; if (deadline.tv_nsec >= 1000000000LL) { deadline.tv_sec++; deadline.tv_nsec -= 1000000000LL; } pthread_mutex_lock(&slot.mutex); while (!slot.ready && !slot.done) { int rc = pthread_cond_timedwait(&slot.cond, &slot.mutex, &deadline); if (rc != 0) { break; } /* timeout — poll events */ } if (!slot.ready) { int done = slot.done; pthread_mutex_unlock(&slot.mutex); if (done) { break; } continue; } /* Take ownership of the frame. */ uint8_t *payload = slot.payload; const uint8_t *data = slot.data; uint32_t data_len = slot.data_len; uint16_t stream_id = slot.stream_id; slot.payload = NULL; slot.data = NULL; slot.ready = 0; pthread_mutex_unlock(&slot.mutex); xorg_viewer_push_mjpeg(v, data, data_len); stream_stats_record_frame(&stats, data_len); free(payload); if (stream_stats_tick(&stats)) { char info[64]; snprintf(info, sizeof(info), "stream %u %.1f fps %.2f Mbps", stream_id, stats.fps, stats.mbps); xorg_viewer_set_overlay_text(v, 0, 10, 10, info, 1.0f, 1.0f, 0.8f); } } /* ---------------------------------------------------------------- */ /* Cleanup */ /* ---------------------------------------------------------------- */ /* Close the active connection if still open. */ pthread_mutex_lock(&rs.conn_mutex); if (rs.conn) { transport_conn_close(rs.conn); rs.conn = NULL; } pthread_mutex_unlock(&rs.conn_mutex); xorg_viewer_close(v); transport_server_destroy(server); pthread_mutex_lock(&slot.mutex); free(slot.payload); pthread_mutex_unlock(&slot.mutex); pthread_mutex_destroy(&slot.mutex); pthread_cond_destroy(&slot.cond); pthread_mutex_destroy(&rs.conn_mutex); return 0; }