Commit fb4e7015 authored by Sergey Lyubka's avatar Sergey Lyubka

Added mg_wakeup_server_ex() for pushing data

parent ccff4abf
......@@ -3,6 +3,7 @@
// All rights reserved
//
// This library is dual-licensed: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 2 as
// published by the Free Software Foundation. For the terms of this
// license, see <http://www.gnu.org/licenses/>.
......@@ -213,6 +214,7 @@ void ns_server_init(struct ns_server *, void *server_data, ns_callback_t);
void ns_server_free(struct ns_server *);
int ns_server_poll(struct ns_server *, int milli);
void ns_server_wakeup(struct ns_server *);
void ns_server_wakeup_ex(struct ns_server *, ns_callback_t, void *, size_t);
void ns_iterate(struct ns_server *, ns_callback_t cb, void *param);
struct ns_connection *ns_add_sock(struct ns_server *, sock_t sock, void *p);
......@@ -268,6 +270,11 @@ int ns_hexdump(const void *buf, int len, char *dst, int dst_len);
#define NS_FREE free
#endif
struct ctl_msg {
ns_callback_t callback;
char message[1024 * 8];
};
void iobuf_init(struct iobuf *iobuf, size_t size) {
iobuf->len = iobuf->size = 0;
iobuf->buf = NULL;
......@@ -871,9 +878,12 @@ int ns_server_poll(struct ns_server *server, int milli) {
// Read possible wakeup calls
if (server->ctl[1] != INVALID_SOCKET &&
FD_ISSET(server->ctl[1], &read_set)) {
unsigned char ch;
recv(server->ctl[1], &ch, 1, 0);
send(server->ctl[1], &ch, 1, 0);
struct ctl_msg ctl_msg;
int len = recv(server->ctl[1], &ctl_msg, sizeof(ctl_msg), 0);
send(server->ctl[1], ctl_msg.message, 1, 0);
if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
ns_iterate(server, ctl_msg.callback, ctl_msg.message);
}
}
for (conn = server->active_connections; conn != NULL; conn = tmp_conn) {
......@@ -980,14 +990,22 @@ void ns_iterate(struct ns_server *server, ns_callback_t cb, void *param) {
}
}
void ns_server_wakeup(struct ns_server *server) {
unsigned char ch = 0;
if (server->ctl[0] != INVALID_SOCKET) {
send(server->ctl[0], &ch, 1, 0);
recv(server->ctl[0], &ch, 1, 0);
void ns_server_wakeup_ex(struct ns_server *server, ns_callback_t cb,
void *data, size_t len) {
struct ctl_msg ctl_msg;
if (server->ctl[0] != INVALID_SOCKET && data != NULL &&
len < sizeof(ctl_msg.message)) {
ctl_msg.callback = cb;
memcpy(ctl_msg.message, data, len);
send(server->ctl[0], &ctl_msg, offsetof(struct ctl_msg, message) + len, 0);
recv(server->ctl[0], &len, 1, 0);
}
}
void ns_server_wakeup(struct ns_server *server) {
ns_server_wakeup_ex(server, NULL, (void *) "", 0);
}
void ns_server_init(struct ns_server *s, void *server_data, ns_callback_t cb) {
memset(s, 0, sizeof(*s));
s->listening_sock = s->ctl[0] = s->ctl[1] = INVALID_SOCKET;
......@@ -1004,7 +1022,7 @@ void ns_server_init(struct ns_server *s, void *server_data, ns_callback_t cb) {
#ifndef NS_DISABLE_SOCKETPAIR
do {
ns_socketpair(s->ctl);
ns_socketpair2(s->ctl, SOCK_DGRAM);
} while (s->ctl[0] == INVALID_SOCKET);
#endif
......@@ -1173,7 +1191,6 @@ enum {
#endif
#ifdef NS_ENABLE_SSL
SSL_CERTIFICATE,
SSL_CA_CERTIFICATE,
#endif
URL_REWRITES,
NUM_OPTIONS
......@@ -1214,7 +1231,6 @@ static const char *static_config_options[] = {
#endif
#ifdef NS_ENABLE_SSL
"ssl_certificate", NULL,
"ssl_ca_certificate", NULL,
#endif
"url_rewrites", NULL,
NULL
......@@ -4545,10 +4561,6 @@ const char *mg_set_option(struct mg_server *server, const char *name,
} else if (res == -1) {
error_msg = "SSL_CTX_new() failed";
}
} else if (ind == SSL_CA_CERTIFICATE) {
if (ns_set_ssl_ca_cert(&server->ns_server, value) != 0) {
error_msg = "Error setting CA cert";
}
#endif
}
......@@ -4716,8 +4728,38 @@ static void mg_ev_handler(struct ns_connection *nc, enum ns_event ev, void *p) {
}
}
static void iter2(struct ns_connection *nc, enum ns_event ev, void *param) {
mg_handler_t func = NULL;
struct connection *conn = (struct connection *) nc->connection_data;
const char *msg = (const char *) param;
int n;
(void) ev;
DBG(("%p [%s]", conn, msg));
if (sscanf(msg, "%p %n", &func, &n) && func != NULL) {
conn->mg_conn.callback_param = (void *) (msg + n);
func(&conn->mg_conn, MG_POLL);
}
}
void mg_wakeup_server_ex(struct mg_server *server, mg_handler_t cb,
const char *fmt, ...) {
va_list ap;
char buf[8 * 1024];
int len;
// Encode callback (cb) into a buffer
len = snprintf(buf, sizeof(buf), "%p ", cb);
va_start(ap, fmt);
len += vsnprintf(buf + len, sizeof(buf) - len, fmt, ap);
va_end(ap);
// "len + 1" is to include terminating \0 in the message
ns_server_wakeup_ex(&server->ns_server, iter2, buf, len + 1);
}
void mg_wakeup_server(struct mg_server *server) {
ns_server_wakeup(&server->ns_server);
ns_server_wakeup_ex(&server->ns_server, NULL, (void *) "", 0);
}
void mg_set_listening_socket(struct mg_server *server, int sock) {
......
......@@ -93,6 +93,7 @@ void mg_set_listening_socket(struct mg_server *, int sock);
int mg_get_listening_socket(struct mg_server *);
void mg_iterate_over_connections(struct mg_server *, mg_handler_t, void *);
void mg_wakeup_server(struct mg_server *);
void mg_wakeup_server_ex(struct mg_server *, mg_handler_t, const char *, ...);
struct mg_connection *mg_connect(struct mg_server *, const char *, int, int);
// Connection management functions
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment