Commit 98fb44f8 authored by Sergey Lyubka's avatar Sergey Lyubka

Refactored data buffering. Updated websocket example to have 2 message roundtrips.

parent edffc46b
...@@ -7,37 +7,53 @@ ...@@ -7,37 +7,53 @@
static void *callback(enum mg_event event, struct mg_connection *conn) { static void *callback(enum mg_event event, struct mg_connection *conn) {
if (event == MG_WEBSOCKET_READY) { if (event == MG_WEBSOCKET_READY) {
static const char *hello = "hello from mongoose! waiting for message ..."; unsigned char buf[40];
char frame[2]; buf[0] = 0x81;
buf[1] = snprintf((char *) buf + 2, sizeof(buf) - 2, "%s", "server ready");
// Prepare websocket frame. mg_write(conn, buf, 2 + buf[1]);
frame[0] = 0x81; // text frame return ""; // MG_WEBSOCKET_READY return value is ignored
frame[1] = strlen(hello); // length is < 126
// Write frame and a text message
mg_write(conn, frame, sizeof(frame));
mg_write(conn, hello, strlen(hello));
return "";
} else if (event == MG_WEBSOCKET_MESSAGE) { } else if (event == MG_WEBSOCKET_MESSAGE) {
unsigned char buf[500], reply[500]; unsigned char buf[200], reply[200];
int len, msg_len, i, mask_len, xor; int n, i, mask_len, xor, msg_len, len;
// Read message from the client and echo it back // Read message from the client.
if ((len = mg_read(conn, buf, sizeof(buf))) > 8) { // Accept only small (<126 bytes) messages.
msg_len = buf[1] & 127; len = 0;
mask_len = (buf[1] & 128) ? 4 : 0; msg_len = mask_len = 0;
if (msg_len < 126) { for (;;) {
reply[0] = 0x81; // text, FIN set if ((n = mg_read(conn, buf + len, sizeof(buf) - len)) <= 0) {
reply[1] = msg_len; return ""; // Read error, close websocket
for (i = 0; i < msg_len; i++) { }
xor = mask_len == 0 ? 0 : buf[2 + (i % 4)]; len += n;
reply[i + 2] = buf[i + 2 + mask_len] ^ xor; if (len >= 2) {
msg_len = buf[1] & 127;
mask_len = (buf[1] & 128) ? 4 : 0;
if (msg_len > 125) {
return ""; // Message is too long, close websocket
}
// If we've buffered the whole message, exit the loop
if (len >= 2 + mask_len + msg_len) {
break;
} }
mg_write(conn, reply, 2 + msg_len);
} }
} }
return ""; // Return non-NULL: stop websocket conversation // Prepare frame
reply[0] = 0x81; // text, FIN set
reply[1] = msg_len;
// Copy message from request to reply, applying the mask if required.
for (i = 0; i < msg_len; i++) {
xor = mask_len == 0 ? 0 : buf[2 + (i % 4)];
reply[i + 2] = buf[i + 2 + mask_len] ^ xor;
}
// Echo the message back to the client
mg_write(conn, reply, 2 + msg_len);
// Return non-NULL means stoping websocket conversation.
// Close the conversation if client has sent us "exit" string.
return memcmp(reply + 2, "exit", 4) == 0 ? "" : NULL;
} else { } else {
return NULL; return NULL;
} }
......
...@@ -2,6 +2,11 @@ ...@@ -2,6 +2,11 @@
<meta charset="utf-8" /> <meta charset="utf-8" />
<title>WebSocket Test</title> <title>WebSocket Test</title>
<script language="javascript" type="text/javascript"> <script language="javascript" type="text/javascript">
// This javascript code creates a websocket to the URI "/foo",
// sends a message to it, waits for the reply, and sends an "exit" message.
// Server must terminate the conversation after receiving "exit" message.
var writeToScreen = function(message) { var writeToScreen = function(message) {
var div = document.createElement('div'); var div = document.createElement('div');
div.innerHTML = message; div.innerHTML = message;
...@@ -22,7 +27,7 @@ ...@@ -22,7 +27,7 @@
websocket.onmessage = function(ev) { websocket.onmessage = function(ev) {
writeToScreen('<span style="color: blue;">RESPONSE: ' + ev.data + writeToScreen('<span style="color: blue;">RESPONSE: ' + ev.data +
' </span>'); ' </span>');
websocket.close(); websocket.send('exit');
}; };
websocket.onerror = function(ev) { websocket.onerror = function(ev) {
writeToScreen('<span style="color: red; ">ERROR: </span> ' + ev.data); writeToScreen('<span style="color: red; ">ERROR: </span> ' + ev.data);
......
...@@ -495,8 +495,6 @@ struct mg_connection { ...@@ -495,8 +495,6 @@ struct mg_connection {
int64_t consumed_content; // How many bytes of content have been read int64_t consumed_content; // How many bytes of content have been read
char *buf; // Buffer for received data char *buf; // Buffer for received data
char *path_info; // PATH_INFO part of the URL char *path_info; // PATH_INFO part of the URL
char *body; // Pointer to not-read yet buffered body data
char *next_request; // Pointer to the buffered next request
char *log_message; // Placeholder for the mongoose error log message char *log_message; // Placeholder for the mongoose error log message
int must_close; // 1 if connection must be closed int must_close; // 1 if connection must be closed
int buf_size; // Buffer size int buf_size; // Buffer size
...@@ -1456,13 +1454,10 @@ static int pull(FILE *fp, struct mg_connection *conn, char *buf, int len) { ...@@ -1456,13 +1454,10 @@ static int pull(FILE *fp, struct mg_connection *conn, char *buf, int len) {
int mg_read(struct mg_connection *conn, void *buf, size_t len) { int mg_read(struct mg_connection *conn, void *buf, size_t len) {
int n, buffered_len, nread; int n, buffered_len, nread;
const char *body;
assert(conn->next_request != NULL &&
conn->body != NULL &&
conn->next_request >= conn->body);
nread = 0; nread = 0;
if (conn->consumed_content < conn->content_len) { if (conn->consumed_content < conn->content_len) {
// Adjust number of bytes to read. // Adjust number of bytes to read.
int64_t to_read = conn->content_len - conn->consumed_content; int64_t to_read = conn->content_len - conn->consumed_content;
if (to_read < (int64_t) len) { if (to_read < (int64_t) len) {
...@@ -1470,14 +1465,14 @@ int mg_read(struct mg_connection *conn, void *buf, size_t len) { ...@@ -1470,14 +1465,14 @@ int mg_read(struct mg_connection *conn, void *buf, size_t len) {
} }
// Return buffered data // Return buffered data
buffered_len = conn->next_request - conn->body; body = conn->buf + conn->request_len + conn->consumed_content;
buffered_len = &conn->buf[conn->data_len] - body;
if (buffered_len > 0) { if (buffered_len > 0) {
if (len < (size_t) buffered_len) { if (len < (size_t) buffered_len) {
buffered_len = (int) len; buffered_len = (int) len;
} }
memcpy(buf, conn->body, (size_t) buffered_len); memcpy(buf, body, (size_t) buffered_len);
len -= buffered_len; len -= buffered_len;
conn->body += buffered_len;
conn->consumed_content += buffered_len; conn->consumed_content += buffered_len;
nread += buffered_len; nread += buffered_len;
buf = (char *) buf + buffered_len; buf = (char *) buf + buffered_len;
...@@ -2863,7 +2858,7 @@ static int is_not_modified(const struct mg_connection *conn, ...@@ -2863,7 +2858,7 @@ static int is_not_modified(const struct mg_connection *conn,
static int forward_body_data(struct mg_connection *conn, FILE *fp, static int forward_body_data(struct mg_connection *conn, FILE *fp,
SOCKET sock, SSL *ssl) { SOCKET sock, SSL *ssl) {
const char *expect; const char *expect, *body;
char buf[MG_BUF_LEN]; char buf[MG_BUF_LEN];
int to_read, nread, buffered_len, success = 0; int to_read, nread, buffered_len, success = 0;
...@@ -2879,7 +2874,8 @@ static int forward_body_data(struct mg_connection *conn, FILE *fp, ...@@ -2879,7 +2874,8 @@ static int forward_body_data(struct mg_connection *conn, FILE *fp,
(void) mg_printf(conn, "%s", "HTTP/1.1 100 Continue\r\n\r\n"); (void) mg_printf(conn, "%s", "HTTP/1.1 100 Continue\r\n\r\n");
} }
buffered_len = conn->next_request - conn->body; body = conn->buf + conn->request_len + conn->consumed_content;
buffered_len = &conn->buf[conn->data_len] - body;
assert(buffered_len >= 0); assert(buffered_len >= 0);
assert(conn->consumed_content == 0); assert(conn->consumed_content == 0);
...@@ -2887,9 +2883,8 @@ static int forward_body_data(struct mg_connection *conn, FILE *fp, ...@@ -2887,9 +2883,8 @@ static int forward_body_data(struct mg_connection *conn, FILE *fp,
if ((int64_t) buffered_len > conn->content_len) { if ((int64_t) buffered_len > conn->content_len) {
buffered_len = (int) conn->content_len; buffered_len = (int) conn->content_len;
} }
push(fp, sock, ssl, conn->body, (int64_t) buffered_len); push(fp, sock, ssl, body, (int64_t) buffered_len);
conn->consumed_content += buffered_len; conn->consumed_content += buffered_len;
conn->body += buffered_len;
} }
nread = 0; nread = 0;
...@@ -3663,8 +3658,8 @@ static void send_websocket_handshake(struct mg_connection *conn) { ...@@ -3663,8 +3658,8 @@ static void send_websocket_handshake(struct mg_connection *conn) {
} }
static void read_websocket(struct mg_connection *conn) { static void read_websocket(struct mg_connection *conn) {
unsigned char *mask, *buf = (unsigned char *) conn->body; unsigned char *mask, *buf = (unsigned char *) conn->buf + conn->request_len;
int n, len, mask_len, body_len; int n, len, mask_len, body_len, discard_len;
for (;;) { for (;;) {
if ((body_len = conn->data_len - conn->request_len) >= 2) { if ((body_len = conn->data_len - conn->request_len) >= 2) {
...@@ -3685,10 +3680,13 @@ static void read_websocket(struct mg_connection *conn) { ...@@ -3685,10 +3680,13 @@ static void read_websocket(struct mg_connection *conn) {
} }
if (conn->content_len > 0) { if (conn->content_len > 0) {
conn->next_request = conn->buf + conn->data_len;
if (call_user(conn, MG_WEBSOCKET_MESSAGE) != NULL) { if (call_user(conn, MG_WEBSOCKET_MESSAGE) != NULL) {
break; // Callback signalled to exit break; // Callback signalled to exit
} }
discard_len = conn->content_len > body_len ? body_len : conn->content_len;
memmove(buf, buf + discard_len, conn->data_len - discard_len);
conn->data_len -= discard_len;
conn->content_len = conn->consumed_content = 0;
} else { } else {
if (wait_until_socket_is_readable(conn) == 0) { if (wait_until_socket_is_readable(conn) == 0) {
break; break;
...@@ -4184,12 +4182,10 @@ static int set_acl_option(struct mg_context *ctx) { ...@@ -4184,12 +4182,10 @@ static int set_acl_option(struct mg_context *ctx) {
} }
static void reset_per_request_attributes(struct mg_connection *conn) { static void reset_per_request_attributes(struct mg_connection *conn) {
conn->path_info = conn->body = conn->next_request = conn->log_message = NULL; conn->path_info = conn->log_message = NULL;
conn->num_bytes_sent = conn->consumed_content = 0; conn->num_bytes_sent = conn->consumed_content = 0;
conn->content_len = -1;
conn->request_len = conn->data_len = 0;
conn->must_close = 0;
conn->status_code = -1; conn->status_code = -1;
conn->must_close = conn->request_len = 0;
} }
static void close_socket_gracefully(struct mg_connection *conn) { static void close_socket_gracefully(struct mg_connection *conn) {
...@@ -4337,7 +4333,7 @@ static int is_valid_uri(const char *uri) { ...@@ -4337,7 +4333,7 @@ static int is_valid_uri(const char *uri) {
static void process_new_connection(struct mg_connection *conn) { static void process_new_connection(struct mg_connection *conn) {
struct mg_request_info *ri = &conn->request_info; struct mg_request_info *ri = &conn->request_info;
int keep_alive_enabled, buffered_len; int keep_alive_enabled, discard_len;
const char *cl; const char *cl;
keep_alive_enabled = !strcmp(conn->ctx->config[ENABLE_KEEP_ALIVE], "yes"); keep_alive_enabled = !strcmp(conn->ctx->config[ENABLE_KEEP_ALIVE], "yes");
...@@ -4353,8 +4349,6 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -4353,8 +4349,6 @@ static void process_new_connection(struct mg_connection *conn) {
} if (conn->request_len <= 0) { } if (conn->request_len <= 0) {
return; // Remote end closed the connection return; // Remote end closed the connection
} }
conn->body = conn->next_request = conn->buf + conn->request_len;
if (parse_http_request(conn->buf, conn->buf_size, ri) <= 0 || if (parse_http_request(conn->buf, conn->buf_size, ri) <= 0 ||
!is_valid_uri(ri->uri)) { !is_valid_uri(ri->uri)) {
// Do not put garbage in the access log, just send it back to the client // Do not put garbage in the access log, just send it back to the client
...@@ -4368,19 +4362,14 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -4368,19 +4362,14 @@ static void process_new_connection(struct mg_connection *conn) {
log_access(conn); log_access(conn);
} else { } else {
// Request is valid, handle it // Request is valid, handle it
cl = get_header(ri, "Content-Length"); if ((cl = get_header(ri, "Content-Length")) != NULL) {
conn->content_len = cl == NULL ? -1 : strtoll(cl, NULL, 10); conn->content_len = strtoll(cl, NULL, 10);
} else if (!mg_strcasecmp(ri->request_method, "POST") ||
// Set pointer to the next buffered request !mg_strcasecmp(ri->request_method, "PUT")) {
buffered_len = conn->data_len - conn->request_len; conn->content_len = -1;
assert(buffered_len >= 0);
if (conn->content_len <= 0) {
} else if (conn->content_len < (int64_t) buffered_len) {
conn->next_request += conn->content_len;
} else { } else {
conn->next_request += buffered_len; conn->content_len = 0;
} }
conn->birth_time = time(NULL); conn->birth_time = time(NULL);
handle_request(conn); handle_request(conn);
call_user(conn, MG_REQUEST_COMPLETE); call_user(conn, MG_REQUEST_COMPLETE);
...@@ -4391,12 +4380,17 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -4391,12 +4380,17 @@ static void process_new_connection(struct mg_connection *conn) {
} }
// Discard all buffered data for this request // Discard all buffered data for this request
assert(conn->next_request >= conn->buf); discard_len = conn->content_len >= 0 &&
assert(conn->data_len >= conn->next_request - conn->buf); conn->request_len + conn->content_len < conn->data_len ?
conn->data_len -= conn->next_request - conn->buf; conn->request_len + conn->content_len : conn->data_len;
memmove(conn->buf, conn->next_request, (size_t) conn->data_len); memmove(conn->buf, conn->buf + discard_len, conn->data_len - discard_len);
conn->data_len -= discard_len;
assert(conn->data_len >= 0);
assert(conn->data_len <= conn->buf_size);
} while (conn->ctx->stop_flag == 0 && } while (conn->ctx->stop_flag == 0 &&
keep_alive_enabled && keep_alive_enabled &&
conn->content_len >= 0 &&
should_keep_alive(conn)); should_keep_alive(conn));
} }
......
#define USE_WEBSOCKET
#include "mongoose.c" #include "mongoose.c"
#define FATAL(str, line) do { \ #define FATAL(str, line) do { \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment