Commit 8d1f6377 authored by Sergey Lyubka's avatar Sergey Lyubka

Changed websocket_data() handler API. Buffering and passing whole websocket message.

parent 22dddc2e
...@@ -12,46 +12,37 @@ static void websocket_ready_handler(struct mg_connection *conn) { ...@@ -12,46 +12,37 @@ static void websocket_ready_handler(struct mg_connection *conn) {
mg_write(conn, buf, 2 + buf[1]); mg_write(conn, buf, 2 + buf[1]);
} }
static int websocket_data_handler(struct mg_connection *conn) { // Arguments:
unsigned char buf[200], reply[200]; // flags: first byte of websocket frame, see websocket RFC,
int n, i, mask_len, xor, msg_len, len; // http://tools.ietf.org/html/rfc6455, section 5.2
// data, data_len: payload data. Mask, if any, is already applied.
static int websocket_data_handler(struct mg_connection *conn, int flags,
char *data, size_t data_len) {
unsigned char reply[200];
size_t i;
// Read message from the client. (void) flags;
// Accept only small (<126 bytes) messages.
len = 0; printf("rcv: [%.*s]\n", (int) data_len, data);
msg_len = mask_len = 0;
for (;;) { // Truncate echoed message, to simplify output code.
if ((n = mg_read(conn, buf + len, sizeof(buf) - len)) <= 0) { if (data_len > 125) {
return 0; // Read error, close websocket data_len = 125;
}
len += n;
if (len >= 2) {
msg_len = buf[1] & 127;
mask_len = (buf[1] & 128) ? 4 : 0;
if (msg_len > 125) {
return 0; // Message is too long, close websocket
}
// If we've buffered the whole message, exit the loop
if (len >= 2 + mask_len + msg_len) {
break;
}
}
} }
// Prepare frame // Prepare frame
reply[0] = 0x81; // text, FIN set reply[0] = 0x81; // text, FIN set
reply[1] = msg_len; reply[1] = data_len;
// Copy message from request to reply, applying the mask if required. // Copy message from request to reply, applying the mask if required.
for (i = 0; i < msg_len; i++) { for (i = 0; i < data_len; i++) {
xor = mask_len == 0 ? 0 : buf[2 + (i % 4)]; reply[i + 2] = data[i];
reply[i + 2] = buf[i + 2 + mask_len] ^ xor;
} }
// Echo the message back to the client // Echo the message back to the client
mg_write(conn, reply, 2 + msg_len); mg_write(conn, reply, 2 + data_len);
// Returnint zero means stoping websocket conversation. // Returning zero means stoping websocket conversation.
// Close the conversation if client has sent us "exit" string. // Close the conversation if client has sent us "exit" string.
return memcmp(reply + 2, "exit", 4); return memcmp(reply + 2, "exit", 4);
} }
......
...@@ -3786,37 +3786,76 @@ static void send_websocket_handshake(struct mg_connection *conn) { ...@@ -3786,37 +3786,76 @@ static void send_websocket_handshake(struct mg_connection *conn) {
static void read_websocket(struct mg_connection *conn) { static void read_websocket(struct mg_connection *conn) {
unsigned char *buf = (unsigned char *) conn->buf + conn->request_len; unsigned char *buf = (unsigned char *) conn->buf + conn->request_len;
int n, len, mask_len, body_len, discard_len; int n;
size_t i, len, mask_len, data_len, header_len, body_len;
char mem[4 * 1024], *data;
assert(conn->content_len == 0);
for (;;) { for (;;) {
header_len = 0;
if ((body_len = conn->data_len - conn->request_len) >= 2) { if ((body_len = conn->data_len - conn->request_len) >= 2) {
len = buf[1] & 127; len = buf[1] & 127;
mask_len = buf[1] & 128 ? 4 : 0; mask_len = buf[1] & 128 ? 4 : 0;
if (len < 126) { if (len < 126 && body_len >= mask_len) {
conn->content_len = 2 + mask_len + len; data_len = len;
} else if (len == 126 && body_len >= 4) { header_len = 2 + mask_len;
conn->content_len = 4 + mask_len + ((((int) buf[2]) << 8) + buf[3]); } else if (len == 126 && body_len >= 4 + mask_len) {
} else if (body_len >= 10) { header_len = 4 + mask_len;
conn->content_len = 10 + mask_len + data_len = ((((int) buf[2]) << 8) + buf[3]);
(((uint64_t) htonl(* (uint32_t *) &buf[2])) << 32) + } else if (body_len >= 10 + mask_len) {
header_len = 10 + mask_len;
data_len = (((uint64_t) htonl(* (uint32_t *) &buf[2])) << 32) +
htonl(* (uint32_t *) &buf[6]); htonl(* (uint32_t *) &buf[6]);
} }
} }
if (conn->content_len > 0) { if (header_len > 0) {
if (conn->ctx->callbacks.websocket_data != NULL && // Allocate space to hold websocket payload
conn->ctx->callbacks.websocket_data(conn) == 0) { data = mem;
break; // Callback signalled to exit if (data_len > sizeof(mem) && (data = malloc(data_len)) == NULL) {
// Allocation failed, exit the loop and then close the connection
// TODO: notify user about the failure
break;
} }
discard_len = conn->content_len > body_len ?
body_len : (int) conn->content_len; // Read frame payload into the allocated buffer.
memmove(buf, buf + discard_len, conn->data_len - discard_len); assert(body_len >= header_len);
conn->data_len -= discard_len; if (data_len + header_len > body_len) {
conn->content_len = conn->consumed_content = 0; len = body_len - header_len;
memcpy(data, buf + header_len, len);
// TODO: handle pull error
pull(NULL, conn, data + len, data_len - len);
conn->data_len = 0;
} else {
len = data_len + header_len;
memcpy(data, buf + header_len, data_len);
memmove(buf, buf + len, body_len - len);
conn->data_len -= len;
}
// Apply mask if necessary
if (mask_len > 0) {
for (i = 0; i < data_len; i++) {
data[i] ^= buf[header_len - mask_len + (i % 4)];
}
}
// Exit the loop if callback signalled to exit,
// or "connection close" opcode received.
if ((conn->ctx->callbacks.websocket_data != NULL &&
!conn->ctx->callbacks.websocket_data(conn, buf[0], data, data_len)) ||
(buf[0] & 0xf) == 8) { // Opcode == 8, connection close
break;
}
if (data != mem) {
free(data);
}
// Not breaking the loop, process next websocket frame.
} else { } else {
n = pull(NULL, conn, conn->buf + conn->data_len, // Buffering websocket request
conn->buf_size - conn->data_len); if ((n = pull(NULL, conn, conn->buf + conn->data_len,
if (n <= 0) { conn->buf_size - conn->data_len)) <= 0) {
break; break;
} }
conn->data_len += n; conn->data_len += n;
......
...@@ -62,7 +62,8 @@ struct mg_callbacks { ...@@ -62,7 +62,8 @@ struct mg_callbacks {
int (*init_ssl)(void *ssl_context, void *user_data); int (*init_ssl)(void *ssl_context, void *user_data);
int (*websocket_connect)(const struct mg_connection *); int (*websocket_connect)(const struct mg_connection *);
void (*websocket_ready)(struct mg_connection *); void (*websocket_ready)(struct mg_connection *);
int (*websocket_data)(struct mg_connection *); int (*websocket_data)(struct mg_connection *, int flags,
char *data, size_t data_len);
const char * (*open_file)(const struct mg_connection *, const char * (*open_file)(const struct mg_connection *,
const char *path, size_t *data_len); const char *path, size_t *data_len);
void (*init_lua)(struct mg_connection *, void *lua_context); void (*init_lua)(struct mg_connection *, void *lua_context);
...@@ -90,7 +91,7 @@ struct mg_callbacks { ...@@ -90,7 +91,7 @@ struct mg_callbacks {
// }; // };
// struct mg_context *ctx = mg_start(&my_func, NULL, options); // struct mg_context *ctx = mg_start(&my_func, NULL, options);
// //
// Please refer to http://code.google.com/p/mongoose/wiki/MongooseManual // Refer to https://github.com/valenok/mongoose/blob/master/UserManual.md
// for the list of valid option and their possible values. // for the list of valid option and their possible values.
// //
// Return: // Return:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment