Commit 2a4ca9d5 authored by Deomid Ryabkov's avatar Deomid Ryabkov Committed by Marko Mikulicic

Introduce a network interface API, refactor UDP

    PUBLISHED_FROM=6e961e2760b2b64e211978ede5df8ca353ea5512
parent 747e40e7
...@@ -43,6 +43,7 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) { ...@@ -43,6 +43,7 @@ static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
*/ */
mg_dns_send_reply(nc, &reply); mg_dns_send_reply(nc, &reply);
nc->flags |= MG_F_SEND_AND_CLOSE;
break; break;
} }
} }
......
...@@ -4,7 +4,7 @@ CFLAGS = -W -Wall -I../.. -pthread -DMG_ENABLE_SSL -DMG_ENABLE_IPV6 -DMG_ENABLE_ ...@@ -4,7 +4,7 @@ CFLAGS = -W -Wall -I../.. -pthread -DMG_ENABLE_SSL -DMG_ENABLE_IPV6 -DMG_ENABLE_
all: $(PROG) all: $(PROG)
$(PROG): $(SOURCES) $(PROG): $(SOURCES)
$(CC) $(SOURCES) -o $@ $(CFLAGS) $(CC) $(SOURCES) -g -o $@ $(CFLAGS)
$(PROG).exe: $(SOURCES) $(PROG).exe: $(SOURCES)
cl $(SOURCES) /I../.. /DMG_ENABLE_SSL /MD /Fe$@ cl $(SOURCES) /I../.. /DMG_ENABLE_SSL /MD /Fe$@
......
...@@ -66,10 +66,9 @@ ...@@ -66,10 +66,9 @@
/* Amalgamated: #include "../mongoose.h" */ /* Amalgamated: #include "../mongoose.h" */
/* internals that need to be accessible in unit tests */ /* internals that need to be accessible in unit tests */
MG_INTERNAL struct mg_connection *mg_finish_connect(struct mg_connection *nc, MG_INTERNAL struct mg_connection *mg_do_connect(struct mg_connection *nc,
int proto, int proto,
union socket_address *sa, union socket_address *sa);
struct mg_add_sock_opts);
MG_INTERNAL int mg_parse_address(const char *str, union socket_address *sa, MG_INTERNAL int mg_parse_address(const char *str, union socket_address *sa,
int *proto, char *host, size_t host_len); int *proto, char *host, size_t host_len);
...@@ -78,6 +77,8 @@ MG_INTERNAL void mg_forward(struct mg_connection *, struct mg_connection *); ...@@ -78,6 +77,8 @@ MG_INTERNAL void mg_forward(struct mg_connection *, struct mg_connection *);
MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c); MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c);
MG_INTERNAL void mg_remove_conn(struct mg_connection *c); MG_INTERNAL void mg_remove_conn(struct mg_connection *c);
MG_INTERNAL void mg_set_sock(struct mg_connection *nc, sock_t sock);
#ifndef MG_DISABLE_FILESYSTEM #ifndef MG_DISABLE_FILESYSTEM
MG_INTERNAL int find_index_file(char *, size_t, const char *, cs_stat_t *); MG_INTERNAL int find_index_file(char *, size_t, const char *, cs_stat_t *);
#endif #endif
...@@ -1764,8 +1765,6 @@ int json_emit(char *buf, int buf_len, const char *fmt, ...) { ...@@ -1764,8 +1765,6 @@ int json_emit(char *buf, int buf_len, const char *fmt, ...) {
#endif #endif
#define MG_CTL_MSG_MESSAGE_SIZE 8192 #define MG_CTL_MSG_MESSAGE_SIZE 8192
#define MG_READ_BUFFER_SIZE 1024
#define MG_UDP_RECEIVE_BUFFER_SIZE 1500
#define MG_VPRINTF_BUFFER_SIZE 100 #define MG_VPRINTF_BUFFER_SIZE 100
#define MG_MAX_HOST_LEN 200 #define MG_MAX_HOST_LEN 200
...@@ -1791,12 +1790,16 @@ struct ctl_msg { ...@@ -1791,12 +1790,16 @@ struct ctl_msg {
char message[MG_CTL_MSG_MESSAGE_SIZE]; char message[MG_CTL_MSG_MESSAGE_SIZE];
}; };
int mg_is_error(int n);
void mg_set_non_blocking_mode(sock_t sock);
static void mg_ev_mgr_init(struct mg_mgr *mgr); static void mg_ev_mgr_init(struct mg_mgr *mgr);
static void mg_ev_mgr_free(struct mg_mgr *mgr); static void mg_ev_mgr_free(struct mg_mgr *mgr);
static void mg_ev_mgr_add_conn(struct mg_connection *nc); static void mg_ev_mgr_add_conn(struct mg_connection *nc);
static void mg_ev_mgr_remove_conn(struct mg_connection *nc); static void mg_ev_mgr_remove_conn(struct mg_connection *nc);
MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c) { MG_INTERNAL void mg_add_conn(struct mg_mgr *mgr, struct mg_connection *c) {
DBG(("%p %p", mgr, c));
c->mgr = mgr; c->mgr = mgr;
c->next = mgr->active_connections; c->next = mgr->active_connections;
mgr->active_connections = c; mgr->active_connections = c;
...@@ -1816,8 +1819,8 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, int ev, void *ev_data) { ...@@ -1816,8 +1819,8 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, int ev, void *ev_data) {
unsigned long flags_before; unsigned long flags_before;
mg_event_handler_t ev_handler; mg_event_handler_t ev_handler;
DBG(("%p flags=%lu ev=%d ev_data=%p rmbl=%d", nc, nc->flags, ev, ev_data, DBG(("%p ev=%d ev_data=%p flags=%lu rmbl=%d smbl=%d", nc, ev, ev_data,
(int) nc->recv_mbuf.len)); nc->flags, (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len));
#ifndef MG_DISABLE_FILESYSTEM #ifndef MG_DISABLE_FILESYSTEM
/* LCOV_EXCL_START */ /* LCOV_EXCL_START */
...@@ -1842,42 +1845,14 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, int ev, void *ev_data) { ...@@ -1842,42 +1845,14 @@ MG_INTERNAL void mg_call(struct mg_connection *nc, int ev, void *ev_data) {
(nc->flags & _MG_CALLBACK_MODIFIABLE_FLAGS_MASK); (nc->flags & _MG_CALLBACK_MODIFIABLE_FLAGS_MASK);
} }
} }
DBG(("call done, flags %d", (int) nc->flags)); DBG(("%p after flags=%d rmbl=%d smbl=%d", nc, (int) nc->flags,
} (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len));
static size_t mg_out(struct mg_connection *nc, const void *buf, size_t len) {
if (nc->flags & MG_F_UDP) {
int n = sendto(nc->sock, buf, len, 0, &nc->sa.sa, sizeof(nc->sa.sin));
DBG(("%p %d %d %d %s:%hu", nc, nc->sock, n, errno,
inet_ntoa(nc->sa.sin.sin_addr), ntohs(nc->sa.sin.sin_port)));
return n < 0 ? 0 : n;
} else {
return mbuf_append(&nc->send_mbuf, buf, len);
}
} }
static void mg_destroy_conn(struct mg_connection *conn) { static void mg_destroy_conn(struct mg_connection *conn) {
if (conn->sock != INVALID_SOCKET) { mg_if_destroy_conn(conn);
closesocket(conn->sock);
/*
* avoid users accidentally double close a socket
* because it can lead to difficult to debug situations.
* It would happen only if reusing a destroyed mg_connection
* but it's not always possible to run the code through an
* address sanitizer.
*/
conn->sock = INVALID_SOCKET;
}
mbuf_free(&conn->recv_mbuf); mbuf_free(&conn->recv_mbuf);
mbuf_free(&conn->send_mbuf); mbuf_free(&conn->send_mbuf);
#ifdef MG_ENABLE_SSL
if (conn->ssl != NULL) {
SSL_free(conn->ssl);
}
if (conn->ssl_ctx != NULL) {
SSL_CTX_free(conn->ssl_ctx);
}
#endif
MG_FREE(conn); MG_FREE(conn);
} }
...@@ -1906,12 +1881,6 @@ void mg_mgr_init(struct mg_mgr *m, void *user_data) { ...@@ -1906,12 +1881,6 @@ void mg_mgr_init(struct mg_mgr *m, void *user_data) {
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
#endif #endif
#ifndef MG_DISABLE_SOCKETPAIR
do {
mg_socketpair(m->ctl, SOCK_DGRAM);
} while (m->ctl[0] == INVALID_SOCKET);
#endif
#ifdef MG_ENABLE_SSL #ifdef MG_ENABLE_SSL
{ {
static int init_done; static int init_done;
...@@ -1976,7 +1945,7 @@ int mg_vprintf(struct mg_connection *nc, const char *fmt, va_list ap) { ...@@ -1976,7 +1945,7 @@ int mg_vprintf(struct mg_connection *nc, const char *fmt, va_list ap) {
int len; int len;
if ((len = mg_avprintf(&buf, sizeof(mem), fmt, ap)) > 0) { if ((len = mg_avprintf(&buf, sizeof(mem), fmt, ap)) > 0) {
mg_out(nc, buf, len); mg_send(nc, buf, len);
} }
if (buf != mem && buf != NULL) { if (buf != mem && buf != NULL) {
MG_FREE(buf); /* LCOV_EXCL_LINE */ MG_FREE(buf); /* LCOV_EXCL_LINE */
...@@ -1994,62 +1963,6 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) { ...@@ -1994,62 +1963,6 @@ int mg_printf(struct mg_connection *conn, const char *fmt, ...) {
return len; return len;
} }
static void mg_set_non_blocking_mode(sock_t sock) {
#ifdef _WIN32
unsigned long on = 1;
ioctlsocket(sock, FIONBIO, &on);
#elif defined(MG_CC3200)
cc3200_set_non_blocking_mode(sock);
#else
int flags = fcntl(sock, F_GETFL, 0);
fcntl(sock, F_SETFL, flags | O_NONBLOCK);
#endif
}
#ifndef MG_DISABLE_SOCKETPAIR
int mg_socketpair(sock_t sp[2], int sock_type) {
union socket_address sa;
sock_t sock;
socklen_t len = sizeof(sa.sin);
int ret = 0;
sock = sp[0] = sp[1] = INVALID_SOCKET;
(void) memset(&sa, 0, sizeof(sa));
sa.sin.sin_family = AF_INET;
sa.sin.sin_port = htons(0);
sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
if ((sock = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) {
} else if (bind(sock, &sa.sa, len) != 0) {
} else if (sock_type == SOCK_STREAM && listen(sock, 1) != 0) {
} else if (getsockname(sock, &sa.sa, &len) != 0) {
} else if ((sp[0] = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) {
} else if (connect(sp[0], &sa.sa, len) != 0) {
} else if (sock_type == SOCK_DGRAM &&
(getsockname(sp[0], &sa.sa, &len) != 0 ||
connect(sock, &sa.sa, len) != 0)) {
} else if ((sp[1] = (sock_type == SOCK_DGRAM ? sock
: accept(sock, &sa.sa, &len))) ==
INVALID_SOCKET) {
} else {
mg_set_close_on_exec(sp[0]);
mg_set_close_on_exec(sp[1]);
if (sock_type == SOCK_STREAM) closesocket(sock);
ret = 1;
}
if (!ret) {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
if (sock != INVALID_SOCKET) closesocket(sock);
sock = sp[0] = sp[1] = INVALID_SOCKET;
}
return ret;
}
#endif /* MG_DISABLE_SOCKETPAIR */
/* TODO(lsm): use non-blocking resolver */ /* TODO(lsm): use non-blocking resolver */
static int mg_resolve2(const char *host, struct in_addr *ina) { static int mg_resolve2(const char *host, struct in_addr *ina) {
#ifdef MG_ENABLE_GETADDRINFO #ifdef MG_ENABLE_GETADDRINFO
...@@ -2091,8 +2004,7 @@ MG_INTERNAL struct mg_connection *mg_create_connection( ...@@ -2091,8 +2004,7 @@ MG_INTERNAL struct mg_connection *mg_create_connection(
struct mg_add_sock_opts opts) { struct mg_add_sock_opts opts) {
struct mg_connection *conn; struct mg_connection *conn;
if ((conn = (struct mg_connection *) MG_MALLOC(sizeof(*conn))) != NULL) { if ((conn = (struct mg_connection *) MG_CALLOC(1, sizeof(*conn))) != NULL) {
memset(conn, 0, sizeof(*conn));
conn->sock = INVALID_SOCKET; conn->sock = INVALID_SOCKET;
conn->handler = callback; conn->handler = callback;
conn->mgr = mgr; conn->mgr = mgr;
...@@ -2105,26 +2017,13 @@ MG_INTERNAL struct mg_connection *mg_create_connection( ...@@ -2105,26 +2017,13 @@ MG_INTERNAL struct mg_connection *mg_create_connection(
* doesn't compile with pedantic ansi flags. * doesn't compile with pedantic ansi flags.
*/ */
conn->recv_mbuf_limit = ~0; conn->recv_mbuf_limit = ~0;
} else {
MG_SET_PTRPTR(opts.error_string, "failed create connection");
} }
return conn; return conn;
} }
/* Associate a socket to a connection and and add to the manager. */
MG_INTERNAL void mg_set_sock(struct mg_connection *nc, sock_t sock) {
#if !defined(MG_CC3200)
/* Can't get non-blocking connect to work.
* TODO(rojer): Figure out why it fails where blocking succeeds.
*/
mg_set_non_blocking_mode(sock);
#endif
mg_set_close_on_exec(sock);
nc->sock = sock;
mg_add_conn(nc->mgr, nc);
DBG(("%p %d", nc, sock));
}
/* /*
* Address format: [PROTO://][HOST]:PORT * Address format: [PROTO://][HOST]:PORT
* *
...@@ -2194,53 +2093,6 @@ MG_INTERNAL int mg_parse_address(const char *str, union socket_address *sa, ...@@ -2194,53 +2093,6 @@ MG_INTERNAL int mg_parse_address(const char *str, union socket_address *sa,
return port < 0xffffUL && str[len] == '\0' ? len : -1; return port < 0xffffUL && str[len] == '\0' ? len : -1;
} }
/* 'sa' must be an initialized address to bind to */
static sock_t mg_open_listening_socket(union socket_address *sa, int proto) {
socklen_t sa_len =
(sa->sa.sa_family == AF_INET) ? sizeof(sa->sin) : sizeof(sa->sin6);
sock_t sock = INVALID_SOCKET;
#if !defined(MG_CC3200) && !defined(MG_LWIP)
int on = 1;
#endif
if ((sock = socket(sa->sa.sa_family, proto, 0)) != INVALID_SOCKET &&
#if !defined(MG_CC3200) && \
!defined(MG_LWIP) /* CC3200 and LWIP don't support either */
#if defined(_WIN32) && defined(SO_EXCLUSIVEADDRUSE)
/* "Using SO_REUSEADDR and SO_EXCLUSIVEADDRUSE" http://goo.gl/RmrFTm */
!setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (void *) &on,
sizeof(on)) &&
#endif
#if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE)
/*
* SO_RESUSEADDR is not enabled on Windows because the semantics of
* SO_REUSEADDR on UNIX and Windows is different. On Windows,
* SO_REUSEADDR allows to bind a socket to a port without error even if
* the port is already open by another program. This is not the behavior
* SO_REUSEADDR was designed for, and leads to hard-to-track failure
* scenarios. Therefore, SO_REUSEADDR was disabled on Windows unless
* SO_EXCLUSIVEADDRUSE is supported and set on a socket.
*/
!setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) &&
#endif
#endif /* !MG_CC3200 && !MG_LWIP */
!bind(sock, &sa->sa, sa_len) &&
(proto == SOCK_DGRAM || listen(sock, SOMAXCONN) == 0)) {
#if !defined(MG_CC3200) && !defined(MG_LWIP) /* TODO(rojer): Fix this. */
mg_set_non_blocking_mode(sock);
/* In case port was set to 0, get the real port number */
(void) getsockname(sock, &sa->sa, &sa_len);
#endif
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
sock = INVALID_SOCKET;
}
return sock;
}
#ifdef MG_ENABLE_SSL #ifdef MG_ENABLE_SSL
/* /*
* Certificate generation script is at * Certificate generation script is at
...@@ -2414,54 +2266,39 @@ static int mg_ssl_err(struct mg_connection *conn, int res) { ...@@ -2414,54 +2266,39 @@ static int mg_ssl_err(struct mg_connection *conn, int res) {
} }
#endif /* MG_ENABLE_SSL */ #endif /* MG_ENABLE_SSL */
static struct mg_connection *accept_conn(struct mg_connection *ls) { void mg_if_accept_tcp_cb(struct mg_connection *lc, sock_t sock,
struct mg_connection *c = NULL; union socket_address *sa, size_t sa_len) {
union socket_address sa; struct mg_add_sock_opts opts;
socklen_t len = sizeof(sa); struct mg_connection *nc;
sock_t sock = INVALID_SOCKET; (void) sa_len;
memset(&opts, 0, sizeof(opts));
/* NOTE(lsm): on Windows, sock is always > FD_SETSIZE */ nc = mg_create_connection(lc->mgr, lc->handler, opts);
if ((sock = accept(ls->sock, &sa.sa, &len)) == INVALID_SOCKET) { if (nc == NULL) {
} else if ((c = mg_add_sock(ls->mgr, sock, ls->handler)) == NULL) {
closesocket(sock); closesocket(sock);
return;
}
nc->listener = lc;
nc->proto_data = lc->proto_data;
nc->proto_handler = lc->proto_handler;
nc->user_data = lc->user_data;
nc->recv_mbuf_limit = lc->recv_mbuf_limit;
nc->sa = *sa;
mg_set_sock(nc, sock); /* XXX */
mg_add_conn(nc->mgr, nc);
#ifdef MG_ENABLE_SSL #ifdef MG_ENABLE_SSL
} else if (ls->ssl_ctx != NULL && ((c->ssl = SSL_new(ls->ssl_ctx)) == NULL || if (lc->ssl_ctx != NULL) {
SSL_set_fd(c->ssl, sock) != 1)) { nc->ssl = SSL_new(lc->ssl_ctx);
if (nc->ssl == NULL || SSL_set_fd(nc->ssl, sock) != 1) {
DBG(("SSL error")); DBG(("SSL error"));
mg_close_conn(c); mg_close_conn(nc);
c = NULL;
#endif
} else {
c->listener = ls;
c->proto_data = ls->proto_data;
c->proto_handler = ls->proto_handler;
c->user_data = ls->user_data;
c->recv_mbuf_limit = ls->recv_mbuf_limit;
c->sa = sa;
if (c->ssl == NULL) { /* SSL connections need to perform handshake. */
mg_call(c, MG_EV_ACCEPT, &sa);
}
DBG(("%p %d %p %p", c, c->sock, c->ssl_ctx, c->ssl));
} }
} else
return c;
}
static int mg_is_error(int n) {
#ifdef MG_CC3200
DBG(("n = %d, errno = %d", n, errno));
if (n < 0) errno = n;
#endif
return n == 0 || (n < 0 && errno != EINTR && errno != EINPROGRESS &&
errno != EAGAIN && errno != EWOULDBLOCK
#ifdef MG_CC3200
&& errno != SL_EALREADY
#endif
#ifdef _WIN32
&& WSAGetLastError() != WSAEINTR &&
WSAGetLastError() != WSAEWOULDBLOCK
#endif #endif
); {
mg_call(nc, MG_EV_ACCEPT, &nc->sa);
}
DBG(("%p %p %d %d, %p %p", lc, nc, nc->sock, (int) nc->flags, lc->ssl_ctx,
nc->ssl));
} }
static size_t recv_avail_size(struct mg_connection *conn, size_t max) { static size_t recv_avail_size(struct mg_connection *conn, size_t max) {
...@@ -2475,6 +2312,7 @@ static size_t recv_avail_size(struct mg_connection *conn, size_t max) { ...@@ -2475,6 +2312,7 @@ static size_t recv_avail_size(struct mg_connection *conn, size_t max) {
static void mg_ssl_begin(struct mg_connection *nc) { static void mg_ssl_begin(struct mg_connection *nc) {
int server_side = nc->listener != NULL; int server_side = nc->listener != NULL;
int res = server_side ? SSL_accept(nc->ssl) : SSL_connect(nc->ssl); int res = server_side ? SSL_accept(nc->ssl) : SSL_connect(nc->ssl);
DBG(("%p %d res %d %d %d", nc, server_side, res, errno, mg_ssl_err(nc, res)));
if (res == 1) { if (res == 1) {
nc->flags |= MG_F_SSL_HANDSHAKE_DONE; nc->flags |= MG_F_SSL_HANDSHAKE_DONE;
...@@ -2486,766 +2324,1078 @@ static void mg_ssl_begin(struct mg_connection *nc) { ...@@ -2486,766 +2324,1078 @@ static void mg_ssl_begin(struct mg_connection *nc) {
/* In case port was set to 0, get the real port number */ /* In case port was set to 0, get the real port number */
(void) getsockname(nc->sock, &sa.sa, &sa_len); (void) getsockname(nc->sock, &sa.sa, &sa_len);
mg_call(nc, MG_EV_ACCEPT, &sa); mg_call(nc, MG_EV_ACCEPT, &sa);
} else {
int err = 0;
mg_call(nc, MG_EV_CONNECT, &err);
} }
} else { } else {
int ssl_err = mg_ssl_err(nc, res); int ssl_err = mg_ssl_err(nc, res);
if (ssl_err != SSL_ERROR_WANT_READ && ssl_err != SSL_ERROR_WANT_WRITE) { if (ssl_err != SSL_ERROR_WANT_READ && ssl_err != SSL_ERROR_WANT_WRITE) {
nc->flags |= MG_F_CLOSE_IMMEDIATELY; nc->flags |= MG_F_CLOSE_IMMEDIATELY;
if (!server_side) {
int err = 0;
mg_call(nc, MG_EV_CONNECT, &err);
}
} }
} }
} }
#endif /* MG_ENABLE_SSL */ #endif /* MG_ENABLE_SSL */
static void mg_connect_done(struct mg_connection *conn, int err) { void mg_send(struct mg_connection *nc, const void *buf, int len) {
#ifdef MG_ENABLE_SSL if (nc->flags & MG_F_UDP) {
if (err == 0 && conn->ssl != NULL) { mg_if_udp_send(nc, buf, len);
mg_ssl_begin(conn);
}
#endif
DBG(("%p connect, err=%d", conn, err));
if (err != 0) {
conn->flags |= MG_F_CLOSE_IMMEDIATELY;
} else { } else {
conn->flags &= ~MG_F_CONNECTING; mg_if_tcp_send(nc, buf, len);
}
mg_call(conn, MG_EV_CONNECT, &err);
return;
}
static void mg_read_from_socket(struct mg_connection *conn) {
int n = 0;
char buf[MG_READ_BUFFER_SIZE];
#ifdef MG_ENABLE_SSL
if (conn->ssl != NULL) {
if (conn->flags & MG_F_SSL_HANDSHAKE_DONE) {
/* SSL library may have more bytes ready to read then we ask to read.
* Therefore, read in a loop until we read everything. Without the loop,
* we skip to the next select() cycle which can just timeout. */
while ((n = SSL_read(conn->ssl, buf, sizeof(buf))) > 0) {
DBG(("%p %d bytes <- %d (SSL)", conn, n, conn->sock));
mbuf_append(&conn->recv_mbuf, buf, n);
mg_call(conn, MG_EV_RECV, &n);
if (conn->flags & MG_F_CLOSE_IMMEDIATELY) break;
} }
mg_ssl_err(conn, n); #ifndef MG_DISABLE_FILESYSTEM
} else { if (nc->mgr && nc->mgr->hexdump_file != NULL) {
mg_ssl_begin(conn); mg_hexdump_connection(nc, nc->mgr->hexdump_file, len, MG_EV_SEND);
return;
} }
} else
#endif #endif
while ((n = (int) MG_RECV_FUNC( }
conn->sock, buf, recv_avail_size(conn, sizeof(buf)), 0)) > 0) {
DBG(("%p %d bytes (PLAIN) <- %d", conn, n, conn->sock));
mbuf_append(&conn->recv_mbuf, buf, n);
mg_call(conn, MG_EV_RECV, &n);
if (conn->flags & MG_F_CLOSE_IMMEDIATELY) break;
}
DBG(("recv returns %d", n));
if (mg_is_error(n)) { void mg_if_sent_cb(struct mg_connection *nc, int num_sent) {
conn->flags |= MG_F_CLOSE_IMMEDIATELY; if (num_sent < 0) {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
} }
mg_call(nc, MG_EV_SEND, &num_sent);
} }
static void mg_write_to_socket(struct mg_connection *conn) { static void mg_recv_common(struct mg_connection *nc, void *buf, int len) {
struct mbuf *io = &conn->send_mbuf; DBG(("%p %d %u", nc, len, (unsigned int) nc->recv_mbuf.len));
int n = 0; if (nc->recv_mbuf.len == 0) {
/* Adopt buf as recv_mbuf's backing store. */
#ifdef MG_LWIP mbuf_free(&nc->recv_mbuf);
/* With LWIP we don't know if the socket is ready */ nc->recv_mbuf.buf = (char *) buf;
if (io->len == 0) return; nc->recv_mbuf.size = nc->recv_mbuf.len = len;
#endif } else {
size_t avail = recv_avail_size(nc, len);
len = avail;
mbuf_append(&nc->recv_mbuf, buf, len);
MG_FREE(buf);
}
mg_call(nc, MG_EV_RECV, &len);
}
assert(io->len > 0); void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len) {
mg_recv_common(nc, buf, len);
mg_if_recved(nc, len);
}
#ifdef MG_ENABLE_SSL void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len,
if (conn->ssl != NULL) { union socket_address *sa, size_t sa_len) {
if (conn->flags & MG_F_SSL_HANDSHAKE_DONE) { assert(nc->flags & MG_F_UDP);
n = SSL_write(conn->ssl, io->buf, io->len); DBG(("%p %u", nc, (unsigned int) len));
if (n <= 0) { if (nc->flags & MG_F_LISTENING) {
int ssl_err = mg_ssl_err(conn, n); struct mg_connection *lc = nc;
if (ssl_err == SSL_ERROR_WANT_READ || ssl_err == SSL_ERROR_WANT_WRITE) { /*
return; /* Call us again */ * Do we have an existing connection for this source?
} else { * This is very inefficient for long connection lists.
conn->flags |= MG_F_CLOSE_IMMEDIATELY; */
for (nc = mg_next(lc->mgr, NULL); nc != NULL; nc = mg_next(lc->mgr, nc)) {
if (memcmp(&nc->sa.sa, &sa->sa, sa_len) == 0) break;
} }
} else { if (nc == NULL) {
/* Successful SSL operation, clear off SSL wait flags */ struct mg_add_sock_opts opts;
conn->flags &= ~(MG_F_WANT_READ | MG_F_WANT_WRITE); memset(&opts, 0, sizeof(opts));
nc = mg_create_connection(lc->mgr, lc->handler, opts);
} }
if (nc != NULL) {
nc->sock = lc->sock;
nc->listener = lc;
nc->sa = *sa;
nc->proto_data = lc->proto_data;
nc->proto_handler = lc->proto_handler;
nc->user_data = lc->user_data;
nc->recv_mbuf_limit = lc->recv_mbuf_limit;
nc->flags = MG_F_UDP;
mg_add_conn(lc->mgr, nc);
mg_call(nc, MG_EV_ACCEPT, &nc->sa);
} else { } else {
mg_ssl_begin(conn); DBG(("OOM"));
return;
} }
} else
#endif
{
n = (int) MG_SEND_FUNC(conn->sock, io->buf, io->len, 0);
}
DBG(("%p %d bytes -> %d", conn, n, conn->sock));
if (mg_is_error(n)) {
conn->flags |= MG_F_CLOSE_IMMEDIATELY;
} else if (n > 0) {
#ifndef MG_DISABLE_FILESYSTEM
/* LCOV_EXCL_START */
if (conn->mgr->hexdump_file != NULL) {
mg_hexdump_connection(conn, conn->mgr->hexdump_file, n, MG_EV_SEND);
} }
/* LCOV_EXCL_STOP */ if (nc != NULL) {
#endif mg_recv_common(nc, buf, len);
mbuf_remove(io, n); } else {
/* Drop on the floor. */
MG_FREE(buf);
} }
mg_call(conn, MG_EV_SEND, &n); mg_if_recved(nc, len);
} }
int mg_send(struct mg_connection *conn, const void *buf, int len) { /*
return (int) mg_out(conn, buf, len); * Schedules an async connect for a resolved address and proto.
} * Called from two places: `mg_connect_opt()` and from async resolver.
* When called from the async resolver, it must trigger `MG_EV_CONNECT` event
static void mg_handle_udp(struct mg_connection *ls) { * with a failure flag to indicate connection failure.
struct mg_connection nc; */
char buf[MG_UDP_RECEIVE_BUFFER_SIZE]; MG_INTERNAL struct mg_connection *mg_do_connect(struct mg_connection *nc,
int n; int proto,
socklen_t s_len = sizeof(nc.sa); union socket_address *sa) {
DBG(("%p %s://%s:%hu", nc, proto == SOCK_DGRAM ? "udp" : "tcp",
inet_ntoa(sa->sin.sin_addr), ntohs(sa->sin.sin_port)));
memset(&nc, 0, sizeof(nc)); nc->flags |= MG_F_CONNECTING;
n = recvfrom(ls->sock, buf, sizeof(buf), 0, &nc.sa.sa, &s_len); if (proto == SOCK_DGRAM) {
if (n <= 0) { mg_if_connect_udp(nc);
DBG(("%p recvfrom: %s", ls, strerror(errno)));
} else { } else {
union socket_address sa = nc.sa; mg_if_connect_tcp(nc, sa);
/* Copy all attributes, preserving sender address */ }
nc = *ls; mg_add_conn(nc->mgr, nc);
return nc;
}
/* Then override some */ void mg_if_connect_cb(struct mg_connection *nc, int err) {
nc.sa = sa; DBG(("%p connect, err=%d", nc, err));
nc.recv_mbuf.buf = buf; nc->flags &= ~MG_F_CONNECTING;
nc.recv_mbuf.len = nc.recv_mbuf.size = n; if (err == 0) {
nc.listener = ls; #ifdef MG_ENABLE_SSL
nc.flags = MG_F_UDP; if (nc->ssl != NULL) {
SSL_set_fd(nc->ssl, nc->sock);
mg_ssl_begin(nc);
return;
}
#endif
} else {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
}
mg_call(nc, MG_EV_CONNECT, &err);
}
/* Call MG_EV_RECV handler */ #ifndef MG_DISABLE_RESOLVER
DBG(("%p %d bytes received", ls, n)); /*
mg_call(&nc, MG_EV_RECV, &n); * Callback for the async resolver on mg_connect_opt() call.
* Main task of this function is to trigger MG_EV_CONNECT event with
* either failure (and dealloc the connection)
* or success (and proceed with connect()
*/
static void resolve_cb(struct mg_dns_message *msg, void *data) {
struct mg_connection *nc = (struct mg_connection *) data;
int i;
int failure = -1;
if (msg != NULL) {
/*
* Take the first DNS A answer and run...
*/
for (i = 0; i < msg->num_answers; i++) {
if (msg->answers[i].rtype == MG_DNS_A_RECORD) {
/* /*
* See https://github.com/cesanta/mongoose/issues/207 * Async resolver guarantees that there is at least one answer.
* mg_call migth set flags. They need to be synced back to ls. * TODO(lsm): handle IPv6 answers too
*/ */
ls->flags = nc.flags; mg_dns_parse_record_data(msg, &msg->answers[i], &nc->sa.sin.sin_addr,
4);
mg_do_connect(nc, nc->flags & MG_F_UDP ? SOCK_DGRAM : SOCK_STREAM,
&nc->sa);
return;
}
} }
}
/*
* If we get there was no MG_DNS_A_RECORD in the answer
*/
mg_call(nc, MG_EV_CONNECT, &failure);
mg_destroy_conn(nc);
} }
#endif
#define _MG_F_FD_CAN_READ 1 struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *address,
#define _MG_F_FD_CAN_WRITE 1 << 1 mg_event_handler_t callback) {
#define _MG_F_FD_ERROR 1 << 2 static struct mg_connect_opts opts;
return mg_connect_opt(mgr, address, callback, opts);
}
static void mg_mgr_handle_connection(struct mg_connection *nc, int fd_flags, struct mg_connection *mg_connect_opt(struct mg_mgr *mgr, const char *address,
time_t now) { mg_event_handler_t callback,
DBG(("%p fd=%d fd_flags=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, struct mg_connect_opts opts) {
fd_flags, nc->flags, (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len)); struct mg_connection *nc = NULL;
if (fd_flags != 0) nc->last_io_time = now; int proto, rc;
struct mg_add_sock_opts add_sock_opts;
char host[MG_MAX_HOST_LEN];
if (nc->flags & MG_F_CONNECTING) { MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts);
if (fd_flags != 0) {
int err = 1; if ((nc = mg_create_connection(mgr, callback, add_sock_opts)) == NULL) {
#if !defined(MG_CC3200) && !defined(MG_ESP8266) return NULL;
socklen_t len = sizeof(err); } else if ((rc = mg_parse_address(address, &nc->sa, &proto, host,
int ret = getsockopt(nc->sock, SOL_SOCKET, SO_ERROR, (char *) &err, &len); sizeof(host))) < 0) {
if (ret != 0) err = 1; /* Address is malformed */
#else MG_SET_PTRPTR(opts.error_string, "cannot parse address");
/* On CC3200 and ESP8266 we use blocking connect. If we got as far as mg_destroy_conn(nc);
* this, return NULL;
* this means connect() was successful.
* TODO(rojer): Figure out why it fails where blocking succeeds.
*/
mg_set_non_blocking_mode(nc->sock);
err = 0;
#endif
mg_connect_done(nc, err);
}
return;
} }
nc->flags |= opts.flags & _MG_ALLOWED_CONNECT_FLAGS_MASK;
nc->flags |= (proto == SOCK_DGRAM) ? MG_F_UDP : 0;
nc->user_data = opts.user_data;
if (nc->flags & MG_F_LISTENING) { if (rc == 0) {
#ifndef MG_DISABLE_RESOLVER
/* /*
* We're not looping here, and accepting just one connection at * DNS resolution is required for host.
* a time. The reason is that eCos does not respect non-blocking * mg_parse_address() fills port in nc->sa, which we pass to resolve_cb()
* flag on a listening socket and hangs in a loop.
*/ */
if (fd_flags & _MG_F_FD_CAN_READ) accept_conn(nc); if (mg_resolve_async(nc->mgr, host, MG_DNS_A_RECORD, resolve_cb, nc) != 0) {
return; MG_SET_PTRPTR(opts.error_string, "cannot schedule DNS lookup");
mg_destroy_conn(nc);
return NULL;
} }
if (fd_flags & _MG_F_FD_CAN_READ) { return nc;
if (nc->flags & MG_F_UDP) { #else
mg_handle_udp(nc); MG_SET_PTRPTR(opts.error_string, "Resolver is disabled");
mg_destroy_conn(nc);
return NULL;
#endif
} else { } else {
mg_read_from_socket(nc); /* Address is parsed and resolved to IP. proceed with connect() */
return mg_do_connect(nc, proto, &nc->sa);
} }
if (nc->flags & MG_F_CLOSE_IMMEDIATELY) return; }
struct mg_connection *mg_bind(struct mg_mgr *srv, const char *address,
mg_event_handler_t event_handler) {
static struct mg_bind_opts opts;
return mg_bind_opt(srv, address, event_handler, opts);
}
struct mg_connection *mg_bind_opt(struct mg_mgr *mgr, const char *address,
mg_event_handler_t callback,
struct mg_bind_opts opts) {
union socket_address sa;
struct mg_connection *nc = NULL;
int proto, rc;
struct mg_add_sock_opts add_sock_opts;
char host[MG_MAX_HOST_LEN];
MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts);
if (mg_parse_address(address, &sa, &proto, host, sizeof(host)) <= 0) {
MG_SET_PTRPTR(opts.error_string, "cannot parse address");
return NULL;
} }
if ((fd_flags & _MG_F_FD_CAN_WRITE) && !(nc->flags & MG_F_DONT_SEND) && nc = mg_create_connection(mgr, callback, add_sock_opts);
!(nc->flags & MG_F_UDP)) { /* Writes to UDP sockets are not buffered. */ if (nc == NULL) {
mg_write_to_socket(nc); return NULL;
} }
if (!(fd_flags & (_MG_F_FD_CAN_READ | _MG_F_FD_CAN_WRITE))) { nc->sa = sa;
mg_call(nc, MG_EV_POLL, &now); nc->flags |= MG_F_LISTENING;
if (proto == SOCK_DGRAM) {
nc->flags |= MG_F_UDP;
rc = mg_if_listen_udp(nc, &nc->sa);
} else {
rc = mg_if_listen_tcp(nc, &nc->sa);
}
if (rc != 0) {
DBG(("Failed to open listener: %d", rc));
MG_SET_PTRPTR(opts.error_string, "failed to open listener");
mg_destroy_conn(nc);
return NULL;
} }
mg_add_conn(nc->mgr, nc);
DBG(("%p after fd=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, nc->flags, return nc;
(int) nc->recv_mbuf.len, (int) nc->send_mbuf.len));
} }
#ifndef MG_DISABLE_SOCKETPAIR struct mg_connection *mg_next(struct mg_mgr *s, struct mg_connection *conn) {
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { return conn == NULL ? s->active_connections : conn->next;
}
void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data,
size_t len) {
struct ctl_msg ctl_msg; struct ctl_msg ctl_msg;
int len =
(int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0);
size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0);
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) { /*
struct mg_connection *nc; * Mongoose manager has a socketpair, `struct mg_mgr::ctl`,
for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { * where `mg_broadcast()` pushes the message.
ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.message); * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls
} * specified callback for each connection. Thus the callback function executes
* in event manager thread.
*/
if (mgr->ctl[0] != INVALID_SOCKET && data != NULL &&
len < sizeof(ctl_msg.message)) {
size_t dummy;
ctl_msg.callback = cb;
memcpy(ctl_msg.message, data, len);
dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg,
offsetof(struct ctl_msg, message) + len, 0);
dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0);
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
} }
} }
#endif
#if MG_MGR_EV_MGR == 1 /* epoll() */ static int isbyte(int n) {
return n >= 0 && n <= 255;
}
#ifndef MG_EPOLL_MAX_EVENTS static int parse_net(const char *spec, uint32_t *net, uint32_t *mask) {
#define MG_EPOLL_MAX_EVENTS 100 int n, a, b, c, d, slash = 32, len = 0;
#endif
#define _MG_EPF_EV_EPOLLIN (1 << 0) if ((sscanf(spec, "%d.%d.%d.%d/%d%n", &a, &b, &c, &d, &slash, &n) == 5 ||
#define _MG_EPF_EV_EPOLLOUT (1 << 1) sscanf(spec, "%d.%d.%d.%d%n", &a, &b, &c, &d, &n) == 4) &&
#define _MG_EPF_NO_POLL (1 << 2) isbyte(a) && isbyte(b) && isbyte(c) && isbyte(d) && slash >= 0 &&
slash < 33) {
len = n;
*net =
((uint32_t) a << 24) | ((uint32_t) b << 16) | ((uint32_t) c << 8) | d;
*mask = slash ? 0xffffffffU << (32 - slash) : 0;
}
static uint32_t mg_epf_to_evflags(unsigned int epf) { return len;
uint32_t result = 0;
if (epf & _MG_EPF_EV_EPOLLIN) result |= EPOLLIN;
if (epf & _MG_EPF_EV_EPOLLOUT) result |= EPOLLOUT;
return result;
} }
static void mg_ev_mgr_epoll_set_flags(const struct mg_connection *nc, int mg_check_ip_acl(const char *acl, uint32_t remote_ip) {
struct epoll_event *ev) { int allowed, flag;
/* NOTE: EPOLLERR and EPOLLHUP are always enabled. */ uint32_t net, mask;
ev->events = 0; struct mg_str vec;
if (nc->recv_mbuf.len < nc->recv_mbuf_limit) {
ev->events |= EPOLLIN; /* If any ACL is set, deny by default */
allowed = (acl == NULL || *acl == '\0') ? '+' : '-';
while ((acl = mg_next_comma_list_entry(acl, &vec, NULL)) != NULL) {
flag = vec.p[0];
if ((flag != '+' && flag != '-') ||
parse_net(&vec.p[1], &net, &mask) == 0) {
return -1;
} }
if ((nc->flags & MG_F_CONNECTING) ||
(nc->send_mbuf.len > 0 && !(nc->flags & MG_F_DONT_SEND))) { if (net == (remote_ip & mask)) {
ev->events |= EPOLLOUT; allowed = flag;
} }
}
return allowed == '+';
} }
static void mg_ev_mgr_epoll_ctl(struct mg_connection *nc, int op) { /* Move data from one connection to another */
int epoll_fd = (intptr_t) nc->mgr->mgr_data; void mg_forward(struct mg_connection *from, struct mg_connection *to) {
struct epoll_event ev; mg_send(to, from->recv_mbuf.buf, from->recv_mbuf.len);
assert(op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD || EPOLL_CTL_DEL); mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len);
if (op != EPOLL_CTL_DEL) {
mg_ev_mgr_epoll_set_flags(nc, &ev);
if (op == EPOLL_CTL_MOD) {
uint32_t old_ev_flags = mg_epf_to_evflags((intptr_t) nc->mgr_data);
if (ev.events == old_ev_flags) return;
}
ev.data.ptr = nc;
}
if (epoll_ctl(epoll_fd, op, nc->sock, &ev) != 0) {
perror("epoll_ctl");
abort();
}
} }
#ifdef NS_MODULE_LINES
#line 1 "src/net_if_socket.c"
/**/
#endif
/* Amalgamated: #include "internal.h" */
static void mg_ev_mgr_init(struct mg_mgr *mgr) { #define MG_TCP_RECV_BUFFER_SIZE 1024
int epoll_fd; #define MG_UDP_RECV_BUFFER_SIZE 1500
DBG(("%p using epoll()", mgr));
epoll_fd = epoll_create(MG_EPOLL_MAX_EVENTS /* unused but required */); static sock_t mg_open_listening_socket(union socket_address *sa, int proto);
if (epoll_fd < 0) {
perror("epoll_ctl"); void mg_set_non_blocking_mode(sock_t sock) {
abort(); #ifdef _WIN32
} unsigned long on = 1;
mgr->mgr_data = (void *) ((intptr_t) epoll_fd); ioctlsocket(sock, FIONBIO, &on);
if (mgr->ctl[1] != INVALID_SOCKET) { #elif defined(MG_CC3200)
struct epoll_event ev; cc3200_set_non_blocking_mode(sock);
ev.events = EPOLLIN; #else
ev.data.ptr = NULL; int flags = fcntl(sock, F_GETFL, 0);
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mgr->ctl[1], &ev) != 0) { fcntl(sock, F_SETFL, flags | O_NONBLOCK);
perror("epoll_ctl"); #endif
abort(); }
int mg_is_error(int n) {
#ifdef MG_CC3200
DBG(("n = %d, errno = %d", n, errno));
if (n < 0) errno = n;
#endif
return n == 0 || (n < 0 && errno != EINTR && errno != EINPROGRESS &&
errno != EAGAIN && errno != EWOULDBLOCK
#ifdef MG_CC3200
&& errno != SL_EALREADY
#endif
#ifdef _WIN32
&& WSAGetLastError() != WSAEINTR &&
WSAGetLastError() != WSAEWOULDBLOCK
#endif
);
}
void mg_if_connect_tcp(struct mg_connection *nc,
const union socket_address *sa) {
int rc;
nc->sock = socket(AF_INET, SOCK_STREAM, 0);
if (nc->sock < 0) {
nc->sock = INVALID_SOCKET;
nc->err = errno ? errno : 1;
return;
} }
#if !defined(MG_CC3200) && !defined(MG_ESP8266)
mg_set_non_blocking_mode(nc->sock);
#endif
rc = connect(nc->sock, &sa->sa, sizeof(sa->sin));
nc->err = rc == 0 ? 0 : (errno ? errno : 1);
DBG(("%p sock %d err %d", nc, nc->sock, nc->err));
}
void mg_if_connect_udp(struct mg_connection *nc) {
nc->sock = socket(AF_INET, SOCK_DGRAM, 0);
if (nc->sock < 0) {
nc->sock = INVALID_SOCKET;
nc->err = errno ? errno : 1;
return;
} }
nc->err = 0;
} }
static void mg_ev_mgr_free(struct mg_mgr *mgr) { int mg_if_listen_tcp(struct mg_connection *nc, union socket_address *sa) {
int epoll_fd = (intptr_t) mgr->mgr_data; sock_t sock = mg_open_listening_socket(sa, SOCK_STREAM);
close(epoll_fd); if (sock < 0) return (errno ? errno : 1);
mg_set_sock(nc, sock);
return 0;
} }
static void mg_ev_mgr_add_conn(struct mg_connection *nc) { int mg_if_listen_udp(struct mg_connection *nc, union socket_address *sa) {
mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_ADD); sock_t sock = mg_open_listening_socket(sa, SOCK_DGRAM);
if (sock < 0) return (errno ? errno : 1);
mg_set_sock(nc, sock);
return 0;
} }
static void mg_ev_mgr_remove_conn(struct mg_connection *nc) { void mg_if_tcp_send(struct mg_connection *nc, const void *buf, size_t len) {
mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_DEL); mbuf_append(&nc->send_mbuf, buf, len);
} }
time_t mg_mgr_poll(struct mg_mgr *mgr, int timeout_ms) { void mg_if_udp_send(struct mg_connection *nc, const void *buf, size_t len) {
int epoll_fd = (intptr_t) mgr->mgr_data; DBG(("%p %d %d", nc, (int) len, (int) nc->send_mbuf.len));
struct epoll_event events[MG_EPOLL_MAX_EVENTS]; mbuf_append(&nc->send_mbuf, buf, len);
struct mg_connection *nc, *next; }
int num_ev, fd_flags;
time_t now;
num_ev = epoll_wait(epoll_fd, events, MG_EPOLL_MAX_EVENTS, timeout_ms); void mg_if_recved(struct mg_connection *nc, size_t len) {
now = time(NULL); (void) nc;
DBG(("epoll_wait @ %ld num_ev=%d", (long) now, num_ev)); (void) len;
}
while (num_ev-- > 0) { void mg_if_destroy_conn(struct mg_connection *nc) {
intptr_t epf; if (nc->sock == INVALID_SOCKET) return;
struct epoll_event *ev = events + num_ev; #ifdef MG_ENABLE_SSL
nc = (struct mg_connection *) ev->data.ptr; if (nc->ssl != NULL) SSL_free(nc->ssl);
if (nc == NULL) { if (nc->ssl_ctx != NULL) SSL_CTX_free(nc->ssl_ctx);
mg_mgr_handle_ctl_sock(mgr); #endif
continue; if (!(nc->flags & MG_F_UDP)) {
closesocket(nc->sock);
} else {
/* Only close outgoing UDP sockets or listeners. */
if (nc->listener == NULL) closesocket(nc->sock);
} }
fd_flags = ((ev->events & (EPOLLIN | EPOLLHUP)) ? _MG_F_FD_CAN_READ : 0) | /*
((ev->events & (EPOLLOUT)) ? _MG_F_FD_CAN_WRITE : 0) | * avoid users accidentally double close a socket
((ev->events & (EPOLLERR)) ? _MG_F_FD_ERROR : 0); * because it can lead to difficult to debug situations.
mg_mgr_handle_connection(nc, fd_flags, now); * It would happen only if reusing a destroyed mg_connection
epf = (intptr_t) nc->mgr_data; * but it's not always possible to run the code through an
epf ^= _MG_EPF_NO_POLL; * address sanitizer.
nc->mgr_data = (void *) epf; */
nc->sock = INVALID_SOCKET;
}
static void mg_accept_conn(struct mg_connection *lc) {
union socket_address sa;
socklen_t sa_len = sizeof(sa);
/* NOTE(lsm): on Windows, sock is always > FD_SETSIZE */
sock_t sock = accept(lc->sock, &sa.sa, &sa_len);
if (sock < 0) {
DBG(("%p: failed to accept: %d", lc, errno));
return;
} }
mg_if_accept_tcp_cb(lc, sock, &sa, sa_len);
}
for (nc = mgr->active_connections; nc != NULL; nc = next) { /* 'sa' must be an initialized address to bind to */
next = nc->next; static sock_t mg_open_listening_socket(union socket_address *sa, int proto) {
if (!(((intptr_t) nc->mgr_data) & _MG_EPF_NO_POLL)) { socklen_t sa_len =
mg_mgr_handle_connection(nc, 0, now); (sa->sa.sa_family == AF_INET) ? sizeof(sa->sin) : sizeof(sa->sin6);
sock_t sock = INVALID_SOCKET;
#if !defined(MG_CC3200) && !defined(MG_LWIP)
int on = 1;
#endif
if ((sock = socket(sa->sa.sa_family, proto, 0)) != INVALID_SOCKET &&
#if !defined(MG_CC3200) && \
!defined(MG_LWIP) /* CC3200 and LWIP don't support either */
#if defined(_WIN32) && defined(SO_EXCLUSIVEADDRUSE)
/* "Using SO_REUSEADDR and SO_EXCLUSIVEADDRUSE" http://goo.gl/RmrFTm */
!setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (void *) &on,
sizeof(on)) &&
#endif
#if !defined(_WIN32) || !defined(SO_EXCLUSIVEADDRUSE)
/*
* SO_RESUSEADDR is not enabled on Windows because the semantics of
* SO_REUSEADDR on UNIX and Windows is different. On Windows,
* SO_REUSEADDR allows to bind a socket to a port without error even if
* the port is already open by another program. This is not the behavior
* SO_REUSEADDR was designed for, and leads to hard-to-track failure
* scenarios. Therefore, SO_REUSEADDR was disabled on Windows unless
* SO_EXCLUSIVEADDRUSE is supported and set on a socket.
*/
!setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on)) &&
#endif
#endif /* !MG_CC3200 && !MG_LWIP */
!bind(sock, &sa->sa, sa_len) &&
(proto == SOCK_DGRAM || listen(sock, SOMAXCONN) == 0)) {
#if !defined(MG_CC3200) && !defined(MG_LWIP) /* TODO(rojer): Fix this. */
mg_set_non_blocking_mode(sock);
/* In case port was set to 0, get the real port number */
(void) getsockname(sock, &sa->sa, &sa_len);
#endif
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
sock = INVALID_SOCKET;
}
return sock;
}
static void mg_write_to_socket(struct mg_connection *nc) {
struct mbuf *io = &nc->send_mbuf;
int n = 0;
#ifdef MG_LWIP
/* With LWIP we don't know if the socket is ready */
if (io->len == 0) return;
#endif
assert(io->len > 0);
if (nc->flags & MG_F_UDP) {
int n =
sendto(nc->sock, io->buf, io->len, 0, &nc->sa.sa, sizeof(nc->sa.sin));
DBG(("%p %d %d %d %s:%hu", nc, nc->sock, n, errno,
inet_ntoa(nc->sa.sin.sin_addr), ntohs(nc->sa.sin.sin_port)));
if (n > 0) {
mbuf_remove(io, n);
}
mg_if_sent_cb(nc, n);
return;
}
#ifdef MG_ENABLE_SSL
if (nc->ssl != NULL) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
n = SSL_write(nc->ssl, io->buf, io->len);
if (n <= 0) {
int ssl_err = mg_ssl_err(nc, n);
if (ssl_err == SSL_ERROR_WANT_READ || ssl_err == SSL_ERROR_WANT_WRITE) {
return; /* Call us again */
} else { } else {
intptr_t epf = (intptr_t) nc->mgr_data; nc->flags |= MG_F_CLOSE_IMMEDIATELY;
epf ^= _MG_EPF_NO_POLL;
nc->mgr_data = (void *) epf;
} }
if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) ||
(nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) {
mg_close_conn(nc);
} else { } else {
mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_MOD); /* Successful SSL operation, clear off SSL wait flags */
nc->flags &= ~(MG_F_WANT_READ | MG_F_WANT_WRITE);
} }
} else {
mg_ssl_begin(nc);
return;
}
} else
#endif
{
n = (int) MG_SEND_FUNC(nc->sock, io->buf, io->len, 0);
} }
return now; DBG(("%p %d bytes -> %d", nc, n, nc->sock));
}
#else /* select() */
static void mg_ev_mgr_init(struct mg_mgr *mgr) { if (n > 0) {
(void) mgr; mbuf_remove(io, n);
DBG(("%p using select()", mgr)); }
mg_if_sent_cb(nc, n);
} }
static void mg_ev_mgr_free(struct mg_mgr *mgr) { static void mg_read_from_socket(struct mg_connection *conn) {
(void) mgr; int n = 0;
} char *buf = (char *) MG_MALLOC(MG_TCP_RECV_BUFFER_SIZE);
static void mg_ev_mgr_add_conn(struct mg_connection *nc) { if (buf == NULL) {
(void) nc; DBG(("OOM"));
} return;
}
static void mg_ev_mgr_remove_conn(struct mg_connection *nc) { #ifdef MG_ENABLE_SSL
(void) nc; if (conn->ssl != NULL) {
if (conn->flags & MG_F_SSL_HANDSHAKE_DONE) {
/* SSL library may have more bytes ready to read then we ask to read.
* Therefore, read in a loop until we read everything. Without the loop,
* we skip to the next select() cycle which can just timeout. */
while ((n = SSL_read(conn->ssl, buf, MG_TCP_RECV_BUFFER_SIZE)) > 0) {
DBG(("%p %d bytes <- %d (SSL)", conn, n, conn->sock));
mg_if_recv_tcp_cb(conn, buf, n);
buf = NULL;
if (conn->flags & MG_F_CLOSE_IMMEDIATELY) break;
/* buf has been freed, we need a new one. */
buf = (char *) MG_MALLOC(MG_TCP_RECV_BUFFER_SIZE);
if (buf == NULL) break;
}
MG_FREE(buf);
mg_ssl_err(conn, n);
} else {
MG_FREE(buf);
mg_ssl_begin(conn);
return;
}
} else
#endif
{
n = (int) MG_RECV_FUNC(conn->sock, buf,
recv_avail_size(conn, MG_TCP_RECV_BUFFER_SIZE), 0);
if (n > 0) {
DBG(("%p %d bytes (PLAIN) <- %d", conn, n, conn->sock));
mg_if_recv_tcp_cb(conn, buf, n);
} else {
MG_FREE(buf);
}
if (mg_is_error(n)) {
conn->flags |= MG_F_CLOSE_IMMEDIATELY;
}
}
} }
static void mg_add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) { static int mg_recvfrom(struct mg_connection *nc, union socket_address *sa,
if (sock != INVALID_SOCKET) { socklen_t *sa_len, char **buf) {
FD_SET(sock, set); int n;
if (*max_fd == INVALID_SOCKET || sock > *max_fd) { *buf = (char *) MG_MALLOC(MG_UDP_RECV_BUFFER_SIZE);
*max_fd = sock; if (*buf == NULL) {
DBG(("Out of memory"));
return -ENOMEM;
} }
n = recvfrom(nc->sock, *buf, MG_UDP_RECV_BUFFER_SIZE, 0, &sa->sa, sa_len);
if (n <= 0) {
DBG(("%p recvfrom: %s", nc, strerror(errno)));
MG_FREE(*buf);
} }
return n;
} }
time_t mg_mgr_poll(struct mg_mgr *mgr, int milli) { static void mg_handle_udp_read(struct mg_connection *nc) {
time_t now; char *buf = NULL;
struct mg_connection *nc, *tmp; union socket_address sa;
struct timeval tv; socklen_t sa_len = sizeof(sa);
fd_set read_set, write_set, err_set; int n = mg_recvfrom(nc, &sa, &sa_len, &buf);
sock_t max_fd = INVALID_SOCKET; DBG(("%p %d bytes from %s:%d", nc, n, inet_ntoa(nc->sa.sin.sin_addr),
int num_selected; ntohs(nc->sa.sin.sin_port)));
mg_if_recv_udp_cb(nc, buf, n, &sa, sa_len);
}
FD_ZERO(&read_set); #define _MG_F_FD_CAN_READ 1
FD_ZERO(&write_set); #define _MG_F_FD_CAN_WRITE 1 << 1
FD_ZERO(&err_set); #define _MG_F_FD_ERROR 1 << 2
mg_add_to_set(mgr->ctl[1], &read_set, &max_fd);
for (nc = mgr->active_connections; nc != NULL; nc = tmp) { void mg_mgr_handle_conn(struct mg_connection *nc, int fd_flags, time_t now) {
tmp = nc->next; DBG(("%p fd=%d fd_flags=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock,
fd_flags, nc->flags, (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len));
if (fd_flags != 0) nc->last_io_time = now;
if (!(nc->flags & MG_F_WANT_WRITE) && if (nc->flags & MG_F_CONNECTING) {
nc->recv_mbuf.len < nc->recv_mbuf_limit) { if (fd_flags != 0) {
mg_add_to_set(nc->sock, &read_set, &max_fd); int err = 0;
#if !defined(MG_CC3200) && !defined(MG_ESP8266)
if (!(nc->flags & MG_F_UDP)) {
socklen_t len = sizeof(err);
int ret =
getsockopt(nc->sock, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
if (ret != 0) err = 1;
}
#else
/* On CC3200 and ESP8266 we use blocking connect. If we got as far as
* this, it means connect() was successful.
* TODO(rojer): Figure out why it fails where blocking succeeds.
*/
#endif
mg_if_connect_cb(nc, err);
} else if (nc->err != 0) {
mg_if_connect_cb(nc, nc->err);
}
}
if (fd_flags & _MG_F_FD_CAN_READ) {
if (nc->flags & MG_F_UDP) {
mg_handle_udp_read(nc);
} else {
if (nc->flags & MG_F_LISTENING) {
/*
* We're not looping here, and accepting just one connection at
* a time. The reason is that eCos does not respect non-blocking
* flag on a listening socket and hangs in a loop.
*/
if (fd_flags & _MG_F_FD_CAN_READ) mg_accept_conn(nc);
return;
} else {
mg_read_from_socket(nc);
}
}
if (nc->flags & MG_F_CLOSE_IMMEDIATELY) return;
} }
if (((nc->flags & MG_F_CONNECTING) && !(nc->flags & MG_F_WANT_READ)) || if ((fd_flags & _MG_F_FD_CAN_WRITE) && !(nc->flags & MG_F_DONT_SEND) &&
(nc->send_mbuf.len > 0 && !(nc->flags & MG_F_CONNECTING) && nc->send_mbuf.len > 0) {
!(nc->flags & MG_F_DONT_SEND))) { mg_write_to_socket(nc);
mg_add_to_set(nc->sock, &write_set, &max_fd);
mg_add_to_set(nc->sock, &err_set, &max_fd);
}
} }
tv.tv_sec = milli / 1000; if (!(fd_flags & (_MG_F_FD_CAN_READ | _MG_F_FD_CAN_WRITE))) {
tv.tv_usec = (milli % 1000) * 1000; mg_call(nc, MG_EV_POLL, &now);
}
num_selected = select((int) max_fd + 1, &read_set, &write_set, &err_set, &tv); DBG(("%p after fd=%d nc_flags=%lu rmbl=%d smbl=%d", nc, nc->sock, nc->flags,
now = time(NULL); (int) nc->recv_mbuf.len, (int) nc->send_mbuf.len));
DBG(("select @ %ld num_ev=%d", (long) now, num_selected)); }
#ifndef MG_DISABLE_SOCKETPAIR #ifndef MG_DISABLE_SOCKETPAIR
if (num_selected > 0 && mgr->ctl[1] != INVALID_SOCKET && static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
FD_ISSET(mgr->ctl[1], &read_set)) { struct ctl_msg ctl_msg;
mg_mgr_handle_ctl_sock(mgr); int len =
} (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0);
#endif size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0);
(void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
for (nc = mgr->active_connections; nc != NULL; nc = tmp) { if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
int fd_flags = 0; struct mg_connection *nc;
if (num_selected > 0) { for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
fd_flags = (FD_ISSET(nc->sock, &read_set) ? _MG_F_FD_CAN_READ : 0) | ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.message);
(FD_ISSET(nc->sock, &write_set) ? _MG_F_FD_CAN_WRITE : 0) |
(FD_ISSET(nc->sock, &err_set) ? _MG_F_FD_ERROR : 0);
} }
#ifdef MG_CC3200
// CC3200 does not report UDP sockets as writeable.
if (nc->flags & MG_F_UDP &&
(nc->send_mbuf.len > 0 || nc->flags & MG_F_CONNECTING)) {
fd_flags |= _MG_F_FD_CAN_WRITE;
} }
}
#endif #endif
#ifdef MG_LWIP
/* With LWIP socket emulation layer, we don't get write events */
fd_flags |= _MG_F_FD_CAN_WRITE;
#endif
tmp = nc->next;
mg_mgr_handle_connection(nc, fd_flags, now);
}
for (nc = mgr->active_connections; nc != NULL; nc = tmp) { struct mg_connection *mg_add_sock(struct mg_mgr *s, sock_t sock,
tmp = nc->next; mg_event_handler_t callback) {
if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) || static struct mg_add_sock_opts opts;
(nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) { return mg_add_sock_opt(s, sock, callback, opts);
mg_close_conn(nc); }
}
struct mg_connection *mg_add_sock_opt(struct mg_mgr *s, sock_t sock,
mg_event_handler_t callback,
struct mg_add_sock_opts opts) {
struct mg_connection *nc = mg_create_connection(s, callback, opts);
if (nc != NULL) {
mg_set_sock(nc, sock);
mg_add_conn(nc->mgr, nc);
} }
return nc;
}
return now; /* Associate a socket to a connection. */
MG_INTERNAL void mg_set_sock(struct mg_connection *nc, sock_t sock) {
mg_set_non_blocking_mode(sock);
mg_set_close_on_exec(sock);
nc->sock = sock;
DBG(("%p %d", nc, sock));
} }
#if MG_MGR_EV_MGR == 1 /* epoll() */
#ifndef MG_EPOLL_MAX_EVENTS
#define MG_EPOLL_MAX_EVENTS 100
#endif #endif
/* #define _MG_EPF_EV_EPOLLIN (1 << 0)
* Schedules an async connect for a resolved address and proto. #define _MG_EPF_EV_EPOLLOUT (1 << 1)
* Called from two places: `mg_connect_opt()` and from async resolver. #define _MG_EPF_NO_POLL (1 << 2)
* When called from the async resolver, it must trigger `MG_EV_CONNECT` event
* with a failure flag to indicate connection failure.
*/
MG_INTERNAL struct mg_connection *mg_finish_connect(struct mg_connection *nc,
int proto,
union socket_address *sa,
struct mg_add_sock_opts o) {
sock_t sock = INVALID_SOCKET;
int rc;
DBG(("%p %s://%s:%hu", nc, proto == SOCK_DGRAM ? "udp" : "tcp", static uint32_t mg_epf_to_evflags(unsigned int epf) {
inet_ntoa(nc->sa.sin.sin_addr), ntohs(nc->sa.sin.sin_port))); uint32_t result = 0;
if (epf & _MG_EPF_EV_EPOLLIN) result |= EPOLLIN;
if (epf & _MG_EPF_EV_EPOLLOUT) result |= EPOLLOUT;
return result;
}
if ((sock = socket(AF_INET, proto, 0)) == INVALID_SOCKET) { static void mg_ev_mgr_epoll_set_flags(const struct mg_connection *nc,
int failure = errno; struct epoll_event *ev) {
MG_SET_PTRPTR(o.error_string, "cannot create socket"); /* NOTE: EPOLLERR and EPOLLHUP are always enabled. */
if (nc->flags & MG_F_CONNECTING) { ev->events = 0;
mg_call(nc, MG_EV_CONNECT, &failure); if ((nc->flags & MG_F_LISTENING) || nc->recv_mbuf.len < nc->recv_mbuf_limit) {
ev->events |= EPOLLIN;
} }
mg_destroy_conn(nc); if ((nc->flags & MG_F_CONNECTING) ||
return NULL; (nc->send_mbuf.len > 0 && !(nc->flags & MG_F_DONT_SEND))) {
ev->events |= EPOLLOUT;
} }
}
#if !defined(MG_CC3200) && !defined(MG_ESP8266) static void mg_ev_mgr_epoll_ctl(struct mg_connection *nc, int op) {
mg_set_non_blocking_mode(sock); int epoll_fd = (intptr_t) nc->mgr->mgr_data;
#endif struct epoll_event ev;
rc = (proto == SOCK_DGRAM) ? 0 : connect(sock, &sa->sa, sizeof(sa->sin)); assert(op == EPOLL_CTL_ADD || op == EPOLL_CTL_MOD || EPOLL_CTL_DEL);
DBG(("%p %d %d", nc, nc->sock, op));
if (rc != 0 && mg_is_error(rc)) { if (nc->sock == INVALID_SOCKET) return;
MG_SET_PTRPTR(o.error_string, "cannot connect to socket"); if (op != EPOLL_CTL_DEL) {
if (nc->flags & MG_F_CONNECTING) { mg_ev_mgr_epoll_set_flags(nc, &ev);
mg_call(nc, MG_EV_CONNECT, &rc); if (op == EPOLL_CTL_MOD) {
uint32_t old_ev_flags = mg_epf_to_evflags((intptr_t) nc->mgr_data);
if (ev.events == old_ev_flags) return;
} }
mg_destroy_conn(nc); ev.data.ptr = nc;
close(sock);
return NULL;
} }
if (epoll_ctl(epoll_fd, op, nc->sock, &ev) != 0) {
/* Fire MG_EV_CONNECT on next poll. */ perror("epoll_ctl");
nc->flags |= MG_F_CONNECTING; abort();
/* No mg_destroy_conn() call after this! */
mg_set_sock(nc, sock);
#ifdef MG_ENABLE_SSL
/*
* If we are using async resolver, socket isn't open
* before this place, so
* for SSL connections we have to add socket to SSL fd set
*/
if (nc->ssl != NULL && !(nc->flags & MG_F_LISTENING)) {
SSL_set_fd(nc->ssl, nc->sock);
} }
#endif
return nc;
} }
#ifndef MG_DISABLE_RESOLVER static void mg_ev_mgr_init(struct mg_mgr *mgr) {
/* int epoll_fd;
* Callback for the async resolver on mg_connect_opt() call. DBG(("%p using epoll()", mgr));
* Main task of this function is to trigger MG_EV_CONNECT event with #ifndef MG_DISABLE_SOCKETPAIR
* either failure (and dealloc the connection) do {
* or success (and proceed with connect() mg_socketpair(mgr->ctl, SOCK_DGRAM);
*/ } while (mgr->ctl[0] == INVALID_SOCKET);
static void resolve_cb(struct mg_dns_message *msg, void *data) { #endif
struct mg_connection *nc = (struct mg_connection *) data; epoll_fd = epoll_create(MG_EPOLL_MAX_EVENTS /* unused but required */);
int i; if (epoll_fd < 0) {
int failure = -1; perror("epoll_ctl");
abort();
if (msg != NULL) {
/*
* Take the first DNS A answer and run...
*/
for (i = 0; i < msg->num_answers; i++) {
if (msg->answers[i].rtype == MG_DNS_A_RECORD) {
static struct mg_add_sock_opts opts;
/*
* Async resolver guarantees that there is at least one answer.
* TODO(lsm): handle IPv6 answers too
*/
mg_dns_parse_record_data(msg, &msg->answers[i], &nc->sa.sin.sin_addr,
4);
/* Make mg_finish_connect() trigger MG_EV_CONNECT on failure */
nc->flags |= MG_F_CONNECTING;
mg_finish_connect(nc, nc->flags & MG_F_UDP ? SOCK_DGRAM : SOCK_STREAM,
&nc->sa, opts);
return;
} }
mgr->mgr_data = (void *) ((intptr_t) epoll_fd);
if (mgr->ctl[1] != INVALID_SOCKET) {
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = NULL;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mgr->ctl[1], &ev) != 0) {
perror("epoll_ctl");
abort();
} }
} }
}
/* static void mg_ev_mgr_free(struct mg_mgr *mgr) {
* If we get there was no MG_DNS_A_RECORD in the answer int epoll_fd = (intptr_t) mgr->mgr_data;
*/ close(epoll_fd);
mg_call(nc, MG_EV_CONNECT, &failure);
mg_destroy_conn(nc);
} }
#endif
struct mg_connection *mg_connect(struct mg_mgr *mgr, const char *address, static void mg_ev_mgr_add_conn(struct mg_connection *nc) {
mg_event_handler_t callback) { if (!(nc->flags & MG_F_UDP) || nc->listener == NULL) {
static struct mg_connect_opts opts; mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_ADD);
return mg_connect_opt(mgr, address, callback, opts); }
} }
struct mg_connection *mg_connect_opt(struct mg_mgr *mgr, const char *address, static void mg_ev_mgr_remove_conn(struct mg_connection *nc) {
mg_event_handler_t callback, if (!(nc->flags & MG_F_UDP) || nc->listener == NULL) {
struct mg_connect_opts opts) { mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_DEL);
struct mg_connection *nc = NULL; }
int proto, rc; }
struct mg_add_sock_opts add_sock_opts;
char host[MG_MAX_HOST_LEN];
MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts); time_t mg_mgr_poll(struct mg_mgr *mgr, int timeout_ms) {
int epoll_fd = (intptr_t) mgr->mgr_data;
struct epoll_event events[MG_EPOLL_MAX_EVENTS];
struct mg_connection *nc, *next;
int num_ev, fd_flags;
time_t now;
if ((nc = mg_create_connection(mgr, callback, add_sock_opts)) == NULL) { num_ev = epoll_wait(epoll_fd, events, MG_EPOLL_MAX_EVENTS, timeout_ms);
return NULL; now = time(NULL);
} else if ((rc = mg_parse_address(address, &nc->sa, &proto, host, DBG(("epoll_wait @ %ld num_ev=%d", (long) now, num_ev));
sizeof(host))) < 0) {
/* Address is malformed */
MG_SET_PTRPTR(opts.error_string, "cannot parse address");
mg_destroy_conn(nc);
return NULL;
}
nc->flags |= opts.flags & _MG_ALLOWED_CONNECT_FLAGS_MASK;
nc->flags |= (proto == SOCK_DGRAM) ? MG_F_UDP : 0;
nc->user_data = opts.user_data;
if (rc == 0) { while (num_ev-- > 0) {
#ifndef MG_DISABLE_RESOLVER intptr_t epf;
/* struct epoll_event *ev = events + num_ev;
* DNS resolution is required for host. nc = (struct mg_connection *) ev->data.ptr;
* mg_parse_address() fills port in nc->sa, which we pass to resolve_cb() if (nc == NULL) {
*/ mg_mgr_handle_ctl_sock(mgr);
if (mg_resolve_async(nc->mgr, host, MG_DNS_A_RECORD, resolve_cb, nc) != 0) { continue;
MG_SET_PTRPTR(opts.error_string, "cannot schedule DNS lookup"); }
mg_destroy_conn(nc); fd_flags = ((ev->events & (EPOLLIN | EPOLLHUP)) ? _MG_F_FD_CAN_READ : 0) |
return NULL; ((ev->events & (EPOLLOUT)) ? _MG_F_FD_CAN_WRITE : 0) |
((ev->events & (EPOLLERR)) ? _MG_F_FD_ERROR : 0);
mg_mgr_handle_conn(nc, fd_flags, now);
epf = (intptr_t) nc->mgr_data;
epf ^= _MG_EPF_NO_POLL;
nc->mgr_data = (void *) epf;
} }
return nc; for (nc = mgr->active_connections; nc != NULL; nc = next) {
#else next = nc->next;
MG_SET_PTRPTR(opts.error_string, "Resolver is disabled"); if (!(((intptr_t) nc->mgr_data) & _MG_EPF_NO_POLL)) {
mg_destroy_conn(nc); mg_mgr_handle_conn(nc, 0, now);
return NULL;
#endif
} else { } else {
/* Address is parsed and resolved to IP. proceed with connect() */ intptr_t epf = (intptr_t) nc->mgr_data;
return mg_finish_connect(nc, proto, &nc->sa, add_sock_opts); epf ^= _MG_EPF_NO_POLL;
nc->mgr_data = (void *) epf;
}
if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) ||
(nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) {
mg_close_conn(nc);
} else {
if (!(nc->flags & MG_F_UDP) || nc->listener == NULL) {
mg_ev_mgr_epoll_ctl(nc, EPOLL_CTL_MOD);
} else {
/* This is a kludge, but... */
if (nc->send_mbuf.len > 0) {
mg_mgr_handle_conn(nc, _MG_F_FD_CAN_WRITE, now);
}
}
}
} }
}
struct mg_connection *mg_bind(struct mg_mgr *srv, const char *address, return now;
mg_event_handler_t event_handler) {
static struct mg_bind_opts opts;
return mg_bind_opt(srv, address, event_handler, opts);
} }
struct mg_connection *mg_bind_opt(struct mg_mgr *mgr, const char *address, #else /* select() */
mg_event_handler_t callback,
struct mg_bind_opts opts) {
union socket_address sa;
struct mg_connection *nc = NULL;
int proto;
sock_t sock;
struct mg_add_sock_opts add_sock_opts;
char host[MG_MAX_HOST_LEN];
MG_COPY_COMMON_CONNECTION_OPTIONS(&add_sock_opts, &opts);
if (mg_parse_address(address, &sa, &proto, host, sizeof(host)) <= 0) {
MG_SET_PTRPTR(opts.error_string, "cannot parse address");
} else if ((sock = mg_open_listening_socket(&sa, proto)) == INVALID_SOCKET) {
DBG(("Failed to open listener: %d", errno));
MG_SET_PTRPTR(opts.error_string, "failed to open listener");
} else if ((nc = mg_add_sock_opt(mgr, sock, callback, add_sock_opts)) ==
NULL) {
/* opts.error_string set by mg_add_sock_opt */
DBG(("Failed to mg_add_sock"));
closesocket(sock);
} else {
nc->sa = sa;
nc->handler = callback;
if (proto == SOCK_DGRAM) { static void mg_ev_mgr_init(struct mg_mgr *mgr) {
nc->flags |= MG_F_UDP; (void) mgr;
} else { DBG(("%p using select()", mgr));
nc->flags |= MG_F_LISTENING; #ifndef MG_DISABLE_SOCKETPAIR
} do {
mg_socketpair(mgr->ctl, SOCK_DGRAM);
} while (mgr->ctl[0] == INVALID_SOCKET);
#endif
}
DBG(("%p sock %d/%d", nc, sock, proto)); static void mg_ev_mgr_free(struct mg_mgr *mgr) {
} (void) mgr;
}
return nc; static void mg_ev_mgr_add_conn(struct mg_connection *nc) {
(void) nc;
} }
struct mg_connection *mg_add_sock(struct mg_mgr *s, sock_t sock, static void mg_ev_mgr_remove_conn(struct mg_connection *nc) {
mg_event_handler_t callback) { (void) nc;
static struct mg_add_sock_opts opts;
return mg_add_sock_opt(s, sock, callback, opts);
} }
struct mg_connection *mg_add_sock_opt(struct mg_mgr *s, sock_t sock, static void mg_add_to_set(sock_t sock, fd_set *set, sock_t *max_fd) {
mg_event_handler_t callback, if (sock != INVALID_SOCKET) {
struct mg_add_sock_opts opts) { FD_SET(sock, set);
struct mg_connection *nc = mg_create_connection(s, callback, opts); if (*max_fd == INVALID_SOCKET || sock > *max_fd) {
if (nc != NULL) { *max_fd = sock;
mg_set_sock(nc, sock); }
} }
return nc;
} }
struct mg_connection *mg_next(struct mg_mgr *s, struct mg_connection *conn) { time_t mg_mgr_poll(struct mg_mgr *mgr, int milli) {
return conn == NULL ? s->active_connections : conn->next; time_t now = time(NULL);
} struct mg_connection *nc, *tmp;
struct timeval tv;
fd_set read_set, write_set, err_set;
sock_t max_fd = INVALID_SOCKET;
int num_fds, num_selected;
void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, FD_ZERO(&read_set);
size_t len) { FD_ZERO(&write_set);
struct ctl_msg ctl_msg; FD_ZERO(&err_set);
mg_add_to_set(mgr->ctl[1], &read_set, &max_fd);
/* for (nc = mgr->active_connections, num_fds = 0; nc != NULL; nc = tmp) {
* Mongoose manager has a socketpair, `struct mg_mgr::ctl`, tmp = nc->next;
* where `mg_broadcast()` pushes the message.
* `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls
* specified callback for each connection. Thus the callback function executes
* in event manager thread.
*/
if (mgr->ctl[0] != INVALID_SOCKET && data != NULL &&
len < sizeof(ctl_msg.message)) {
size_t dummy;
ctl_msg.callback = cb; if (nc->sock == INVALID_SOCKET) {
memcpy(ctl_msg.message, data, len); mg_mgr_handle_conn(nc, 0, now);
dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg, continue;
offsetof(struct ctl_msg, message) + len, 0);
dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0);
(void) dummy;
} }
}
static int isbyte(int n) { num_fds++;
return n >= 0 && n <= 255;
}
static int parse_net(const char *spec, uint32_t *net, uint32_t *mask) { if (!(nc->flags & MG_F_WANT_WRITE) &&
int n, a, b, c, d, slash = 32, len = 0; nc->recv_mbuf.len < nc->recv_mbuf_limit &&
(!(nc->flags & MG_F_UDP) || nc->listener == NULL)) {
mg_add_to_set(nc->sock, &read_set, &max_fd);
}
if ((sscanf(spec, "%d.%d.%d.%d/%d%n", &a, &b, &c, &d, &slash, &n) == 5 || if (((nc->flags & MG_F_CONNECTING) && !(nc->flags & MG_F_WANT_READ)) ||
sscanf(spec, "%d.%d.%d.%d%n", &a, &b, &c, &d, &n) == 4) && (nc->send_mbuf.len > 0 && !(nc->flags & MG_F_CONNECTING) &&
isbyte(a) && isbyte(b) && isbyte(c) && isbyte(d) && slash >= 0 && !(nc->flags & MG_F_DONT_SEND))) {
slash < 33) { mg_add_to_set(nc->sock, &write_set, &max_fd);
len = n; mg_add_to_set(nc->sock, &err_set, &max_fd);
*net = }
((uint32_t) a << 24) | ((uint32_t) b << 16) | ((uint32_t) c << 8) | d;
*mask = slash ? 0xffffffffU << (32 - slash) : 0;
} }
return len; tv.tv_sec = milli / 1000;
} tv.tv_usec = (milli % 1000) * 1000;
int mg_check_ip_acl(const char *acl, uint32_t remote_ip) { num_selected = select((int) max_fd + 1, &read_set, &write_set, &err_set, &tv);
int allowed, flag; now = time(NULL);
uint32_t net, mask; DBG(("select @ %ld num_ev=%d of %d", (long) now, num_selected, num_fds));
struct mg_str vec;
/* If any ACL is set, deny by default */ #ifndef MG_DISABLE_SOCKETPAIR
allowed = (acl == NULL || *acl == '\0') ? '+' : '-'; if (num_selected > 0 && mgr->ctl[1] != INVALID_SOCKET &&
FD_ISSET(mgr->ctl[1], &read_set)) {
mg_mgr_handle_ctl_sock(mgr);
}
#endif
while ((acl = mg_next_comma_list_entry(acl, &vec, NULL)) != NULL) { for (nc = mgr->active_connections; nc != NULL; nc = tmp) {
flag = vec.p[0]; int fd_flags = 0;
if ((flag != '+' && flag != '-') || if (num_selected > 0) {
parse_net(&vec.p[1], &net, &mask) == 0) { fd_flags = (FD_ISSET(nc->sock, &read_set) ? _MG_F_FD_CAN_READ : 0) |
return -1; (FD_ISSET(nc->sock, &write_set) ? _MG_F_FD_CAN_WRITE : 0) |
(FD_ISSET(nc->sock, &err_set) ? _MG_F_FD_ERROR : 0);
}
#ifdef MG_CC3200
// CC3200 does not report UDP sockets as writeable.
if (nc->flags & MG_F_UDP &&
(nc->send_mbuf.len > 0 || nc->flags & MG_F_CONNECTING)) {
fd_flags |= _MG_F_FD_CAN_WRITE;
}
#endif
#ifdef MG_LWIP
/* With LWIP socket emulation layer, we don't get write events */
fd_flags |= _MG_F_FD_CAN_WRITE;
#endif
tmp = nc->next;
mg_mgr_handle_conn(nc, fd_flags, now);
} }
if (net == (remote_ip & mask)) { for (nc = mgr->active_connections; nc != NULL; nc = tmp) {
allowed = flag; tmp = nc->next;
if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) ||
(nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE))) {
mg_close_conn(nc);
} }
} }
return allowed == '+'; return now;
} }
/* Move data from one connection to another */ #endif
void mg_forward(struct mg_connection *from, struct mg_connection *to) {
mg_send(to, from->recv_mbuf.buf, from->recv_mbuf.len); #ifndef MG_DISABLE_SOCKETPAIR
mbuf_remove(&from->recv_mbuf, from->recv_mbuf.len); int mg_socketpair(sock_t sp[2], int sock_type) {
union socket_address sa;
sock_t sock;
socklen_t len = sizeof(sa.sin);
int ret = 0;
sock = sp[0] = sp[1] = INVALID_SOCKET;
(void) memset(&sa, 0, sizeof(sa));
sa.sin.sin_family = AF_INET;
sa.sin.sin_port = htons(0);
sa.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
if ((sock = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) {
} else if (bind(sock, &sa.sa, len) != 0) {
} else if (sock_type == SOCK_STREAM && listen(sock, 1) != 0) {
} else if (getsockname(sock, &sa.sa, &len) != 0) {
} else if ((sp[0] = socket(AF_INET, sock_type, 0)) == INVALID_SOCKET) {
} else if (connect(sp[0], &sa.sa, len) != 0) {
} else if (sock_type == SOCK_DGRAM &&
(getsockname(sp[0], &sa.sa, &len) != 0 ||
connect(sock, &sa.sa, len) != 0)) {
} else if ((sp[1] = (sock_type == SOCK_DGRAM ? sock
: accept(sock, &sa.sa, &len))) ==
INVALID_SOCKET) {
} else {
mg_set_close_on_exec(sp[0]);
mg_set_close_on_exec(sp[1]);
if (sock_type == SOCK_STREAM) closesocket(sock);
ret = 1;
}
if (!ret) {
if (sp[0] != INVALID_SOCKET) closesocket(sp[0]);
if (sp[1] != INVALID_SOCKET) closesocket(sp[1]);
if (sock != INVALID_SOCKET) closesocket(sock);
sock = sp[0] = sp[1] = INVALID_SOCKET;
}
return ret;
} }
#endif /* MG_DISABLE_SOCKETPAIR */
#ifdef NS_MODULE_LINES #ifdef NS_MODULE_LINES
#line 1 "src/multithreading.c" #line 1 "src/multithreading.c"
/**/ /**/
...@@ -7293,7 +7443,7 @@ struct mg_dns_reply mg_dns_create_reply(struct mbuf *io, ...@@ -7293,7 +7443,7 @@ struct mg_dns_reply mg_dns_create_reply(struct mbuf *io,
return rep; return rep;
} }
int mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) { void mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) {
size_t sent = r->io->len - r->start; size_t sent = r->io->len - r->start;
mg_dns_insert_header(r->io, r->start, r->msg); mg_dns_insert_header(r->io, r->start, r->msg);
if (!(nc->flags & MG_F_UDP)) { if (!(nc->flags & MG_F_UDP)) {
...@@ -7301,11 +7451,10 @@ int mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) { ...@@ -7301,11 +7451,10 @@ int mg_dns_send_reply(struct mg_connection *nc, struct mg_dns_reply *r) {
mbuf_insert(r->io, r->start, &len, 2); mbuf_insert(r->io, r->start, &len, 2);
} }
if (&nc->send_mbuf != r->io || nc->flags & MG_F_UDP) { if (&nc->send_mbuf != r->io) {
sent = mg_send(nc, r->io->buf + r->start, r->io->len - r->start); mg_send(nc, r->io->buf + r->start, r->io->len - r->start);
r->io->len = r->start; r->io->len = r->start;
} }
return sent;
} }
int mg_dns_reply_record(struct mg_dns_reply *reply, int mg_dns_reply_record(struct mg_dns_reply *reply,
...@@ -8098,7 +8247,6 @@ uint32_t mg_coap_compose(struct mg_coap_message *cm, struct mbuf *io) { ...@@ -8098,7 +8247,6 @@ uint32_t mg_coap_compose(struct mg_coap_message *cm, struct mbuf *io) {
uint32_t mg_coap_send_message(struct mg_connection *nc, uint32_t mg_coap_send_message(struct mg_connection *nc,
struct mg_coap_message *cm) { struct mg_coap_message *cm) {
struct mbuf packet_out; struct mbuf packet_out;
int send_res;
uint32_t compose_res; uint32_t compose_res;
mbuf_init(&packet_out, 0); mbuf_init(&packet_out, 0);
...@@ -8107,17 +8255,9 @@ uint32_t mg_coap_send_message(struct mg_connection *nc, ...@@ -8107,17 +8255,9 @@ uint32_t mg_coap_send_message(struct mg_connection *nc,
return compose_res; /* LCOV_EXCL_LINE */ return compose_res; /* LCOV_EXCL_LINE */
} }
send_res = mg_send(nc, packet_out.buf, (int) packet_out.len); mg_send(nc, packet_out.buf, (int) packet_out.len);
mbuf_free(&packet_out); mbuf_free(&packet_out);
if (send_res == 0) {
/*
* in case of UDP mg_send tries to send immediately
* and could return an error.
*/
return MG_COAP_NETWORK_ERROR; /* LCOV_EXCL_LINE */
}
return 0; return 0;
} }
......
...@@ -673,6 +673,7 @@ struct mg_connection { ...@@ -673,6 +673,7 @@ struct mg_connection {
struct mg_mgr *mgr; /* Pointer to containing manager */ struct mg_mgr *mgr; /* Pointer to containing manager */
sock_t sock; /* Socket to the remote peer */ sock_t sock; /* Socket to the remote peer */
int err;
union socket_address sa; /* Remote peer address */ union socket_address sa; /* Remote peer address */
size_t recv_mbuf_limit; /* Max size of recv buffer */ size_t recv_mbuf_limit; /* Max size of recv buffer */
struct mbuf recv_mbuf; /* Received data */ struct mbuf recv_mbuf; /* Received data */
...@@ -923,13 +924,11 @@ const char *mg_set_ssl(struct mg_connection *nc, const char *cert, ...@@ -923,13 +924,11 @@ const char *mg_set_ssl(struct mg_connection *nc, const char *cert,
/* /*
* Send data to the connection. * Send data to the connection.
* *
* Return number of written bytes. Note that sending * Note that sending functions do not actually push data to the socket.
* functions do not actually push data to the socket. They just append data * They just append data to the output buffer. MG_EV_SEND will be delivered when
* to the output buffer. The exception is UDP connections. For UDP, data is * the data has actually been pushed out.
* sent immediately, and returned value indicates an actual number of bytes
* sent to the socket.
*/ */
int mg_send(struct mg_connection *, const void *buf, int len); void mg_send(struct mg_connection *, const void *buf, int len);
/* Enables format string warnings for mg_printf */ /* Enables format string warnings for mg_printf */
#if defined(__GNUC__) #if defined(__GNUC__)
...@@ -1013,6 +1012,57 @@ enum v7_err mg_enable_javascript(struct mg_mgr *m, struct v7 *v7, ...@@ -1013,6 +1012,57 @@ enum v7_err mg_enable_javascript(struct mg_mgr *m, struct v7 *v7,
#endif /* __cplusplus */ #endif /* __cplusplus */
#endif /* MG_NET_HEADER_INCLUDED */ #endif /* MG_NET_HEADER_INCLUDED */
#ifndef MG_NET_IF_HEADER_INCLUDED
#define MG_NET_IF_HEADER_INCLUDED
/*
* Internal async networking core interface.
* Consists of calls made by the core, which should not block,
* and callbacks back into the core ("..._cb").
* Callbacks may (will) cause methods to be invoked from within,
* but methods are not allowed to invoke callbacks inline.
*
* Implementation must ensure that only one callback is invoked at any time.
*/
/* Request that a TCP connection is made to the specified address. */
void mg_if_connect_tcp(struct mg_connection *nc,
const union socket_address *sa);
/* Open a UDP socket. Doesn't actually connect anything. */
void mg_if_connect_udp(struct mg_connection *nc);
/* Callback invoked by connect methods. err = 0 -> ok, != 0 -> error. */
void mg_if_connect_cb(struct mg_connection *nc, int err);
/* Set up a listening TCP socket on a given address. rv = 0 -> ok. */
int mg_if_listen_tcp(struct mg_connection *nc, union socket_address *sa);
/* Deliver a new TCP connection. */
void mg_if_accept_tcp_cb(struct mg_connection *lc, sock_t sock,
union socket_address *sa, size_t sa_len);
/* Request that a "listening" UDP socket be created. */
int mg_if_listen_udp(struct mg_connection *nc, union socket_address *sa);
/* Send functions for TCP and UDP. Sent data is copied before return. */
void mg_if_tcp_send(struct mg_connection *nc, const void *buf, size_t len);
void mg_if_udp_send(struct mg_connection *nc, const void *buf, size_t len);
/* Callback that reports that data has been put on the wire. */
void mg_if_sent_cb(struct mg_connection *nc, int num_sent);
/*
* Receive callback.
* buf must be heap-allocated and ownership is transferred to the core.
* Core will acknowledge consumption by calling mg_if_recved.
* No more than one chunk of data can be unacknowledged at any time.
*/
void mg_if_recv_tcp_cb(struct mg_connection *nc, void *buf, int len);
void mg_if_recv_udp_cb(struct mg_connection *nc, void *buf, int len,
union socket_address *sa, size_t sa_len);
void mg_if_recved(struct mg_connection *nc, size_t len);
/* Perform interface-related cleanup on connection before destruction. */
void mg_if_destroy_conn(struct mg_connection *nc);
#endif /* MG_NET_IF_HEADER_INCLUDED */
/* /*
* Copyright (c) 2014 Cesanta Software Limited * Copyright (c) 2014 Cesanta Software Limited
* All rights reserved * All rights reserved
...@@ -2344,7 +2394,7 @@ int mg_dns_reply_record(struct mg_dns_reply *, struct mg_dns_resource_record *, ...@@ -2344,7 +2394,7 @@ int mg_dns_reply_record(struct mg_dns_reply *, struct mg_dns_resource_record *,
* Once sent, the IO buffer will be trimmed unless the reply IO buffer * Once sent, the IO buffer will be trimmed unless the reply IO buffer
* is the connection's send buffer and the connection is not in UDP mode. * is the connection's send buffer and the connection is not in UDP mode.
*/ */
int mg_dns_send_reply(struct mg_connection *, struct mg_dns_reply *); void mg_dns_send_reply(struct mg_connection *, struct mg_dns_reply *);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment