Commit a0b599b3 authored by Sergey Lyubka's avatar Sergey Lyubka

Removed protection from mg_write(). Refactored mg_iterate_over_connections()

parent d43ccda9
......@@ -67,6 +67,7 @@ typedef struct _stati64 file_stat_t;
#define mutex_destroy(x) DeleteCriticalSection(x)
#define mutex_lock(x) EnterCriticalSection(x)
#define mutex_unlock(x) LeaveCriticalSection(x)
#define get_thread_id() ((unsigned long) GetCurrentThreadId())
#define S_ISDIR(x) ((x) & _S_IFDIR)
#define sleep(x) Sleep((x) * 1000)
#define stat(x, y) _stati64((x), (y))
......@@ -102,6 +103,7 @@ typedef struct stat file_stat_t;
#define mutex_destroy(x) pthread_mutex_destroy(x)
#define mutex_lock(x) pthread_mutex_lock(x)
#define mutex_unlock(x) pthread_mutex_unlock(x)
#define get_thread_id() ((unsigned long) pthread_self())
#define INVALID_SOCKET ((sock_t) -1)
#define INT64_FMT PRId64
#define to64(x) strtoll(x, NULL, 10)
......@@ -1286,15 +1288,7 @@ static int convert_uri_to_file_name(struct connection *conn, char *buf,
}
int mg_write(struct mg_connection *c, const void *buf, int len) {
struct connection *conn = (struct connection *) c;
int ret;
mutex_lock(&conn->mutex);
ret = spool(&conn->remote_iobuf, buf, len);
mutex_unlock(&conn->mutex);
send(conn->server->ctl[1], ".", 1, 0); // Wake up select call
return ret;
return spool(&((struct connection *) c)->remote_iobuf, buf, len);
}
#if !defined(NO_WEBSOCKET) || !defined(NO_AUTH)
......@@ -3274,6 +3268,19 @@ static void transfer_file_data(struct connection *conn) {
}
}
static void execute_iteration(struct mg_server *server) {
struct ll *lp, *tmp;
struct connection *conn;
void *msg[2];
recv(server->ctl[1], (void *) msg, sizeof(msg), 0);
LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
conn = LINKED_LIST_ENTRY(lp, struct connection, link);
((void (*)(struct mg_connection *, void *)) msg[0])
((struct mg_connection *) conn, msg[1]);
}
}
void add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) {
FD_SET(sock, set);
if (sock > *max_fd) {
......@@ -3295,7 +3302,7 @@ unsigned int mg_poll_server(struct mg_server *server, int milliseconds) {
FD_ZERO(&read_set);
FD_ZERO(&write_set);
add_to_set(server->listening_sock, &read_set, &max_fd);
add_to_set(server->ctl[0], &read_set, &max_fd);
add_to_set(server->ctl[1], &read_set, &max_fd);
LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
conn = LINKED_LIST_ENTRY(lp, struct connection, link);
......@@ -3316,11 +3323,8 @@ unsigned int mg_poll_server(struct mg_server *server, int milliseconds) {
tv.tv_usec = (milliseconds % 1000) * 1000;
if (select(max_fd + 1, &read_set, &write_set, NULL, &tv) > 0) {
// If control socket is set, just read from it. It meant to wake up
// this select loop when another thread writes to any connection
if (FD_ISSET(server->ctl[0], &read_set)) {
char buf[500];
recv(server->ctl[0], buf, sizeof(buf), 0);
if (FD_ISSET(server->ctl[1], &read_set)) {
execute_iteration(server);
}
// Accept new connections
......@@ -3381,24 +3385,13 @@ void mg_destroy_server(struct mg_server **server) {
}
}
// Apply function to all active connections. Return number of active
// connections. Function could be NULL.
int mg_iterate_over_connections(struct mg_server *server,
void (*func)(struct mg_connection *, void *),
void *param) {
struct ll *lp, *tmp;
struct connection *conn;
int num_connections = 0;
LINKED_LIST_FOREACH(&server->active_connections, lp, tmp) {
num_connections++;
conn = LINKED_LIST_ENTRY(lp, struct connection, link);
if (conn->endpoint_type == EP_USER && func != NULL) {
func((struct mg_connection *) conn, param);
}
}
return num_connections;
// Apply function to all active connections.
void mg_iterate_over_connections(struct mg_server *server,
void (*func)(struct mg_connection *, void *),
void *param) {
// Send closure (function + parameter) to the IO thread to execute
void *msg[2] = { (void *) func, param };
send(server->ctl[0], (void *) msg, sizeof(msg), 0);
}
void mg_add_uri_handler(struct mg_server *server, const char *uri,
......
......@@ -69,9 +69,9 @@ const char **mg_get_valid_option_names(void);
const char *mg_get_option(const struct mg_server *server, const char *name);
void mg_set_listening_socket(struct mg_server *, int sock);
int mg_get_listening_socket(struct mg_server *);
int mg_iterate_over_connections(struct mg_server *,
void (*func)(struct mg_connection *, void *),
void *param);
void mg_iterate_over_connections(struct mg_server *,
void (*func)(struct mg_connection *, void *),
void *param);
// Connection management functions
int mg_write(struct mg_connection *, const void *buf, int len);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment