Add START_INGEST and STOP_INGEST protocol commands

START_INGEST carries stream_id, format/width/height/fps, dest_host:port,
transport_mode (encapsulated or opaque), and device_path. All format fields
default to 0 (auto-select). STOP_INGEST carries stream_id only.

Both commands set wanted state on the node; reconciliation is asynchronous.
Protocol doc updated with wire schemas for both commands.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-29 02:02:38 +00:00
parent 639a84b1b9
commit 6c9e0ce7dc
3 changed files with 200 additions and 0 deletions

View File

@@ -99,6 +99,8 @@ packet-beta
| `0x0005` | `GET_CONTROL` | Get a V4L2 control value | | `0x0005` | `GET_CONTROL` | Get a V4L2 control value |
| `0x0006` | `SET_CONTROL` | Set a V4L2 control value | | `0x0006` | `SET_CONTROL` | Set a V4L2 control value |
| `0x0007` | `ENUM_MONITORS` | List X11 monitors (XRandR) on the remote node | | `0x0007` | `ENUM_MONITORS` | List X11 monitors (XRandR) on the remote node |
| `0x0008` | `START_INGEST` | Set wanted state: open V4L2 device, connect outbound, begin streaming |
| `0x0009` | `STOP_INGEST` | Set wanted state: stop ingest stream and disconnect |
### `CONTROL_RESPONSE` (0x0003) ### `CONTROL_RESPONSE` (0x0003)
@@ -412,3 +414,59 @@ packet-beta
**Response** — no extra fields beyond request_id and status. **Response** — no extra fields beyond request_id and status.
For `MENU` and `INTEGER_MENU` controls, `value` must be a valid menu item `index` as returned by `ENUM_CONTROLS`. For `MENU` and `INTEGER_MENU` controls, `value` must be a valid menu item `index` as returned by `ENUM_CONTROLS`.
### `START_INGEST` (0x0008)
Sets wanted state on a source node: open the specified V4L2 device, configure the stream format, and connect outbound to the given sink.
**Request**:
```mermaid
%%{init: {'packet': {'bitsPerRow': 16}}}%%
packet-beta
0-15: "request_id"
16-31: "command = 0x0008"
32-47: "stream_id"
48-63: "format"
64-79: "width"
80-95: "height"
96-111: "fps_n"
112-127: "fps_d"
128-143: "dest_port"
144-159: "transport_mode"
160-167: "device_path_len"
168-175: "device_path …"
```
Followed by `dest_host` str8.
| Field | Description |
|---|---|
| `stream_id` | ID assigned by the controller; used in all subsequent `VIDEO_FRAME` messages |
| `format` | Codec format code (see [Codec Formats](#codec-formats)); `0` = auto-select best MJPEG |
| `width` | Capture width in pixels; `0` = auto-select |
| `height` | Capture height in pixels; `0` = auto-select |
| `fps_n` | Frame rate numerator; `0` = auto-select |
| `fps_d` | Frame rate denominator |
| `dest_port` | TCP port of the sink node to connect to |
| `transport_mode` | `0x0001` = encapsulated (framed); `0x0002` = opaque (raw byte stream) |
| `device_path` | str8 — path to the V4L2 device, e.g. `/dev/video0` |
| `dest_host` | str8 — hostname or IP of the sink node |
**Response** — no extra fields beyond request_id and status. `OK` means the wanted state was accepted; the node will reconcile asynchronously.
### `STOP_INGEST` (0x0009)
Sets wanted state: stop the ingest stream and disconnect from the sink.
**Request**:
```mermaid
%%{init: {'packet': {'bitsPerRow': 16}}}%%
packet-beta
0-15: "request_id"
16-31: "command = 0x0009"
32-47: "stream_id"
```
**Response** — no extra fields beyond request_id and status.

View File

@@ -24,6 +24,8 @@
#define PROTO_CMD_GET_CONTROL 0x0005u #define PROTO_CMD_GET_CONTROL 0x0005u
#define PROTO_CMD_SET_CONTROL 0x0006u #define PROTO_CMD_SET_CONTROL 0x0006u
#define PROTO_CMD_ENUM_MONITORS 0x0007u #define PROTO_CMD_ENUM_MONITORS 0x0007u
#define PROTO_CMD_START_INGEST 0x0008u
#define PROTO_CMD_STOP_INGEST 0x0009u
/* ------------------------------------------------------------------------- /* -------------------------------------------------------------------------
* Response status codes (carried in CONTROL_RESPONSE payload offset 2) * Response status codes (carried in CONTROL_RESPONSE payload offset 2)
@@ -66,6 +68,13 @@
#define PROTO_PIXEL_YUV420P 0x0004u #define PROTO_PIXEL_YUV420P 0x0004u
#define PROTO_PIXEL_YUV422 0x0005u #define PROTO_PIXEL_YUV422 0x0005u
/* -------------------------------------------------------------------------
* Transport mode codes (START_INGEST transport_mode field)
* ------------------------------------------------------------------------- */
#define PROTO_TRANSPORT_ENCAPSULATED 0x0001u /* framed: message_type + payload_length header */
#define PROTO_TRANSPORT_OPAQUE 0x0002u /* raw byte stream, no frame boundaries */
/* ------------------------------------------------------------------------- /* -------------------------------------------------------------------------
* Origin codes (STREAM_OPEN origin field; informational only) * Origin codes (STREAM_OPEN origin field; informational only)
* ------------------------------------------------------------------------- */ * ------------------------------------------------------------------------- */
@@ -196,6 +205,33 @@ struct Proto_Set_Control_Req {
int32_t value; int32_t value;
}; };
/*
* START_INGEST: controller tells a source node to open a V4L2 device and
* connect outbound to a sink at dest_host:dest_port.
* format/width/height/fps_n/fps_d of 0 mean auto-select.
* Strings point into the caller's payload buffer; not NUL-terminated.
*/
struct Proto_Start_Ingest {
uint16_t request_id;
uint16_t stream_id;
uint16_t format; /* PROTO_FORMAT_* code; 0 = auto (best MJPEG) */
uint16_t width; /* 0 = auto */
uint16_t height; /* 0 = auto */
uint16_t fps_n; /* 0 = auto */
uint16_t fps_d;
uint16_t dest_port;
uint16_t transport_mode; /* PROTO_TRANSPORT_ENCAPSULATED or PROTO_TRANSPORT_OPAQUE */
const char *device_path;
uint8_t device_path_len;
const char *dest_host;
uint8_t dest_host_len;
};
struct Proto_Stop_Ingest {
uint16_t request_id;
uint16_t stream_id;
};
struct Proto_Response_Header { struct Proto_Response_Header {
uint16_t request_id; uint16_t request_id;
uint16_t status; uint16_t status;
@@ -253,6 +289,18 @@ struct App_Error proto_write_set_control(struct Transport_Conn *conn,
struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn, struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn,
uint16_t request_id); uint16_t request_id);
/* CONTROL_REQUEST: START_INGEST */
struct App_Error proto_write_start_ingest(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id,
uint16_t format, uint16_t width, uint16_t height,
uint16_t fps_n, uint16_t fps_d,
uint16_t transport_mode,
const char *device_path, const char *dest_host, uint16_t dest_port);
/* CONTROL_REQUEST: STOP_INGEST */
struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id);
/* /*
* CONTROL_RESPONSE: generic. * CONTROL_RESPONSE: generic.
* payload/payload_len are the command-specific bytes after request_id+status. * payload/payload_len are the command-specific bytes after request_id+status.
@@ -325,6 +373,14 @@ struct App_Error proto_read_set_control_req(
const uint8_t *payload, uint32_t length, const uint8_t *payload, uint32_t length,
struct Proto_Set_Control_Req *out); struct Proto_Set_Control_Req *out);
struct App_Error proto_read_start_ingest(
const uint8_t *payload, uint32_t length,
struct Proto_Start_Ingest *out);
struct App_Error proto_read_stop_ingest(
const uint8_t *payload, uint32_t length,
struct Proto_Stop_Ingest *out);
/* /*
* Read the common 4-byte response header (request_id + status). * Read the common 4-byte response header (request_id + status).
* For responses with no extra fields (STREAM_OPEN, STREAM_CLOSE, SET_CONTROL), * For responses with no extra fields (STREAM_OPEN, STREAM_CLOSE, SET_CONTROL),

View File

@@ -300,6 +300,54 @@ struct App_Error proto_write_enum_monitors(struct Transport_Conn *conn,
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 4); return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 4);
} }
struct App_Error proto_write_start_ingest(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id,
uint16_t format, uint16_t width, uint16_t height,
uint16_t fps_n, uint16_t fps_d,
uint16_t transport_mode,
const char *device_path, const char *dest_host, uint16_t dest_port)
{
size_t dp_len = device_path ? strlen(device_path) : 0;
size_t dh_len = dest_host ? strlen(dest_host) : 0;
uint8_t dp_n = dp_len > 255u ? 255u : (uint8_t)dp_len;
uint8_t dh_n = dh_len > 255u ? 255u : (uint8_t)dh_len;
/* 20 bytes fixed + 1+dp_n (device_path str8) + 1+dh_n (dest_host str8) */
uint32_t total = 20u + 1u + dp_n + 1u + dh_n;
uint8_t *buf = malloc(total);
if (!buf) { return APP_SYSCALL_ERROR(); }
uint32_t o = 0;
put_u16(buf, o, request_id); o += 2;
put_u16(buf, o, PROTO_CMD_START_INGEST); o += 2;
put_u16(buf, o, stream_id); o += 2;
put_u16(buf, o, format); o += 2;
put_u16(buf, o, width); o += 2;
put_u16(buf, o, height); o += 2;
put_u16(buf, o, fps_n); o += 2;
put_u16(buf, o, fps_d); o += 2;
put_u16(buf, o, dest_port); o += 2;
put_u16(buf, o, transport_mode); o += 2;
put_u8 (buf, o, dp_n); o += 1;
memcpy(buf + o, device_path, dp_n); o += dp_n;
put_u8 (buf, o, dh_n); o += 1;
memcpy(buf + o, dest_host, dh_n); o += dh_n;
struct App_Error e = transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, total);
free(buf);
return e;
}
struct App_Error proto_write_stop_ingest(struct Transport_Conn *conn,
uint16_t request_id, uint16_t stream_id)
{
uint8_t buf[6];
put_u16(buf, 0, request_id);
put_u16(buf, 2, PROTO_CMD_STOP_INGEST);
put_u16(buf, 4, stream_id);
return transport_send_frame(conn, PROTO_MSG_CONTROL_REQUEST, buf, 6);
}
struct App_Error proto_write_control_response(struct Transport_Conn *conn, struct App_Error proto_write_control_response(struct Transport_Conn *conn,
uint16_t request_id, uint16_t status, uint16_t request_id, uint16_t status,
const uint8_t *payload, uint32_t payload_len) const uint8_t *payload, uint32_t payload_len)
@@ -515,6 +563,44 @@ struct App_Error proto_read_set_control_req(
return APP_OK; return APP_OK;
} }
struct App_Error proto_read_start_ingest(
const uint8_t *payload, uint32_t length,
struct Proto_Start_Ingest *out)
{
/* Fixed portion: request_id(2) cmd(2) stream_id(2) format(2) width(2)
* height(2) fps_n(2) fps_d(2) dest_port(2) transport_mode(2) = 20 bytes,
* then two str8 fields. */
struct Cursor c;
cur_init(&c, payload, length);
out->request_id = cur_u16(&c);
/* skip command word at [2..3] */
(void) cur_u16(&c);
out->stream_id = cur_u16(&c);
out->format = cur_u16(&c);
out->width = cur_u16(&c);
out->height = cur_u16(&c);
out->fps_n = cur_u16(&c);
out->fps_d = cur_u16(&c);
out->dest_port = cur_u16(&c);
out->transport_mode = cur_u16(&c);
out->device_path = cur_str8(&c, &out->device_path_len);
out->dest_host = cur_str8(&c, &out->dest_host_len);
CUR_CHECK(c);
return APP_OK;
}
struct App_Error proto_read_stop_ingest(
const uint8_t *payload, uint32_t length,
struct Proto_Stop_Ingest *out)
{
if (length < 6) { return APP_INVALID_ERROR_MSG(0, "STOP_INGEST payload too short"); }
out->request_id = get_u16(payload, 0);
out->stream_id = get_u16(payload, 4);
return APP_OK;
}
struct App_Error proto_read_response_header( struct App_Error proto_read_response_header(
const uint8_t *payload, uint32_t length, const uint8_t *payload, uint32_t length,
struct Proto_Response_Header *out) struct Proto_Response_Header *out)