Commit fcb57df1 authored by Sergey Lyubka's avatar Sergey Lyubka

creating all worker threads in mg_start().

parent 5651657a
...@@ -342,9 +342,7 @@ struct mg_context { ...@@ -342,9 +342,7 @@ struct mg_context {
struct socket *listening_sockets; struct socket *listening_sockets;
int num_threads; // Number of threads int num_threads; // Number of threads
int num_idle; // Number of idle threads
pthread_mutex_t mutex; // Protects (max|num)_threads pthread_mutex_t mutex; // Protects (max|num)_threads
pthread_cond_t thr_cond; // Condvar for thread sync pthread_cond_t thr_cond; // Condvar for thread sync
...@@ -362,7 +360,7 @@ struct mg_connection { ...@@ -362,7 +360,7 @@ struct mg_connection {
struct socket client; // Connected client struct socket client; // Connected client
time_t birth_time; // Time connection was accepted time_t birth_time; // Time connection was accepted
int64_t num_bytes_sent; // Total bytes sent to client int64_t num_bytes_sent; // Total bytes sent to client
int64_t content_len; // Content-Length header value int64_t content_len; // Content-Length header value
int64_t consumed_content; // How many bytes of content is already read int64_t consumed_content; // How many bytes of content is already read
char buf[MAX_REQUEST_SIZE]; // Buffer for received data char buf[MAX_REQUEST_SIZE]; // Buffer for received data
int request_len; // Size of the request + headers in a buffer int request_len; // Size of the request + headers in a buffer
...@@ -3417,9 +3415,8 @@ static void reset_per_request_attributes(struct mg_connection *conn) { ...@@ -3417,9 +3415,8 @@ static void reset_per_request_attributes(struct mg_connection *conn) {
conn->request_info.remote_user = NULL; conn->request_info.remote_user = NULL;
} }
conn->request_info.status_code = -1; conn->request_info.status_code = -1;
conn->num_bytes_sent = 0; conn->num_bytes_sent = conn->consumed_content = 0;
conn->content_len = -1; conn->content_len = -1;
conn->consumed_content = 0;
conn->request_len = conn->data_len = 0; conn->request_len = conn->data_len = 0;
} }
...@@ -3479,9 +3476,6 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -3479,9 +3476,6 @@ static void process_new_connection(struct mg_connection *conn) {
struct mg_request_info *ri = &conn->request_info; struct mg_request_info *ri = &conn->request_info;
const char *cl; const char *cl;
// TODO(lsm): do not clear everything, since it deletes IP address
// info set up by worker_thread().
(void) memset(&conn->request_info, 0, sizeof(conn->request_info));
reset_per_request_attributes(conn); reset_per_request_attributes(conn);
// If next request is not pipelined, read it in // If next request is not pipelined, read it in
...@@ -3491,7 +3485,7 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -3491,7 +3485,7 @@ static void process_new_connection(struct mg_connection *conn) {
} }
assert(conn->data_len >= conn->request_len); assert(conn->data_len >= conn->request_len);
if (conn->request_len <= 0) { if (conn->request_len <= 0) {
return; // Remote end closed the connection return; // Remote end closed the connection
} }
// Nul-terminate the request cause parse_http_request() uses sscanf // Nul-terminate the request cause parse_http_request() uses sscanf
...@@ -3499,7 +3493,7 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -3499,7 +3493,7 @@ static void process_new_connection(struct mg_connection *conn) {
if (!parse_http_request(conn->buf, ri)) { if (!parse_http_request(conn->buf, ri)) {
// Do not put garbage in the access log, just send it back to the client // Do not put garbage in the access log, just send it back to the client
send_http_error(conn, 400, "Bad Request", send_http_error(conn, 400, "Bad Request",
"Can not parse request: [%.*s]", conn->data_len, conn->buf); "Cannot parse HTTP request: [%.*s]", conn->data_len, conn->buf);
} else if (strcmp(ri->http_version, "1.0") && } else if (strcmp(ri->http_version, "1.0") &&
strcmp(ri->http_version, "1.1")) { strcmp(ri->http_version, "1.1")) {
// Request seems valid, but HTTP version is strange // Request seems valid, but HTTP version is strange
...@@ -3518,28 +3512,30 @@ static void process_new_connection(struct mg_connection *conn) { ...@@ -3518,28 +3512,30 @@ static void process_new_connection(struct mg_connection *conn) {
// Worker threads take accepted socket from the queue // Worker threads take accepted socket from the queue
static bool_t get_socket(struct mg_context *ctx, struct socket *sp) { static bool_t get_socket(struct mg_context *ctx, struct socket *sp) {
#if 0
struct timespec ts; struct timespec ts;
#endif
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p: going idle", DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: thread %p: going idle",
__func__, (void *) pthread_self())); __func__, (void *) pthread_self()));
// If the queue is empty, wait. We're idle at this point. // If the queue is empty, wait. We're idle at this point.
ctx->num_idle++;
while (ctx->sq_head == ctx->sq_tail) { while (ctx->sq_head == ctx->sq_tail) {
#if 0
ts.tv_nsec = 0; ts.tv_nsec = 0;
ts.tv_sec = time(NULL) + 5; ts.tv_sec = time(NULL) + 5;
if (pthread_cond_timedwait(&ctx->empty_cond, &ctx->mutex, &ts) != 0) { if (pthread_cond_wait(&ctx->empty_cond, &ctx->mutex, &ts) != 0) {
// Timeout! release the mutex and return // Timeout! release the mutex and return
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
return MG_FALSE; return MG_FALSE;
} }
#else
(void) pthread_cond_wait(&ctx->empty_cond, &ctx->mutex);
#endif
} }
assert(ctx->sq_head > ctx->sq_tail); assert(ctx->sq_head > ctx->sq_tail);
// We're going busy now: got a socket to process!
ctx->num_idle--;
// Copy socket from the queue and increment tail // Copy socket from the queue and increment tail
*sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)]; *sp = ctx->queue[ctx->sq_tail % ARRAY_SIZE(ctx->queue)];
ctx->sq_tail++; ctx->sq_tail++;
...@@ -3597,7 +3593,6 @@ static void worker_thread(struct mg_context *ctx) { ...@@ -3597,7 +3593,6 @@ static void worker_thread(struct mg_context *ctx) {
// Signal master that we're done with connection and exiting // Signal master that we're done with connection and exiting
(void) pthread_mutex_lock(&ctx->mutex); (void) pthread_mutex_lock(&ctx->mutex);
ctx->num_threads--; ctx->num_threads--;
ctx->num_idle--;
(void) pthread_cond_signal(&ctx->thr_cond); (void) pthread_cond_signal(&ctx->thr_cond);
assert(ctx->num_threads >= 0); assert(ctx->num_threads >= 0);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
...@@ -3621,15 +3616,6 @@ static void put_socket(struct mg_context *ctx, const struct socket *sp) { ...@@ -3621,15 +3616,6 @@ static void put_socket(struct mg_context *ctx, const struct socket *sp) {
ctx->sq_head++; ctx->sq_head++;
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: queued socket %d", __func__, sp->sock)); DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: queued socket %d", __func__, sp->sock));
// If there are no idle threads, start one
if (ctx->num_idle == 0 && ctx->num_threads < atoi(ctx->config->num_threads)) {
if (start_thread(ctx, (mg_thread_func_t) worker_thread, ctx) != 0) {
cry(fc(ctx), "Cannot start thread: %d", ERRNO);
} else {
ctx->num_threads++;
}
}
(void) pthread_cond_signal(&ctx->empty_cond); (void) pthread_cond_signal(&ctx->empty_cond);
(void) pthread_mutex_unlock(&ctx->mutex); (void) pthread_mutex_unlock(&ctx->mutex);
} }
...@@ -3687,11 +3673,14 @@ static void master_thread(struct mg_context *ctx) { ...@@ -3687,11 +3673,14 @@ static void master_thread(struct mg_context *ctx) {
sleep(1); sleep(1);
#endif // _WIN32 #endif // _WIN32
} else { } else {
for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) for (sp = ctx->listening_sockets; sp != NULL; sp = sp->next) {
if (FD_ISSET(sp->sock, &read_set)) if (FD_ISSET(sp->sock, &read_set)) {
accept_new_connection(sp, ctx); accept_new_connection(sp, ctx);
}
}
} }
} }
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: master: stopping workers", __func__));
// Stop signal received: somebody called mg_stop. Quit. // Stop signal received: somebody called mg_stop. Quit.
close_all_listening_sockets(ctx); close_all_listening_sockets(ctx);
...@@ -3715,6 +3704,8 @@ static void master_thread(struct mg_context *ctx) { ...@@ -3715,6 +3704,8 @@ static void master_thread(struct mg_context *ctx) {
// Signal mg_stop() that we're done // Signal mg_stop() that we're done
ctx->stop_flag = 2; ctx->stop_flag = 2;
DEBUG_TRACE((DEBUG_MGS_PREFIX "%s: master: exiting", __func__));
} }
void mg_stop(struct mg_context *ctx) { void mg_stop(struct mg_context *ctx) {
...@@ -3733,8 +3724,9 @@ void mg_stop(struct mg_context *ctx) { ...@@ -3733,8 +3724,9 @@ void mg_stop(struct mg_context *ctx) {
#endif // _WIN32 #endif // _WIN32
} }
struct mg_context * mg_start(struct mg_config *config) { struct mg_context * mg_start(const struct mg_config *config) {
struct mg_context *ctx, fake_ctx; struct mg_context *ctx, fake_ctx;
int i;
#if defined(_WIN32) #if defined(_WIN32)
WSADATA data; WSADATA data;
...@@ -3787,5 +3779,14 @@ struct mg_context * mg_start(struct mg_config *config) { ...@@ -3787,5 +3779,14 @@ struct mg_context * mg_start(struct mg_config *config) {
// Start master (listening) thread // Start master (listening) thread
start_thread(ctx, (mg_thread_func_t) master_thread, ctx); start_thread(ctx, (mg_thread_func_t) master_thread, ctx);
// Start worker threads
for (i = 0; i < atoi(ctx->config->num_threads); i++) {
if (start_thread(ctx, (mg_thread_func_t) worker_thread, ctx) != 0) {
cry(fc(ctx), "Cannot start worker thread: %d", ERRNO);
} else {
ctx->num_threads++;
}
}
return ctx; return ctx;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment