From 6c9e0ce7dc7740554c0ca0ef542216b2889431af Mon Sep 17 00:00:00 2001 From: mikael-lovqvists-claude-agent Date: Sun, 29 Mar 2026 02:02:38 +0000 Subject: [PATCH] Add START_INGEST and STOP_INGEST protocol commands START_INGEST carries stream_id, format/width/height/fps, dest_host:port, transport_mode (encapsulated or opaque), and device_path. All format fields default to 0 (auto-select). STOP_INGEST carries stream_id only. Both commands set wanted state on the node; reconciliation is asynchronous. Protocol doc updated with wire schemas for both commands. Co-Authored-By: Claude Sonnet 4.6 --- docs/protocol.md | 58 ++++++++++++++++++++++ include/protocol.h | 56 +++++++++++++++++++++ src/modules/protocol/protocol.c | 86 +++++++++++++++++++++++++++++++++ 3 files changed, 200 insertions(+) diff --git a/docs/protocol.md b/docs/protocol.md index ccde312..4e973b4 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -99,6 +99,8 @@ packet-beta | `0x0005` | `GET_CONTROL` | Get a V4L2 control value | | `0x0006` | `SET_CONTROL` | Set a V4L2 control value | | `0x0007` | `ENUM_MONITORS` | List X11 monitors (XRandR) on the remote node | +| `0x0008` | `START_INGEST` | Set wanted state: open V4L2 device, connect outbound, begin streaming | +| `0x0009` | `STOP_INGEST` | Set wanted state: stop ingest stream and disconnect | ### `CONTROL_RESPONSE` (0x0003) @@ -412,3 +414,59 @@ packet-beta **Response** — no extra fields beyond request_id and status. For `MENU` and `INTEGER_MENU` controls, `value` must be a valid menu item `index` as returned by `ENUM_CONTROLS`. + +### `START_INGEST` (0x0008) + +Sets wanted state on a source node: open the specified V4L2 device, configure the stream format, and connect outbound to the given sink. + +**Request**: + +```mermaid +%%{init: {'packet': {'bitsPerRow': 16}}}%% +packet-beta +0-15: "request_id" +16-31: "command = 0x0008" +32-47: "stream_id" +48-63: "format" +64-79: "width" +80-95: "height" +96-111: "fps_n" +112-127: "fps_d" +128-143: "dest_port" +144-159: "transport_mode" +160-167: "device_path_len" +168-175: "device_path …" +``` + +Followed by `dest_host` str8. + +| Field | Description | +|---|---| +| `stream_id` | ID assigned by the controller; used in all subsequent `VIDEO_FRAME` messages | +| `format` | Codec format code (see [Codec Formats](#codec-formats)); `0` = auto-select best MJPEG | +| `width` | Capture width in pixels; `0` = auto-select | +| `height` | Capture height in pixels; `0` = auto-select | +| `fps_n` | Frame rate numerator; `0` = auto-select | +| `fps_d` | Frame rate denominator | +| `dest_port` | TCP port of the sink node to connect to | +| `transport_mode` | `0x0001` = encapsulated (framed); `0x0002` = opaque (raw byte stream) | +| `device_path` | str8 — path to the V4L2 device, e.g. `/dev/video0` | +| `dest_host` | str8 — hostname or IP of the sink node | + +**Response** — no extra fields beyond request_id and status. `OK` means the wanted state was accepted; the node will reconcile asynchronously. + +### `STOP_INGEST` (0x0009) + +Sets wanted state: stop the ingest stream and disconnect from the sink. + +**Request**: + +```mermaid +%%{init: {'packet': {'bitsPerRow': 16}}}%% +packet-beta +0-15: "request_id" +16-31: "command = 0x0009" +32-47: "stream_id" +``` + +**Response** — no extra fields beyond request_id and status. diff --git a/include/protocol.h b/include/protocol.h index 7855cbf..7ac2726 100644 --- a/include/protocol.h +++ b/include/protocol.h @@ -24,6 +24,8 @@ #define PROTO_CMD_GET_CONTROL 0x0005u #define PROTO_CMD_SET_CONTROL 0x0006u #define PROTO_CMD_ENUM_MONITORS 0x0007u +#define PROTO_CMD_START_INGEST 0x0008u +#define PROTO_CMD_STOP_INGEST 0x0009u /* ------------------------------------------------------------------------- * Response status codes (carried in CONTROL_RESPONSE payload offset 2) @@ -66,6 +68,13 @@ #define PROTO_PIXEL_YUV420P 0x0004u #define PROTO_PIXEL_YUV422 0x0005u +/* ------------------------------------------------------------------------- + * Transport mode codes (START_INGEST transport_mode field) + * ------------------------------------------------------------------------- */ + +#define PROTO_TRANSPORT_ENCAPSULATED 0x0001u /* framed: message_type + payload_length header */ +#define PROTO_TRANSPORT_OPAQUE 0x0002u /* raw byte stream, no frame boundaries */ + /* ------------------------------------------------------------------------- * Origin codes (STREAM_OPEN origin field; informational only) * ------------------------------------------------------------------------- */ @@ -196,6 +205,33 @@ struct Proto_Set_Control_Req { int32_t value; }; +/* + * START_INGEST: controller tells a source node to open a V4L2 device and + * connect outbound to a sink at dest_host:dest_port. + * format/width/height/fps_n/fps_d of 0 mean auto-select. + * Strings point into the caller's payload buffer; not NUL-terminated. + */ +struct Proto_Start_Ingest { + uint16_t request_id; + uint16_t stream_id; + uint16_t format; /* PROTO_FORMAT_* code; 0 = auto (best MJPEG) */ + uint16_t width; /* 0 = auto */ + uint16_t height; /* 0 = auto */ + uint16_t fps_n; /* 0 = auto */ + uint16_t fps_d; + uint16_t dest_port; + uint16_t transport_mode; /* PROTO_TRANSPORT_ENCAPSULATED or PROTO_TRANSPORT_OPAQUE */ + const char *device_path; + uint8_t device_path_len; + const char *dest_host; + uint8_t dest_host_len; +}; + +struct Proto_Stop_Ingest { + uint16_t request_id; + uint16_t stream_id; +}; + struct Proto_Response_Header { uint16_t request_id; uint16_t status; @@ -253,6 +289,18 @@ struct App_Error proto_write_set_control(struct Transport_Conn *conn, struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn, uint16_t request_id); +/* CONTROL_REQUEST: START_INGEST */ +struct App_Error proto_write_start_ingest(struct Transport_Conn *conn, + uint16_t request_id, uint16_t stream_id, + uint16_t format, uint16_t width, uint16_t height, + uint16_t fps_n, uint16_t fps_d, + uint16_t transport_mode, + const char *device_path, const char *dest_host, uint16_t dest_port); + +/* CONTROL_REQUEST: STOP_INGEST */ +struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn, + uint16_t request_id, uint16_t stream_id); + /* * CONTROL_RESPONSE: generic. * payload/payload_len are the command-specific bytes after request_id+status. @@ -325,6 +373,14 @@ struct App_Error proto_read_set_control_req( const uint8_t *payload, uint32_t length, struct Proto_Set_Control_Req *out); +struct App_Error proto_read_start_ingest( + const uint8_t *payload, uint32_t length, + struct Proto_Start_Ingest *out); + +struct App_Error proto_read_stop_ingest( + const uint8_t *payload, uint32_t length, + struct Proto_Stop_Ingest *out); + /* * Read the common 4-byte response header (request_id + status). * For responses with no extra fields (STREAM_OPEN, STREAM_CLOSE, SET_CONTROL), diff --git a/src/modules/protocol/protocol.c b/src/modules/protocol/protocol.c index 69bf750..1d7a0fb 100644 --- a/src/modules/protocol/protocol.c +++ b/src/modules/protocol/protocol.c @@ -300,6 +300,54 @@ struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn, return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 4); } +struct App_Error proto_write_start_ingest(struct Transport_Conn *conn, + uint16_t request_id, uint16_t stream_id, + uint16_t format, uint16_t width, uint16_t height, + uint16_t fps_n, uint16_t fps_d, + uint16_t transport_mode, + const char *device_path, const char *dest_host, uint16_t dest_port) +{ + size_t dp_len = device_path ? strlen(device_path) : 0; + size_t dh_len = dest_host ? strlen(dest_host) : 0; + uint8_t dp_n = dp_len > 255u ? 255u : (uint8_t)dp_len; + uint8_t dh_n = dh_len > 255u ? 255u : (uint8_t)dh_len; + + /* 20 bytes fixed + 1+dp_n (device_path str8) + 1+dh_n (dest_host str8) */ + uint32_t total = 20u + 1u + dp_n + 1u + dh_n; + uint8_t *buf = malloc(total); + if (!buf) { return APP_SYSCALL_ERROR(); } + + uint32_t o = 0; + put_u16(buf, o, request_id); o += 2; + put_u16(buf, o, PROTO_CMD_START_INGEST); o += 2; + put_u16(buf, o, stream_id); o += 2; + put_u16(buf, o, format); o += 2; + put_u16(buf, o, width); o += 2; + put_u16(buf, o, height); o += 2; + put_u16(buf, o, fps_n); o += 2; + put_u16(buf, o, fps_d); o += 2; + put_u16(buf, o, dest_port); o += 2; + put_u16(buf, o, transport_mode); o += 2; + put_u8 (buf, o, dp_n); o += 1; + memcpy(buf + o, device_path, dp_n); o += dp_n; + put_u8 (buf, o, dh_n); o += 1; + memcpy(buf + o, dest_host, dh_n); o += dh_n; + + struct App_Error e = transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, total); + free(buf); + return e; +} + +struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn, + uint16_t request_id, uint16_t stream_id) +{ + uint8_t buf[6]; + put_u16(buf, 0, request_id); + put_u16(buf, 2, PROTO_CMD_STOP_INGEST); + put_u16(buf, 4, stream_id); + return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 6); +} + struct App_Error proto_write_control_response(struct Transport_Conn *conn, uint16_t request_id, uint16_t status, const uint8_t *payload, uint32_t payload_len) @@ -515,6 +563,44 @@ struct App_Error proto_read_set_control_req( return APP_OK; } +struct App_Error proto_read_start_ingest( + const uint8_t *payload, uint32_t length, + struct Proto_Start_Ingest *out) +{ + /* Fixed portion: request_id(2) cmd(2) stream_id(2) format(2) width(2) + * height(2) fps_n(2) fps_d(2) dest_port(2) transport_mode(2) = 20 bytes, + * then two str8 fields. */ + struct Cursor c; + cur_init(&c, payload, length); + + out->request_id = cur_u16(&c); + /* skip command word at [2..3] */ + (void) cur_u16(&c); + out->stream_id = cur_u16(&c); + out->format = cur_u16(&c); + out->width = cur_u16(&c); + out->height = cur_u16(&c); + out->fps_n = cur_u16(&c); + out->fps_d = cur_u16(&c); + out->dest_port = cur_u16(&c); + out->transport_mode = cur_u16(&c); + out->device_path = cur_str8(&c, &out->device_path_len); + out->dest_host = cur_str8(&c, &out->dest_host_len); + CUR_CHECK(c); + + return APP_OK; +} + +struct App_Error proto_read_stop_ingest( + const uint8_t *payload, uint32_t length, + struct Proto_Stop_Ingest *out) +{ + if (length < 6) { return APP_INVALID_ERROR_MSG(0, "STOP_INGEST payload too short"); } + out->request_id = get_u16(payload, 0); + out->stream_id = get_u16(payload, 4); + return APP_OK; +} + struct App_Error proto_read_response_header( const uint8_t *payload, uint32_t length, struct Proto_Response_Header *out)