Commit 7519b2ef authored by Dmitry Frank's avatar Dmitry Frank Committed by Cesanta Bot

Improve websocket implementation

CL: Mongoose Web Server: Websocket: Respond to Ping with Pong
CL: Mongoose Web Server: Websocket: Properly close a connection with Close frame (in response to a client's close and when protocol failure is detected)
CL: Mongoose Web Server: Websocket: Fix support of fragmented messages
CL: Mongoose Web Server: Websocket: Add support for control frames interjected in the middle of a fragmented message

PUBLISHED_FROM=e2b3794aaacc64633540c493194cccc62afa2077
parent ce8657f1
...@@ -7,8 +7,6 @@ signature: | ...@@ -7,8 +7,6 @@ signature: |
const struct mg_str *strings, int num_strings); const struct mg_str *strings, int num_strings);
--- ---
Sends multiple websocket frames. Like `mg_send_websocket_frame()`, but composes a single frame from multiple
buffers.
Like `mg_send_websocket_frame()`, but composes a frame from multiple
*buffers.
...@@ -5734,7 +5734,13 @@ struct mg_reverse_proxy_data { ...@@ -5734,7 +5734,13 @@ struct mg_reverse_proxy_data {
}; };
struct mg_ws_proto_data { struct mg_ws_proto_data {
size_t reass_len; /* Defragmented size of the frame so far. */ /*
* Defragmented size of the frame so far.
*
* First byte of nc->recv_mbuf.buf is an op, the rest of the data is
* defragmented data.
*/
size_t reass_len;
}; };
struct mg_http_proto_data { struct mg_http_proto_data {
...@@ -9569,12 +9575,23 @@ MG_INTERNAL void mg_handle_put(struct mg_connection *nc, const char *path, ...@@ -9569,12 +9575,23 @@ MG_INTERNAL void mg_handle_put(struct mg_connection *nc, const char *path,
#define MG_WEBSOCKET_PING_INTERVAL_SECONDS 5 #define MG_WEBSOCKET_PING_INTERVAL_SECONDS 5
#endif #endif
#define FLAGS_MASK_FIN (1 << 7)
#define FLAGS_MASK_OP 0x0f
static int mg_is_ws_fragment(unsigned char flags) { static int mg_is_ws_fragment(unsigned char flags) {
return (flags & 0x80) == 0 || (flags & 0x0f) == 0; return (flags & FLAGS_MASK_FIN) == 0 ||
(flags & FLAGS_MASK_OP) == WEBSOCKET_OP_CONTINUE;
} }
static int mg_is_ws_first_fragment(unsigned char flags) { static int mg_is_ws_first_fragment(unsigned char flags) {
return (flags & 0x80) == 0 && (flags & 0x0f) != 0; return (flags & FLAGS_MASK_FIN) == 0 &&
(flags & FLAGS_MASK_OP) != WEBSOCKET_OP_CONTINUE;
}
static int mg_is_ws_control_frame(unsigned char flags) {
unsigned char op = (flags & FLAGS_MASK_OP);
return op == WEBSOCKET_OP_CLOSE || op == WEBSOCKET_OP_PING ||
op == WEBSOCKET_OP_PONG;
} }
static void mg_handle_incoming_websocket_frame(struct mg_connection *nc, static void mg_handle_incoming_websocket_frame(struct mg_connection *nc,
...@@ -9591,94 +9608,166 @@ static struct mg_ws_proto_data *mg_ws_get_proto_data(struct mg_connection *nc) { ...@@ -9591,94 +9608,166 @@ static struct mg_ws_proto_data *mg_ws_get_proto_data(struct mg_connection *nc) {
return (htd != NULL ? &htd->ws_data : NULL); return (htd != NULL ? &htd->ws_data : NULL);
} }
/*
* Sends a Close websocket frame with the given data, and closes the underlying
* connection. If `len` is ~0, strlen(data) is used.
*/
static void mg_ws_close(struct mg_connection *nc, const void *data,
size_t len) {
if ((int) len == ~0) {
len = strlen((const char *) data);
}
mg_send_websocket_frame(nc, WEBSOCKET_OP_CLOSE, data, len);
nc->flags |= MG_F_SEND_AND_CLOSE;
}
static int mg_deliver_websocket_data(struct mg_connection *nc) { static int mg_deliver_websocket_data(struct mg_connection *nc) {
/* Using unsigned char *, cause of integer arithmetic below */ /* Using unsigned char *, cause of integer arithmetic below */
uint64_t i, data_len = 0, frame_len = 0, buf_len = nc->recv_mbuf.len, len, uint64_t i, data_len = 0, frame_len = 0, new_data_len = nc->recv_mbuf.len,
mask_len = 0, header_len = 0; len, mask_len = 0, header_len = 0;
unsigned char *p = (unsigned char *) nc->recv_mbuf.buf, *buf = p,
*e = p + buf_len;
struct mg_ws_proto_data *wsd = mg_ws_get_proto_data(nc); struct mg_ws_proto_data *wsd = mg_ws_get_proto_data(nc);
int ok; unsigned char *new_data = (unsigned char *) nc->recv_mbuf.buf,
int reass = buf_len > 0 && mg_is_ws_fragment(p[0]) && *e = (unsigned char *) nc->recv_mbuf.buf + nc->recv_mbuf.len;
!(nc->flags & MG_F_WEBSOCKET_NO_DEFRAG); uint8_t flags;
int ok, reass;
if (wsd->reass_len > 0) {
/*
* We already have some previously received data which we need to
* reassemble and deliver to the client code when we get the final
* fragment.
*
* NOTE: it doesn't mean that the current message must be a continuation:
* it might be a control frame (Close, Ping or Pong), which should be
* handled without breaking the fragmented message.
*/
size_t existing_len = wsd->reass_len;
assert(new_data_len >= existing_len);
/* If that's a continuation frame that must be reassembled, handle it */ new_data += existing_len;
if (reass && !mg_is_ws_first_fragment(p[0]) && buf_len >= 1 && new_data_len -= existing_len;
buf_len >= 1 + wsd->reass_len) { }
buf += 1 + wsd->reass_len;
buf_len -= 1 + wsd->reass_len; flags = new_data[0];
reass = new_data_len > 0 && mg_is_ws_fragment(flags) &&
!(nc->flags & MG_F_WEBSOCKET_NO_DEFRAG);
if (reass && mg_is_ws_control_frame(flags)) {
/*
* Control frames can't be fragmented, so if we encounter fragmented
* control frame, close connection immediately.
*/
mg_ws_close(nc, "fragmented control frames are illegal", ~0);
return 0;
} else if (new_data_len > 0 && !reass && !mg_is_ws_control_frame(flags) &&
wsd->reass_len > 0) {
/*
* When in the middle of a fragmented message, only the continuations
* and control frames are allowed.
*/
mg_ws_close(nc, "non-continuation in the middle of a fragmented message",
~0);
return 0;
} }
if (buf_len >= 2) { if (new_data_len >= 2) {
len = buf[1] & 0x7f; len = new_data[1] & 0x7f;
mask_len = buf[1] & 0x80 ? 4 : 0; mask_len = new_data[1] & FLAGS_MASK_FIN ? 4 : 0;
if (len < 126 && buf_len >= mask_len) { if (len < 126 && new_data_len >= mask_len) {
data_len = len; data_len = len;
header_len = 2 + mask_len; header_len = 2 + mask_len;
} else if (len == 126 && buf_len >= 4 + mask_len) { } else if (len == 126 && new_data_len >= 4 + mask_len) {
header_len = 4 + mask_len; header_len = 4 + mask_len;
data_len = ntohs(*(uint16_t *) &buf[2]); data_len = ntohs(*(uint16_t *) &new_data[2]);
} else if (buf_len >= 10 + mask_len) { } else if (new_data_len >= 10 + mask_len) {
header_len = 10 + mask_len; header_len = 10 + mask_len;
data_len = (((uint64_t) ntohl(*(uint32_t *) &buf[2])) << 32) + data_len = (((uint64_t) ntohl(*(uint32_t *) &new_data[2])) << 32) +
ntohl(*(uint32_t *) &buf[6]); ntohl(*(uint32_t *) &new_data[6]);
} }
} }
frame_len = header_len + data_len; frame_len = header_len + data_len;
ok = (frame_len > 0 && frame_len <= buf_len); ok = (frame_len > 0 && frame_len <= new_data_len);
/* Check for overflow */ /* Check for overflow */
if (frame_len < header_len || frame_len < data_len) { if (frame_len < header_len || frame_len < data_len) {
ok = 0; ok = 0;
nc->flags |= MG_F_CLOSE_IMMEDIATELY; mg_ws_close(nc, "overflowed message", ~0);
} }
if (ok) { if (ok) {
size_t cleanup_len = 0;
struct websocket_message wsm; struct websocket_message wsm;
wsm.size = (size_t) data_len; wsm.size = (size_t) data_len;
wsm.data = buf + header_len; wsm.data = new_data + header_len;
wsm.flags = buf[0]; wsm.flags = flags;
/* Apply mask if necessary */ /* Apply mask if necessary */
if (mask_len > 0) { if (mask_len > 0) {
for (i = 0; i < data_len; i++) { for (i = 0; i < data_len; i++) {
buf[i + header_len] ^= (buf + header_len - mask_len)[i % 4]; new_data[i + header_len] ^= (new_data + header_len - mask_len)[i % 4];
} }
} }
if (reass) { if (reass) {
/* On first fragmented frame, nullify size */ /* This is a message fragment */
if (mg_is_ws_first_fragment(wsm.flags)) {
p[0] &= ~0x0f; /* Next frames will be treated as continuation */ if (mg_is_ws_first_fragment(flags)) {
buf = p + 1; /*
wsd->reass_len = 0; * On the first fragmented frame, skip the first byte (op) and also
* reset size to 1 (op), it'll be incremented with the data len below.
*/
new_data += 1;
wsd->reass_len = 1 /* op */;
} }
/* Append this frame to the reassembled buffer */ /* Append this frame to the reassembled buffer */
memmove(buf, wsm.data, e - wsm.data); memmove(new_data, wsm.data, e - wsm.data);
wsd->reass_len += wsm.size; wsd->reass_len += wsm.size;
nc->recv_mbuf.len -= wsm.data - buf; nc->recv_mbuf.len -= wsm.data - new_data;
if (flags & FLAGS_MASK_FIN) {
/* On last fragmented frame - call user handler and remove data */
wsm.flags = FLAGS_MASK_FIN | nc->recv_mbuf.buf[0];
wsm.data = (unsigned char *) nc->recv_mbuf.buf + 1 /* op */;
wsm.size = wsd->reass_len - 1 /* op */;
cleanup_len = wsd->reass_len;
wsd->reass_len = 0;
/* On last fragmented frame - call user handler and remove data */ /* Pass reassembled message to the client code. */
if (wsm.flags & 0x80) {
wsm.data = p + 1;
wsm.size = wsd->reass_len;
mg_handle_incoming_websocket_frame(nc, &wsm); mg_handle_incoming_websocket_frame(nc, &wsm);
mbuf_remove(&nc->recv_mbuf, 1 + wsd->reass_len); mbuf_remove(&nc->recv_mbuf, cleanup_len); /* Cleanup frame */
wsd->reass_len = 0;
} }
} else { } else {
/* TODO(lsm): properly handle OOB control frames during defragmentation */ /*
* This is a complete message, not a fragment. It might happen in between
* of a fragmented message (in this case, WebSocket protocol requires
* current message to be a control frame).
*/
cleanup_len = (size_t) frame_len;
/* First of all, check if we need to react on a control frame. */
switch (flags & FLAGS_MASK_OP) {
case WEBSOCKET_OP_PING:
mg_send_websocket_frame(nc, WEBSOCKET_OP_PONG, wsm.data, wsm.size);
break;
case WEBSOCKET_OP_CLOSE:
mg_ws_close(nc, wsm.data, wsm.size);
break;
}
/* Pass received message to the client code. */
mg_handle_incoming_websocket_frame(nc, &wsm); mg_handle_incoming_websocket_frame(nc, &wsm);
mbuf_remove(&nc->recv_mbuf, (size_t) frame_len); /* Cleanup frame */
wsd->reass_len = 0;
}
/* If the frame is not reassembled - client closes and close too */ /* Cleanup frame */
if (!reass && (buf[0] & 0x0f) == WEBSOCKET_OP_CLOSE) { memmove(nc->recv_mbuf.buf + wsd->reass_len,
nc->flags |= MG_F_SEND_AND_CLOSE; nc->recv_mbuf.buf + wsd->reass_len + cleanup_len,
nc->recv_mbuf.len - wsd->reass_len - cleanup_len);
nc->recv_mbuf.len -= cleanup_len;
} }
} }
...@@ -9723,7 +9812,8 @@ static void mg_send_ws_header(struct mg_connection *nc, int op, size_t len, ...@@ -9723,7 +9812,8 @@ static void mg_send_ws_header(struct mg_connection *nc, int op, size_t len,
int header_len; int header_len;
unsigned char header[10]; unsigned char header[10];
header[0] = (op & WEBSOCKET_DONT_FIN ? 0x0 : 0x80) + (op & 0x0f); header[0] =
(op & WEBSOCKET_DONT_FIN ? 0x0 : FLAGS_MASK_FIN) | (op & FLAGS_MASK_OP);
if (len < 126) { if (len < 126) {
header[1] = (unsigned char) len; header[1] = (unsigned char) len;
header_len = 2; header_len = 2;
......
...@@ -4527,10 +4527,8 @@ void mg_send_websocket_frame(struct mg_connection *nc, int op_and_flags, ...@@ -4527,10 +4527,8 @@ void mg_send_websocket_frame(struct mg_connection *nc, int op_and_flags,
const void *data, size_t data_len); const void *data, size_t data_len);
/* /*
* Sends multiple websocket frames. * Like `mg_send_websocket_frame()`, but composes a single frame from multiple
* * buffers.
* Like `mg_send_websocket_frame()`, but composes a frame from multiple
*buffers.
*/ */
void mg_send_websocket_framev(struct mg_connection *nc, int op_and_flags, void mg_send_websocket_framev(struct mg_connection *nc, int op_and_flags,
const struct mg_str *strings, int num_strings); const struct mg_str *strings, int num_strings);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment