Commit 8442a9ce authored by Sergey Lyubka's avatar Sergey Lyubka

Fix stuck master thread in produce_socket() by always signalling sq_empty from the worker threads

parent 5d70ffe8
...@@ -3970,29 +3970,25 @@ static int consume_socket(struct mg_context *ctx, struct socket *sp) { ...@@ -3970,29 +3970,25 @@ static int consume_socket(struct mg_context *ctx, struct socket *sp) {
while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) { while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
pthread_cond_wait(&ctx->sq_full, &ctx->mutex); pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
} }
// Master thread could wake us up without putting a socket.
// If this happens, it is time to exit.
if (ctx->stop_flag) {
(void) pthread_mutex_unlock(&ctx->mutex);
return 0;
}
assert(ctx->sq_head > ctx->sq_tail);
// Copy socket from the queue and increment tail
*sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
ctx->sq_tail++;
DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
// Wrap pointers if needed // If we're stopping, sq_head may be equal to sq_tail.
while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) { if (ctx->sq_head > ctx->sq_tail) {
ctx->sq_tail -= ARRAY_SIZE(ctx->queue); // Copy socket from the queue and increment tail
ctx->sq_head -= ARRAY_SIZE(ctx->queue); *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
ctx->sq_tail++;
DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
// Wrap pointers if needed
while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) {
ctx->sq_tail -= ARRAY_SIZE(ctx->queue);
ctx->sq_head -= ARRAY_SIZE(ctx->queue);
}
} }
(void) pthread_cond_signal(&ctx->sq_empty); (void) pthread_cond_signal(&ctx->sq_empty);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
return 1; return !ctx->stop_flag;
} }
static void worker_thread(struct mg_context *ctx) { static void worker_thread(struct mg_context *ctx) {
...@@ -4004,7 +4000,9 @@ static void worker_thread(struct mg_context *ctx) { ...@@ -4004,7 +4000,9 @@ static void worker_thread(struct mg_context *ctx) {
conn->buf = (char *) (conn + 1); conn->buf = (char *) (conn + 1);
assert(conn != NULL); assert(conn != NULL);
while (ctx->stop_flag == 0 && consume_socket(ctx, &conn->client)) { // Call consume_socket() even when ctx->stop_flag > 0, to let it signal
// sq_empty condvar to wake up the master waiting in produce_socket()
while (consume_socket(ctx, &conn->client)) {
conn->birth_time = time(NULL); conn->birth_time = time(NULL);
conn->ctx = ctx; conn->ctx = ctx;
...@@ -4041,15 +4039,17 @@ static void produce_socket(struct mg_context *ctx, const struct socket *sp) { ...@@ -4041,15 +4039,17 @@ static void produce_socket(struct mg_context *ctx, const struct socket *sp) {
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
// If the queue is full, wait // If the queue is full, wait
while (ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) { while (ctx->stop_flag == 0 &&
ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
(void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex); (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
} }
assert(ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue));
// Copy socket to the queue and increment head if (ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)) {
ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp; // Copy socket to the queue and increment head
ctx->sq_head++; ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp;
DEBUG_TRACE(("queued socket %d", sp->sock)); ctx->sq_head++;
DEBUG_TRACE(("queued socket %d", sp->sock));
}
(void) pthread_cond_signal(&ctx->sq_full); (void) pthread_cond_signal(&ctx->sq_full);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
...@@ -4106,7 +4106,7 @@ static void master_thread(struct mg_context *ctx) { ...@@ -4106,7 +4106,7 @@ static void master_thread(struct mg_context *ctx) {
#endif // _WIN32 #endif // _WIN32
} else { } else {
for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) { for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) {
if (FD_ISSET(sp->sock, &read_set)) { if (ctx->stop_flag == 0 && FD_ISSET(sp->sock, &read_set)) {
accept_new_connection(sp, ctx); accept_new_connection(sp, ctx);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment