Commit 0e49214c authored by Sergey Lyubka's avatar Sergey Lyubka

Added mutex protection for write calls

parent 914606a0
......@@ -59,10 +59,15 @@ typedef unsigned int uint32_t;
typedef unsigned short uint16_t;
typedef unsigned __int64 uint64_t;
typedef __int64 int64_t;
typedef CRITICAL_SECTION mutex_t;
#pragma comment(lib, "ws2_32.lib")
#define snprintf _snprintf
#define vsnprintf _vsnprintf
#define INT64_FMT "I64d"
#define mutex_init(x) InitializeCriticalSection(x)
#define mutex_destroy(x) DeleteCriticalSection(x)
#define mutex_lock(x) EnterCriticalSection(x)
#define mutex_unlock(x) LeaveCriticalSection(x)
#ifndef va_copy
#define va_copy(x,y) x = y
#endif // MINGW #defines va_copy
......@@ -78,6 +83,11 @@ typedef __int64 int64_t;
#include <sys/select.h>
#define closesocket(x) close(x)
typedef int sock_t;
typedef pthread_mutex_t mutex_t;
#define mutex_init(x) pthread_mutex_init(x, NULL)
#define mutex_destroy(x) pthread_mutex_destroy(x)
#define mutex_lock(x) pthread_mutex_lock(x)
#define mutex_unlock(x) pthread_mutex_unlock(x)
#define INVALID_SOCKET ((sock_t) -1)
#define INT64_FMT PRId64
#endif
......@@ -102,9 +112,13 @@ struct linked_list_link { struct linked_list_link *prev, *next; };
#define MAX_REQUEST_SIZE 16384
#define IOBUF_SIZE 8192
#define MAX_PATH_SIZE 8192
#ifdef ENABLE_DBG
#define DBG(x) do { printf("%s::%s() ", __FILE__, __func__); \
printf x; putchar('\n'); fflush(stdout); } while(0)
//#define DBG(x)
#else
#define DBG(x)
#endif
union socket_address {
struct sockaddr sa;
......@@ -156,7 +170,7 @@ enum connection_flags { CONN_CLOSE = 1, CONN_SPOOL_DONE = 2 };
struct mg_connection {
struct linked_list_link link; // Linkage to server->active_connections
struct mg_server *server;
struct mg_event event;
struct mg_event event; // NOTE(lsm): this has conn_data attribute
sock_t client_sock; // Connected client
union socket_address csa; // Client's socket address
struct iobuf local_iobuf;
......@@ -168,6 +182,7 @@ struct mg_connection {
struct mg_request_info request_info;
char *path_info;
int request_len;
mutex_t mutex; // Guards concurrent mg_write() and mg_printf() calls
};
static const char *static_config_options[] = {
......@@ -476,8 +491,11 @@ static struct mg_connection *accept_new_connection(struct mg_server *server) {
struct mg_connection *conn = NULL;
if ((sock = accept(server->listening_sock, &sa.sa, &len)) == INVALID_SOCKET) {
#if 0
} else if (sock >= FD_SETSIZE) {
DBG((">fd_setsize"));
closesocket(sock);
#endif
} else if (!check_acl(server->config_options[ACCESS_CONTROL_LIST],
ntohl(* (uint32_t *) &sa.sin.sin_addr))) {
closesocket(sock);
......@@ -495,6 +513,7 @@ static struct mg_connection *accept_new_connection(struct mg_server *server) {
conn->local_iobuf.size = MAX_REQUEST_SIZE;
conn->remote_iobuf.buf = (char *) calloc(1, IOBUF_SIZE);
conn->remote_iobuf.size = IOBUF_SIZE;
mutex_init(&conn->mutex);
LINKED_LIST_ADD_TO_FRONT(&server->active_connections, &conn->link);
DBG(("added conn %p", conn));
}
......@@ -507,6 +526,7 @@ static void close_conn(struct mg_connection *conn) {
LINKED_LIST_REMOVE(&conn->link);
closesocket(conn->client_sock);
free(conn->remote_iobuf.buf);
mutex_destroy(&conn->mutex);
free(conn);
}
......@@ -756,12 +776,12 @@ static int convert_uri_to_file_name(struct mg_connection *conn, char *buf,
}
static int spool(struct iobuf *io, const void *buf, int len) {
char *ptr = io->buf;
char *p = io->buf;
if (len <= 0) {
} else if (len < io->size - io->len ||
(ptr = realloc(io->buf, io->len + len - io->size)) != NULL) {
io->buf = ptr;
(p = (char *) realloc(io->buf, io->len + len - io->size)) != 0) {
io->buf = p;
memcpy(io->buf + io->len, buf, len);
io->len += len;
} else {
......@@ -796,13 +816,21 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) {
va_list ap;
int ret;
va_start(ap, fmt);
mutex_lock(&conn->mutex);
ret = vspool(&conn->remote_iobuf, fmt, ap);
mutex_unlock(&conn->mutex);
va_end(ap);
send(conn->server->ctl[1], ".", 1, 0); // Wake up select call
return ret;
}
int mg_write(struct mg_connection *conn, const void *buf, int len) {
return spool(&conn->remote_iobuf, buf, len);
int ret;
mutex_lock(&conn->mutex);
ret = spool(&conn->remote_iobuf, buf, len);
mutex_unlock(&conn->mutex);
send(conn->server->ctl[1], ".", 1, 0); // Wake up select call
return ret;
}
static int is_error(int n) {
......@@ -983,6 +1011,13 @@ void mg_poll_server(struct mg_server *server, unsigned int milliseconds) {
tv.tv_usec = (milliseconds % 1000) * 1000;
if (select(max_fd + 1, &read_set, &write_set, NULL, &tv) > 0) {
// If control socket is set, just read from it. It meant to wake up
// this select loop when another thread writes to any connection
if (FD_ISSET(server->ctl[0], &read_set)) {
char buf[500];
recv(server->ctl[0], buf, sizeof(buf), 0);
}
// Accept new connections
if (FD_ISSET(server->listening_sock, &read_set)) {
while ((conn = accept_new_connection(server)) != NULL) {
......@@ -1080,6 +1115,8 @@ struct mg_server *mg_create_server(const char *opts[], mg_event_handler_t func,
return server;
}
// End of library, start of the application code
static int event_handler(struct mg_event *ev) {
mg_printf(ev->conn, "%s", "HTTP/1.0 200 OK\r\n\r\n:-)\n");
return 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment