Commit bba29d32 authored by Sergey Lyubka's avatar Sergey Lyubka

Using socketpair for inter-thread communication

parent 2f1f4088
...@@ -152,7 +152,6 @@ typedef long off_t; ...@@ -152,7 +152,6 @@ typedef long off_t;
#endif // !fileno MINGW #defines fileno #endif // !fileno MINGW #defines fileno
typedef HANDLE pthread_mutex_t; typedef HANDLE pthread_mutex_t;
typedef struct {HANDLE signal, broadcast;} pthread_cond_t;
typedef DWORD pthread_t; typedef DWORD pthread_t;
#define pid_t HANDLE // MINGW typedefs pid_t to int. Using #define here. #define pid_t HANDLE // MINGW typedefs pid_t to int. Using #define here.
...@@ -252,6 +251,8 @@ typedef int SOCKET; ...@@ -252,6 +251,8 @@ typedef int SOCKET;
#define MAX_REQUEST_SIZE 16384 #define MAX_REQUEST_SIZE 16384
#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))
typedef SOCKET sock_t;
#ifdef DEBUG_TRACE #ifdef DEBUG_TRACE
#undef DEBUG_TRACE #undef DEBUG_TRACE
#define DEBUG_TRACE(x) #define DEBUG_TRACE(x)
...@@ -407,19 +408,11 @@ struct mg_context { ...@@ -407,19 +408,11 @@ struct mg_context {
char *config[NUM_OPTIONS]; // Mongoose configuration parameters char *config[NUM_OPTIONS]; // Mongoose configuration parameters
mg_event_handler_t event_handler; // User-defined callback function mg_event_handler_t event_handler; // User-defined callback function
void *user_data; // User-defined data void *user_data; // User-defined data
struct socket *listening_sockets; struct socket *listening_sockets;
int num_listening_sockets; int num_listening_sockets;
int num_threads; // Number of threads
sock_t ctl[2]; // Socket pair for inter-thread communication
volatile int num_threads; // Number of threads
pthread_mutex_t mutex; // Protects (max|num)_threads
pthread_cond_t cond; // Condvar for tracking workers terminations
struct socket queue[MGSQLEN]; // Accepted sockets
volatile int sq_head; // Head of the socket queue
volatile int sq_tail; // Tail of the socket queue
pthread_cond_t sq_full; // Signaled when socket is produced
pthread_cond_t sq_empty; // Signaled when socket is consumed
}; };
struct mg_connection { struct mg_connection {
......
...@@ -12,6 +12,38 @@ static int call_user(int type, struct mg_connection *conn, void *p) { ...@@ -12,6 +12,38 @@ static int call_user(int type, struct mg_connection *conn, void *p) {
0 : conn->ctx->event_handler(&conn->event); 0 : conn->ctx->event_handler(&conn->event);
} }
static int mg_socketpair(sock_t sp[2]) {
struct sockaddr_in sa;
sock_t sock, ret = -1;
socklen_t len = sizeof(sa);
sp[0] = sp[1] = INVALID_SOCKET;
(void) memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_port = htons(0);
sa.sin_addr.s_addr = htonl(0x7f000001);
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
!bind(sock, (struct sockaddr *) &sa, len) &&
!listen(sock, 1) &&
!getsockname(sock, (struct sockaddr *) &sa, &len) &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 6)) != -1 &&
!connect(sp[0], (struct sockaddr *) &sa, len) &&
(sp[1] = accept(sock,(struct sockaddr *) &sa, &len)) != INVALID_SOCKET) {
set_close_on_exec(sp[0]);
set_close_on_exec(sp[1]);
ret = 0;
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
sp[0] = sp[1] = INVALID_SOCKET;
}
closesocket(sock);
return ret;
}
static FILE *mg_fopen(const char *path, const char *mode) { static FILE *mg_fopen(const char *path, const char *mode) {
#ifdef _WIN32 #ifdef _WIN32
wchar_t wbuf[PATH_MAX], wmode[20]; wchar_t wbuf[PATH_MAX], wmode[20];
...@@ -1081,31 +1113,7 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -1081,31 +1113,7 @@ static void process_new_connection(struct mg_connection *conn) {
// Worker threads take accepted socket from the queue // Worker threads take accepted socket from the queue
static int consume_socket(struct mg_context *ctx, struct socket *sp) { static int consume_socket(struct mg_context *ctx, struct socket *sp) {
(void) pthread_mutex_lock(&ctx->mutex); recv(ctx->ctl[1], (void *) sp, sizeof(*sp), 0);
DEBUG_TRACE(("going idle"));
// If the queue is empty, wait. We're idle at this point.
while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
}
// If we're stopping, sq_head may be equal to sq_tail.
if (ctx->sq_head > ctx->sq_tail) {
// Copy socket from the queue and increment tail
*sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
ctx->sq_tail++;
DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
// Wrap pointers if needed
while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) {
ctx->sq_tail -= ARRAY_SIZE(ctx->queue);
ctx->sq_head -= ARRAY_SIZE(ctx->queue);
}
}
(void) pthread_cond_signal(&ctx->sq_empty);
(void) pthread_mutex_unlock(&ctx->mutex);
return !ctx->stop_flag; return !ctx->stop_flag;
} }
...@@ -1154,11 +1162,7 @@ static void *worker_thread(void *thread_func_param) { ...@@ -1154,11 +1162,7 @@ static void *worker_thread(void *thread_func_param) {
} }
// Signal master that we're done with connection and exiting // Signal master that we're done with connection and exiting
(void) pthread_mutex_lock(&ctx->mutex); send(ctx->ctl[1], "x", 1, 0);
ctx->num_threads--;
(void) pthread_cond_signal(&ctx->cond);
assert(ctx->num_threads >= 0);
(void) pthread_mutex_unlock(&ctx->mutex);
DEBUG_TRACE(("exiting")); DEBUG_TRACE(("exiting"));
return NULL; return NULL;
...@@ -1166,23 +1170,7 @@ static void *worker_thread(void *thread_func_param) { ...@@ -1166,23 +1170,7 @@ static void *worker_thread(void *thread_func_param) {
// Master thread adds accepted socket to a queue // Master thread adds accepted socket to a queue
static void produce_socket(struct mg_context *ctx, const struct socket *sp) { static void produce_socket(struct mg_context *ctx, const struct socket *sp) {
(void) pthread_mutex_lock(&ctx->mutex); send(ctx->ctl[0], (void *) sp, sizeof(*sp), 0);
// If the queue is full, wait
while (ctx->stop_flag == 0 &&
ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
(void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
}
if (ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)) {
// Copy socket to the queue and increment head
ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp;
ctx->sq_head++;
DEBUG_TRACE(("queued socket %d", sp->sock));
}
(void) pthread_cond_signal(&ctx->sq_full);
(void) pthread_mutex_unlock(&ctx->mutex);
} }
static int set_sock_timeout(SOCKET sock, int milliseconds) { static int set_sock_timeout(SOCKET sock, int milliseconds) {
...@@ -1272,20 +1260,16 @@ static void *master_thread(void *thread_func_param) { ...@@ -1272,20 +1260,16 @@ static void *master_thread(void *thread_func_param) {
close_all_listening_sockets(ctx); close_all_listening_sockets(ctx);
// Wakeup workers that are waiting for connections to handle. // Wakeup workers that are waiting for connections to handle.
pthread_cond_broadcast(&ctx->sq_full); for (i = 0; i < ctx->num_threads; i++) {
struct socket dummy;
send(ctx->ctl[0], (void *) &dummy, sizeof(dummy), 0);
}
// Wait until all threads finish // Wait until all threads finish
(void) pthread_mutex_lock(&ctx->mutex); for (i = 0; i < ctx->num_threads; i++) {
while (ctx->num_threads > 0) { char ch;
(void) pthread_cond_wait(&ctx->cond, &ctx->mutex); recv(ctx->ctl[0], &ch, 1, 0);
} }
(void) pthread_mutex_unlock(&ctx->mutex);
// All threads exited, no sync is needed. Destroy mutex and condvars
(void) pthread_mutex_destroy(&ctx->mutex);
(void) pthread_cond_destroy(&ctx->cond);
(void) pthread_cond_destroy(&ctx->sq_empty);
(void) pthread_cond_destroy(&ctx->sq_full);
#if !defined(NO_SSL) #if !defined(NO_SSL)
uninitialize_ssl(ctx); uninitialize_ssl(ctx);
...@@ -1406,10 +1390,7 @@ struct mg_context *mg_start(const char **options, ...@@ -1406,10 +1390,7 @@ struct mg_context *mg_start(const char **options,
(void) signal(SIGPIPE, SIG_IGN); (void) signal(SIGPIPE, SIG_IGN);
#endif // !_WIN32 #endif // !_WIN32
(void) pthread_mutex_init(&ctx->mutex, NULL); mg_socketpair(ctx->ctl);
(void) pthread_cond_init(&ctx->cond, NULL);
(void) pthread_cond_init(&ctx->sq_empty, NULL);
(void) pthread_cond_init(&ctx->sq_full, NULL);
// Start master (listening) thread // Start master (listening) thread
mg_start_thread(master_thread, ctx); mg_start_thread(master_thread, ctx);
......
...@@ -23,34 +23,6 @@ static int pthread_mutex_unlock(pthread_mutex_t *mutex) { ...@@ -23,34 +23,6 @@ static int pthread_mutex_unlock(pthread_mutex_t *mutex) {
return ReleaseMutex(*mutex) == 0 ? -1 : 0; return ReleaseMutex(*mutex) == 0 ? -1 : 0;
} }
static int pthread_cond_init(pthread_cond_t *cv, const void *unused) {
(void) unused;
cv->signal = CreateEvent(NULL, FALSE, FALSE, NULL);
cv->broadcast = CreateEvent(NULL, TRUE, FALSE, NULL);
return cv->signal != NULL && cv->broadcast != NULL ? 0 : -1;
}
static int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex) {
HANDLE handles[] = {cv->signal, cv->broadcast};
ReleaseMutex(*mutex);
WaitForMultipleObjects(2, handles, FALSE, INFINITE);
return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
}
static int pthread_cond_signal(pthread_cond_t *cv) {
return SetEvent(cv->signal) == 0 ? -1 : 0;
}
static int pthread_cond_broadcast(pthread_cond_t *cv) {
// Implementation with PulseEvent() has race condition, see
// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
return PulseEvent(cv->broadcast) == 0 ? -1 : 0;
}
static int pthread_cond_destroy(pthread_cond_t *cv) {
return CloseHandle(cv->signal) && CloseHandle(cv->broadcast) ? 0 : -1;
}
// For Windows, change all slashes to backslashes in path names. // For Windows, change all slashes to backslashes in path names.
static void change_slashes_to_backslashes(char *path) { static void change_slashes_to_backslashes(char *path) {
int i; int i;
......
...@@ -152,7 +152,6 @@ typedef long off_t; ...@@ -152,7 +152,6 @@ typedef long off_t;
#endif // !fileno MINGW #defines fileno #endif // !fileno MINGW #defines fileno
typedef HANDLE pthread_mutex_t; typedef HANDLE pthread_mutex_t;
typedef struct {HANDLE signal, broadcast;} pthread_cond_t;
typedef DWORD pthread_t; typedef DWORD pthread_t;
#define pid_t HANDLE // MINGW typedefs pid_t to int. Using #define here. #define pid_t HANDLE // MINGW typedefs pid_t to int. Using #define here.
...@@ -252,6 +251,8 @@ typedef int SOCKET; ...@@ -252,6 +251,8 @@ typedef int SOCKET;
#define MAX_REQUEST_SIZE 16384 #define MAX_REQUEST_SIZE 16384
#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))
typedef SOCKET sock_t;
#ifdef DEBUG_TRACE #ifdef DEBUG_TRACE
#undef DEBUG_TRACE #undef DEBUG_TRACE
#define DEBUG_TRACE(x) #define DEBUG_TRACE(x)
...@@ -407,19 +408,11 @@ struct mg_context { ...@@ -407,19 +408,11 @@ struct mg_context {
char *config[NUM_OPTIONS]; // Mongoose configuration parameters char *config[NUM_OPTIONS]; // Mongoose configuration parameters
mg_event_handler_t event_handler; // User-defined callback function mg_event_handler_t event_handler; // User-defined callback function
void *user_data; // User-defined data void *user_data; // User-defined data
struct socket *listening_sockets; struct socket *listening_sockets;
int num_listening_sockets; int num_listening_sockets;
int num_threads; // Number of threads
sock_t ctl[2]; // Socket pair for inter-thread communication
volatile int num_threads; // Number of threads
pthread_mutex_t mutex; // Protects (max|num)_threads
pthread_cond_t cond; // Condvar for tracking workers terminations
struct socket queue[MGSQLEN]; // Accepted sockets
volatile int sq_head; // Head of the socket queue
volatile int sq_tail; // Tail of the socket queue
pthread_cond_t sq_full; // Signaled when socket is produced
pthread_cond_t sq_empty; // Signaled when socket is consumed
}; };
struct mg_connection { struct mg_connection {
...@@ -1537,34 +1530,6 @@ static int pthread_mutex_unlock(pthread_mutex_t *mutex) { ...@@ -1537,34 +1530,6 @@ static int pthread_mutex_unlock(pthread_mutex_t *mutex) {
return ReleaseMutex(*mutex) == 0 ? -1 : 0; return ReleaseMutex(*mutex) == 0 ? -1 : 0;
} }
static int pthread_cond_init(pthread_cond_t *cv, const void *unused) {
(void) unused;
cv->signal = CreateEvent(NULL, FALSE, FALSE, NULL);
cv->broadcast = CreateEvent(NULL, TRUE, FALSE, NULL);
return cv->signal != NULL && cv->broadcast != NULL ? 0 : -1;
}
static int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mutex) {
HANDLE handles[] = {cv->signal, cv->broadcast};
ReleaseMutex(*mutex);
WaitForMultipleObjects(2, handles, FALSE, INFINITE);
return WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0? 0 : -1;
}
static int pthread_cond_signal(pthread_cond_t *cv) {
return SetEvent(cv->signal) == 0 ? -1 : 0;
}
static int pthread_cond_broadcast(pthread_cond_t *cv) {
// Implementation with PulseEvent() has race condition, see
// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
return PulseEvent(cv->broadcast) == 0 ? -1 : 0;
}
static int pthread_cond_destroy(pthread_cond_t *cv) {
return CloseHandle(cv->signal) && CloseHandle(cv->broadcast) ? 0 : -1;
}
// For Windows, change all slashes to backslashes in path names. // For Windows, change all slashes to backslashes in path names.
static void change_slashes_to_backslashes(char *path) { static void change_slashes_to_backslashes(char *path) {
int i; int i;
...@@ -3918,6 +3883,38 @@ static int call_user(int type, struct mg_connection *conn, void *p) { ...@@ -3918,6 +3883,38 @@ static int call_user(int type, struct mg_connection *conn, void *p) {
0 : conn->ctx->event_handler(&conn->event); 0 : conn->ctx->event_handler(&conn->event);
} }
static int mg_socketpair(sock_t sp[2]) {
struct sockaddr_in sa;
sock_t sock, ret = -1;
socklen_t len = sizeof(sa);
sp[0] = sp[1] = INVALID_SOCKET;
(void) memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_port = htons(0);
sa.sin_addr.s_addr = htonl(0x7f000001);
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) != INVALID_SOCKET &&
!bind(sock, (struct sockaddr *) &sa, len) &&
!listen(sock, 1) &&
!getsockname(sock, (struct sockaddr *) &sa, &len) &&
(sp[0] = socket(AF_INET, SOCK_STREAM, 6)) != -1 &&
!connect(sp[0], (struct sockaddr *) &sa, len) &&
(sp[1] = accept(sock,(struct sockaddr *) &sa, &len)) != INVALID_SOCKET) {
set_close_on_exec(sp[0]);
set_close_on_exec(sp[1]);
ret = 0;
} else {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
sp[0] = sp[1] = INVALID_SOCKET;
}
closesocket(sock);
return ret;
}
static FILE *mg_fopen(const char *path, const char *mode) { static FILE *mg_fopen(const char *path, const char *mode) {
#ifdef _WIN32 #ifdef _WIN32
wchar_t wbuf[PATH_MAX], wmode[20]; wchar_t wbuf[PATH_MAX], wmode[20];
...@@ -4987,31 +4984,7 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -4987,31 +4984,7 @@ static void process_new_connection(struct mg_connection *conn) {
// Worker threads take accepted socket from the queue // Worker threads take accepted socket from the queue
static int consume_socket(struct mg_context *ctx, struct socket *sp) { static int consume_socket(struct mg_context *ctx, struct socket *sp) {
(void) pthread_mutex_lock(&ctx->mutex); recv(ctx->ctl[1], (void *) sp, sizeof(*sp), 0);
DEBUG_TRACE(("going idle"));
// If the queue is empty, wait. We're idle at this point.
while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
}
// If we're stopping, sq_head may be equal to sq_tail.
if (ctx->sq_head > ctx->sq_tail) {
// Copy socket from the queue and increment tail
*sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
ctx->sq_tail++;
DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
// Wrap pointers if needed
while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) {
ctx->sq_tail -= ARRAY_SIZE(ctx->queue);
ctx->sq_head -= ARRAY_SIZE(ctx->queue);
}
}
(void) pthread_cond_signal(&ctx->sq_empty);
(void) pthread_mutex_unlock(&ctx->mutex);
return !ctx->stop_flag; return !ctx->stop_flag;
} }
...@@ -5060,11 +5033,7 @@ static void *worker_thread(void *thread_func_param) { ...@@ -5060,11 +5033,7 @@ static void *worker_thread(void *thread_func_param) {
} }
// Signal master that we're done with connection and exiting // Signal master that we're done with connection and exiting
(void) pthread_mutex_lock(&ctx->mutex); send(ctx->ctl[1], "x", 1, 0);
ctx->num_threads--;
(void) pthread_cond_signal(&ctx->cond);
assert(ctx->num_threads >= 0);
(void) pthread_mutex_unlock(&ctx->mutex);
DEBUG_TRACE(("exiting")); DEBUG_TRACE(("exiting"));
return NULL; return NULL;
...@@ -5072,23 +5041,7 @@ static void *worker_thread(void *thread_func_param) { ...@@ -5072,23 +5041,7 @@ static void *worker_thread(void *thread_func_param) {
// Master thread adds accepted socket to a queue // Master thread adds accepted socket to a queue
static void produce_socket(struct mg_context *ctx, const struct socket *sp) { static void produce_socket(struct mg_context *ctx, const struct socket *sp) {
(void) pthread_mutex_lock(&ctx->mutex); send(ctx->ctl[0], (void *) sp, sizeof(*sp), 0);
// If the queue is full, wait
while (ctx->stop_flag == 0 &&
ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
(void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
}
if (ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)) {
// Copy socket to the queue and increment head
ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp;
ctx->sq_head++;
DEBUG_TRACE(("queued socket %d", sp->sock));
}
(void) pthread_cond_signal(&ctx->sq_full);
(void) pthread_mutex_unlock(&ctx->mutex);
} }
static int set_sock_timeout(SOCKET sock, int milliseconds) { static int set_sock_timeout(SOCKET sock, int milliseconds) {
...@@ -5178,20 +5131,16 @@ static void *master_thread(void *thread_func_param) { ...@@ -5178,20 +5131,16 @@ static void *master_thread(void *thread_func_param) {
close_all_listening_sockets(ctx); close_all_listening_sockets(ctx);
// Wakeup workers that are waiting for connections to handle. // Wakeup workers that are waiting for connections to handle.
pthread_cond_broadcast(&ctx->sq_full); for (i = 0; i < ctx->num_threads; i++) {
struct socket dummy;
send(ctx->ctl[0], (void *) &dummy, sizeof(dummy), 0);
}
// Wait until all threads finish // Wait until all threads finish
(void) pthread_mutex_lock(&ctx->mutex); for (i = 0; i < ctx->num_threads; i++) {
while (ctx->num_threads > 0) { char ch;
(void) pthread_cond_wait(&ctx->cond, &ctx->mutex); recv(ctx->ctl[0], &ch, 1, 0);
} }
(void) pthread_mutex_unlock(&ctx->mutex);
// All threads exited, no sync is needed. Destroy mutex and condvars
(void) pthread_mutex_destroy(&ctx->mutex);
(void) pthread_cond_destroy(&ctx->cond);
(void) pthread_cond_destroy(&ctx->sq_empty);
(void) pthread_cond_destroy(&ctx->sq_full);
#if !defined(NO_SSL) #if !defined(NO_SSL)
uninitialize_ssl(ctx); uninitialize_ssl(ctx);
...@@ -5312,10 +5261,7 @@ struct mg_context *mg_start(const char **options, ...@@ -5312,10 +5261,7 @@ struct mg_context *mg_start(const char **options,
(void) signal(SIGPIPE, SIG_IGN); (void) signal(SIGPIPE, SIG_IGN);
#endif // !_WIN32 #endif // !_WIN32
(void) pthread_mutex_init(&ctx->mutex, NULL); mg_socketpair(ctx->ctl);
(void) pthread_cond_init(&ctx->cond, NULL);
(void) pthread_cond_init(&ctx->sq_empty, NULL);
(void) pthread_cond_init(&ctx->sq_full, NULL);
// Start master (listening) thread // Start master (listening) thread
mg_start_thread(master_thread, ctx); mg_start_thread(master_thread, ctx);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment