Commit aca05121 authored by Sergey Lyubka's avatar Sergey Lyubka

optimized DEBUG_TRACE(). Fixed thread sync on exit.

parent fcb57df1
...@@ -196,10 +196,15 @@ typedef int SOCKET; ...@@ -196,10 +196,15 @@ typedef int SOCKET;
#define MAX_CGI_ENVIR_VARS 64 #define MAX_CGI_ENVIR_VARS 64
#define MAX_REQUEST_SIZE 8192 #define MAX_REQUEST_SIZE 8192
#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0]))
#define DEBUG_MGS_PREFIX "*** Mongoose debug *** "
#if defined(DEBUG) #if defined(DEBUG)
#define DEBUG_TRACE(x) do {printf x; putchar('\n'); fflush(stdout);} while (0) #define DEBUG_TRACE(x) do { \
printf("*** [%lu] thread %p: %s: ", \
(unsigned long) time(NULL), (void *) pthread_self(), __func__); \
printf x; \
putchar('\n'); \
fflush(stdout); \
} while (0)
#else #else
#define DEBUG_TRACE(x) #define DEBUG_TRACE(x)
#endif // DEBUG #endif // DEBUG
...@@ -344,13 +349,13 @@ struct mg_context { ...@@ -344,13 +349,13 @@ struct mg_context {
int num_threads; // Number of threads int num_threads; // Number of threads
pthread_mutex_t mutex; // Protects (max|num)_threads pthread_mutex_t mutex; // Protects (max|num)_threads
pthread_cond_t thr_cond; // Condvar for thread sync pthread_cond_t cond; // Condvar for tracking workers terminations
struct socket queue[20]; // Accepted sockets struct socket queue[20]; // Accepted sockets
int sq_head; // Head of the socket queue int sq_head; // Head of the socket queue
int sq_tail; // Tail of the socket queue int sq_tail; // Tail of the socket queue
pthread_cond_t empty_cond; // Socket queue empty condvar pthread_cond_t sq_full; // Singaled when socket is produced
pthread_cond_t full_cond; // Socket queue full condvar pthread_cond_t sq_empty; // Signaled when socket is consumed
}; };
struct mg_connection { struct mg_connection {
...@@ -622,7 +627,7 @@ static void send_http_error(struct mg_connection *conn, int status, ...@@ -622,7 +627,7 @@ static void send_http_error(struct mg_connection *conn, int status,
mg_callback_t error_handler; mg_callback_t error_handler;
bool_t handled; bool_t handled;
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: %d %s", __func__, status, reason)); DEBUG_TRACE(("%d %s", status, reason));
conn->request_info.status_code = status; conn->request_info.status_code = status;
error_handler = conn->ctx->config->http_error_handler; error_handler = conn->ctx->config->http_error_handler;
...@@ -1040,7 +1045,7 @@ static pid_t spawn_process(struct mg_connection *conn, const char *prog, ...@@ -1040,7 +1045,7 @@ static pid_t spawn_process(struct mg_connection *conn, const char *prog,
(void) mg_snprintf(conn, line, sizeof(line), "%s", dir); (void) mg_snprintf(conn, line, sizeof(line), "%s", dir);
change_slashes_to_backslashes(line); change_slashes_to_backslashes(line);
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: Running [%s]", __func__, cmdline)); DEBUG_TRACE(("Running [%s]", cmdline));
if (CreateProcessA(NULL, cmdline, NULL, NULL, MG_TRUE, if (CreateProcessA(NULL, cmdline, NULL, NULL, MG_TRUE,
CREATE_NEW_PROCESS_GROUP, envblk, line, &si, &pi) == 0) { CREATE_NEW_PROCESS_GROUP, envblk, line, &si, &pi) == 0) {
cry(conn, "%s: CreateProcess(%s): %d", cry(conn, "%s: CreateProcess(%s): %d",
...@@ -1093,7 +1098,8 @@ static int start_thread(struct mg_context *ctx, mg_thread_func_t func, ...@@ -1093,7 +1098,8 @@ static int start_thread(struct mg_context *ctx, mg_thread_func_t func,
(void) pthread_attr_init(&attr); (void) pthread_attr_init(&attr);
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); (void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
(void) pthread_attr_setstacksize(&attr, sizeof(struct mg_connection) * 2); // TODO(lsm): figure out why mongoose dies on Linux if next line is enabled
// (void) pthread_attr_setstacksize(&attr, sizeof(struct mg_connection) * 5);
if ((retval = pthread_create(&thread_id, &attr, func, param)) != 0) { if ((retval = pthread_create(&thread_id, &attr, func, param)) != 0) {
cry(fc(ctx), "%s: %s", __func__, strerror(retval)); cry(fc(ctx), "%s: %s", __func__, strerror(retval));
...@@ -1119,9 +1125,9 @@ static pid_t spawn_process(struct mg_connection *conn, const char *prog, ...@@ -1119,9 +1125,9 @@ static pid_t spawn_process(struct mg_connection *conn, const char *prog,
if (chdir(dir) != 0) { if (chdir(dir) != 0) {
cry(conn, "%s: chdir(%s): %s", __func__, dir, strerror(ERRNO)); cry(conn, "%s: chdir(%s): %s", __func__, dir, strerror(ERRNO));
} else if (dup2(fd_stdin, 0) == -1) { } else if (dup2(fd_stdin, 0) == -1) {
cry(conn, "%s: dup2(stdin, %d): %s", __func__, fd_stdin, strerror(ERRNO)); cry(conn, "%s: dup2(%d, 1): %s", __func__, fd_stdin, strerror(ERRNO));
} else if (dup2(fd_stdout, 1) == -1) { } else if (dup2(fd_stdout, 1) == -1) {
cry(conn, "%s: dup2(stdout, %d): %s", __func__, fd_stdout, strerror(ERRNO)); cry(conn, "%s: dup2(%d, 1): %s", __func__, fd_stdout, strerror(ERRNO));
} else { } else {
(void) dup2(fd_stdout, 2); (void) dup2(fd_stdout, 2);
(void) close(fd_stdin); (void) close(fd_stdin);
...@@ -1387,7 +1393,7 @@ static void convert_uri_to_file_name(struct mg_connection *conn, ...@@ -1387,7 +1393,7 @@ static void convert_uri_to_file_name(struct mg_connection *conn,
change_slashes_to_backslashes(buf); change_slashes_to_backslashes(buf);
#endif /* _WIN32 */ #endif /* _WIN32 */
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: [%s] -> [%s]", __func__, uri, buf)); DEBUG_TRACE(("[%s] -> [%s]", uri, buf));
} }
// Setup listening socket on given address, return socket. // Setup listening socket on given address, return socket.
...@@ -2521,6 +2527,7 @@ static bool_t handle_request_body(struct mg_connection *conn, FILE *fp) { ...@@ -2521,6 +2527,7 @@ static bool_t handle_request_body(struct mg_connection *conn, FILE *fp) {
if (conn->content_len <= (int64_t) data_len) { if (conn->content_len <= (int64_t) data_len) {
#if 0 #if 0
TODO(lsm): sort out embedded mode.
ri->post_data_len = (int) content_len; ri->post_data_len = (int) content_len;
#endif #endif
// If fp is NULL, this is embedded mode, and we do not // If fp is NULL, this is embedded mode, and we do not
...@@ -3043,7 +3050,7 @@ static void handle_request(struct mg_connection *conn) { ...@@ -3043,7 +3050,7 @@ static void handle_request(struct mg_connection *conn) {
remove_double_dots_and_double_slashes(ri->uri); remove_double_dots_and_double_slashes(ri->uri);
convert_uri_to_file_name(conn, ri->uri, path, sizeof(path)); convert_uri_to_file_name(conn, ri->uri, path, sizeof(path));
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: %s", __func__, ri->uri)); DEBUG_TRACE(("%s", ri->uri));
if (new_request_callback && new_request_callback(conn, ri) == MG_TRUE) { if (new_request_callback && new_request_callback(conn, ri) == MG_TRUE) {
// Do nothing, callback has served the request // Do nothing, callback has served the request
} else if (!check_authorization(conn, path)) { } else if (!check_authorization(conn, path)) {
...@@ -3107,7 +3114,6 @@ static void close_all_listening_sockets(struct mg_context *ctx) { ...@@ -3107,7 +3114,6 @@ static void close_all_listening_sockets(struct mg_context *ctx) {
(void) closesocket(sp->sock); (void) closesocket(sp->sock);
free(sp); free(sp);
} }
ctx->listening_sockets = NULL;
} }
static enum mg_error_t set_ports_option(struct mg_context *ctx) { static enum mg_error_t set_ports_option(struct mg_context *ctx) {
...@@ -3442,8 +3448,6 @@ static void close_socket_gracefully(SOCKET sock) { ...@@ -3442,8 +3448,6 @@ static void close_socket_gracefully(SOCKET sock) {
} }
static void close_connection(struct mg_connection *conn) { static void close_connection(struct mg_connection *conn) {
reset_per_request_attributes(conn);
if (conn->ssl) { if (conn->ssl) {
SSL_free(conn->ssl); SSL_free(conn->ssl);
conn->ssl = NULL; conn->ssl = NULL;
...@@ -3511,37 +3515,26 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -3511,37 +3515,26 @@ static void process_new_connection(struct mg_connection *conn) {
} }
// Worker threads take accepted socket from the queue // Worker threads take accepted socket from the queue
static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { static bool_t consume_socket(struct mg_context *ctx, struct socket *sp) {
#if 0
struct timespec ts;
#endif
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p: going idle", DEBUG_TRACE(("going idle"));
__func__, (void *) pthread_self()));
// If the queue is empty, wait. We're idle at this point. // If the queue is empty, wait. We're idle at this point.
while (ctx->sq_head == ctx->sq_tail) { while (ctx->sq_head == ctx->sq_tail && ctx->stop_flag == 0) {
#if 0 pthread_cond_wait(&ctx->sq_full, &ctx->mutex);
ts.tv_nsec = 0; }
ts.tv_sec = time(NULL) + 5; // Master thread could wake us up without putting a socket.
if (pthread_cond_wait(&ctx->empty_cond, &ctx->mutex, &ts) != 0) { // If this happens, it is time to exit.
// Timeout! release the mutex and return if (ctx->stop_flag) {
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
return MG_FALSE; return MG_FALSE;
} }
#else
(void) pthread_cond_wait(&ctx->empty_cond, &ctx->mutex);
#endif
}
assert(ctx->sq_head > ctx->sq_tail); assert(ctx->sq_head > ctx->sq_tail);
// Copy socket from the queue and increment tail // Copy socket from the queue and increment tail
*sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)]; *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
ctx->sq_tail++; ctx->sq_tail++;
DEBUG_TRACE((DEBUG_MGS_PREFIX DEBUG_TRACE(("grabbed socket %d, going busy", sp->sock));
"%s: thread %p grabbed socket %d, going busy",
__func__, (void *) pthread_self(), sp->sock));
// Wrap pointers if needed // Wrap pointers if needed
while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) { while (ctx->sq_tail > (int) ARRAY_SIZE(ctx->queue)) {
...@@ -3549,7 +3542,7 @@ static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { ...@@ -3549,7 +3542,7 @@ static bool_t get_socket(struct mg_context *ctx, struct socket *sp) {
ctx->sq_head -= ARRAY_SIZE(ctx->queue); ctx->sq_head -= ARRAY_SIZE(ctx->queue);
} }
(void) pthread_cond_signal(&ctx->full_cond); (void) pthread_cond_signal(&ctx->sq_empty);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
return MG_TRUE; return MG_TRUE;
...@@ -3558,12 +3551,9 @@ static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { ...@@ -3558,12 +3551,9 @@ static bool_t get_socket(struct mg_context *ctx, struct socket *sp) {
static void worker_thread(struct mg_context *ctx) { static void worker_thread(struct mg_context *ctx) {
struct mg_connection conn; struct mg_connection conn;
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p starting",
__func__, (void *) pthread_self()));
(void) memset(&conn, 0, sizeof(conn)); (void) memset(&conn, 0, sizeof(conn));
while (ctx->stop_flag == 0 && get_socket(ctx, &conn.client) == MG_TRUE) { while (ctx->stop_flag == 0 && consume_socket(ctx, &conn.client)) {
conn.birth_time = time(NULL); conn.birth_time = time(NULL);
conn.ctx = ctx; conn.ctx = ctx;
...@@ -3593,30 +3583,29 @@ static void worker_thread(struct mg_context *ctx) { ...@@ -3593,30 +3583,29 @@ static void worker_thread(struct mg_context *ctx) {
// Signal master that we're done with connection and exiting // Signal master that we're done with connection and exiting
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
ctx->num_threads--; ctx->num_threads--;
(void) pthread_cond_signal(&ctx->thr_cond); (void) pthread_cond_signal(&ctx->cond);
assert(ctx->num_threads >= 0); assert(ctx->num_threads >= 0);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p exiting", DEBUG_TRACE(("exiting"));
__func__, (void *) pthread_self()));
} }
// Master thread adds accepted socket to a queue // Master thread adds accepted socket to a queue
static void put_socket(struct mg_context *ctx, const struct socket *sp) { static void produce_socket(struct mg_context *ctx, const struct socket *sp) {
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
// If the queue is full, wait // If the queue is full, wait
while (ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) { while (ctx->sq_head - ctx->sq_tail >= (int) ARRAY_SIZE(ctx->queue)) {
(void) pthread_cond_wait(&ctx->full_cond, &ctx->mutex); (void) pthread_cond_wait(&ctx->sq_empty, &ctx->mutex);
} }
assert(ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue)); assert(ctx->sq_head - ctx->sq_tail < (int) ARRAY_SIZE(ctx->queue));
// Copy socket to the queue and increment head // Copy socket to the queue and increment head
ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp; ctx->queue[ctx->sq_head % ARRAY_SIZE(ctx->queue)] = *sp;
ctx->sq_head++; ctx->sq_head++;
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: queued socket %d", __func__, sp->sock)); DEBUG_TRACE(("queued socket %d", sp->sock));
(void) pthread_cond_signal(&ctx->empty_cond); (void) pthread_cond_signal(&ctx->sq_full);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
} }
...@@ -3627,24 +3616,20 @@ static void accept_new_connection(const struct socket *listener, ...@@ -3627,24 +3616,20 @@ static void accept_new_connection(const struct socket *listener,
accepted.rsa.len = sizeof(accepted.rsa.u.sin); accepted.rsa.len = sizeof(accepted.rsa.u.sin);
accepted.lsa = listener->lsa; accepted.lsa = listener->lsa;
if ((accepted.sock = accept(listener->sock, &accepted.rsa.u.sa, accepted.sock = accept(listener->sock, &accepted.rsa.u.sa, &accepted.rsa.len);
&accepted.rsa.len)) == INVALID_SOCKET) { if (accepted.sock != INVALID_SOCKET) {
return;
}
allowed = check_acl(ctx, &accepted.rsa) == MG_SUCCESS; allowed = check_acl(ctx, &accepted.rsa) == MG_SUCCESS;
if (allowed == MG_SUCCESS) { if (allowed == MG_SUCCESS) {
// Put accepted socket structure into the queue // Put accepted socket structure into the queue
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: accepted socket %d", DEBUG_TRACE(("accepted socket %d", accepted.sock));
__func__, accepted.sock));
accepted.is_ssl = listener->is_ssl; accepted.is_ssl = listener->is_ssl;
put_socket(ctx, &accepted); produce_socket(ctx, &accepted);
} else { } else {
cry(fc(ctx), "%s: %s is not allowed to connect", cry(fc(ctx), "%s: %s is not allowed to connect",
__func__, inet_ntoa(accepted.rsa.u.sin.sin_addr)); __func__, inet_ntoa(accepted.rsa.u.sin.sin_addr));
(void) closesocket(accepted.sock); (void) closesocket(accepted.sock);
} }
}
} }
static void master_thread(struct mg_context *ctx) { static void master_thread(struct mg_context *ctx) {
...@@ -3680,15 +3665,18 @@ static void master_thread(struct mg_context *ctx) { ...@@ -3680,15 +3665,18 @@ static void master_thread(struct mg_context *ctx) {
} }
} }
} }
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: master: stopping workers", __func__)); DEBUG_TRACE(("stopping workers"));
// Stop signal received: somebody called mg_stop. Quit. // Stop signal received: somebody called mg_stop. Quit.
close_all_listening_sockets(ctx); close_all_listening_sockets(ctx);
// Wakeup workers that are waiting for connections to handle.
pthread_cond_broadcast(&ctx->sq_full);
// Wait until all threads finish // Wait until all threads finish
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
while (ctx->num_threads > 0) { while (ctx->num_threads > 0) {
(void) pthread_cond_wait(&ctx->thr_cond, &ctx->mutex); (void) pthread_cond_wait(&ctx->cond, &ctx->mutex);
} }
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
...@@ -3697,15 +3685,16 @@ static void master_thread(struct mg_context *ctx) { ...@@ -3697,15 +3685,16 @@ static void master_thread(struct mg_context *ctx) {
SSL_CTX_free(ctx->ssl_ctx); SSL_CTX_free(ctx->ssl_ctx);
} }
// All threads exited, no sync is needed. Destroy mutex and condvars
(void) pthread_mutex_destroy(&ctx->mutex); (void) pthread_mutex_destroy(&ctx->mutex);
(void) pthread_cond_destroy(&ctx->thr_cond); (void) pthread_cond_destroy(&ctx->cond);
(void) pthread_cond_destroy(&ctx->empty_cond); (void) pthread_cond_destroy(&ctx->sq_empty);
(void) pthread_cond_destroy(&ctx->full_cond); (void) pthread_cond_destroy(&ctx->sq_full);
// Signal mg_stop() that we're done // Signal mg_stop() that we're done
ctx->stop_flag = 2; ctx->stop_flag = 2;
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: master: exiting", __func__)); DEBUG_TRACE(("exiting"));
} }
void mg_stop(struct mg_context *ctx) { void mg_stop(struct mg_context *ctx) {
...@@ -3713,10 +3702,8 @@ void mg_stop(struct mg_context *ctx) { ...@@ -3713,10 +3702,8 @@ void mg_stop(struct mg_context *ctx) {
// Wait until mg_fini() stops // Wait until mg_fini() stops
while (ctx->stop_flag != 2) { while (ctx->stop_flag != 2) {
(void) sleep(1); (void) sleep(0);
} }
assert(ctx->num_threads == 0);
free(ctx); free(ctx);
#if defined(_WIN32) #if defined(_WIN32)
...@@ -3772,9 +3759,9 @@ struct mg_context * mg_start(const struct mg_config *config) { ...@@ -3772,9 +3759,9 @@ struct mg_context * mg_start(const struct mg_config *config) {
#endif // !_WIN32 #endif // !_WIN32
(void) pthread_mutex_init(&ctx->mutex, NULL); (void) pthread_mutex_init(&ctx->mutex, NULL);
(void) pthread_cond_init(&ctx->thr_cond, NULL); (void) pthread_cond_init(&ctx->cond, NULL);
(void) pthread_cond_init(&ctx->empty_cond, NULL); (void) pthread_cond_init(&ctx->sq_empty, NULL);
(void) pthread_cond_init(&ctx->full_cond, NULL); (void) pthread_cond_init(&ctx->sq_full, NULL);
// Start master (listening) thread // Start master (listening) thread
start_thread(ctx, (mg_thread_func_t) master_thread, ctx); start_thread(ctx, (mg_thread_func_t) master_thread, ctx);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment