Commit ab48174b authored by Deomid Ryabkov's avatar Deomid Ryabkov Committed by Cesanta Bot

esp_mg_net_if refactoring: split LWIP & ESP parts

esp_mg_net_if was a rat's nest of mongoose net_if and event manager
implementaions and the event loop task for the ESP8266 miot port.
From that, pieces that are related to LWIP support have been pulled out,
separated into net_if and event manager files, and event loop task
has been moved under the miot esp8266 dirrectory.

This is done to facilitate LWIP code reuse. This may not be the end of
it, but it's a start.

Note: custom retransmit logic has been removed for now. Can be
reintroduced later is necessary.

PUBLISHED_FROM=fd5bbf75714583ce95776d4c76b6c5b5dc535364
parent 4439251f
......@@ -119,6 +119,10 @@ MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm);
extern void *(*test_malloc)(size_t size);
extern void *(*test_calloc)(size_t count, size_t size);
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#endif
#endif /* CS_MONGOOSE_SRC_INTERNAL_H_ */
#ifdef MG_MODULE_LINES
#line 1 "common/cs_dbg.h"
......@@ -11140,3 +11144,850 @@ void sl_restart_cb(struct mg_mgr *mgr) {
}
#endif /* !defined(MG_DISABLE_SOCKET_IF) && defined(MG_SOCKET_SIMPLELINK) */
#ifdef MG_MODULE_LINES
#line 1 "common/platforms/lwip/mg_lwip_net_if.h"
#endif
/*
* Copyright (c) 2014-2016 Cesanta Software Limited
* All rights reserved
*/
#ifndef CS_COMMON_PLATFORMS_LWIP_MG_NET_IF_LWIP_H_
#define CS_COMMON_PLATFORMS_LWIP_MG_NET_IF_LWIP_H_
#ifdef MG_NET_IF_LWIP
#include <inttypes.h>
struct mg_lwip_conn_state {
union {
struct tcp_pcb *tcp;
struct udp_pcb *udp;
} pcb;
err_t err;
size_t num_sent; /* Number of acknowledged bytes to be reported to the core */
struct pbuf *rx_chain; /* Chain of incoming data segments. */
size_t rx_offset; /* Offset within the first pbuf (if partially consumed) */
/* Last SSL write size, for retries. */
int last_ssl_write_size;
};
enum mg_sig_type {
MG_SIG_CONNECT_RESULT = 1, /* struct mg_connection* */
MG_SIG_SENT_CB = 2, /* struct mg_connection* */
MG_SIG_CLOSE_CONN = 3, /* struct mg_connection* */
MG_SIG_TOMBSTONE = 4,
};
void mg_lwip_post_signal(enum mg_sig_type sig, struct mg_connection *nc);
/* To be implemented by the platform. */
void mg_lwip_mgr_schedule_poll(struct mg_mgr *mgr);
#endif /* MG_NET_IF_LWIP */
#endif /* CS_COMMON_PLATFORMS_LWIP_MG_NET_IF_LWIP_H_ */
#ifdef MG_MODULE_LINES
#line 1 "common/platforms/lwip/mg_lwip_net_if.c"
#endif
/*
* Copyright (c) 2014-2016 Cesanta Software Limited
* All rights reserved
*/
#ifdef MG_NET_IF_LWIP
#include <lwip/pbuf.h>
#include <lwip/tcp.h>
#include <lwip/tcp_impl.h>
#include <lwip/udp.h>
/* Amalgamated: #include "common/cs_dbg.h" */
void mg_lwip_ssl_do_hs(struct mg_connection *nc);
void mg_lwip_ssl_send(struct mg_connection *nc);
void mg_lwip_ssl_recv(struct mg_connection *nc);
void mg_lwip_set_keepalive_params(struct mg_connection *nc, int idle,
int interval, int count) {
if (nc->sock == INVALID_SOCKET || nc->flags & MG_F_UDP) {
return;
}
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct tcp_pcb *tpcb = cs->pcb.tcp;
if (idle > 0 && interval > 0 && count > 0) {
tpcb->keep_idle = idle * 1000;
tpcb->keep_intvl = interval * 1000;
tpcb->keep_cnt = count;
tpcb->so_options |= SOF_KEEPALIVE;
} else {
tpcb->so_options &= ~SOF_KEEPALIVE;
}
}
static err_t mg_lwip_tcp_conn_cb(void *arg, struct tcp_pcb *tpcb, err_t err) {
struct mg_connection *nc = (struct mg_connection *) arg;
DBG(("%p connect to %s:%u = %d", nc, ipaddr_ntoa(&tpcb->remote_ip),
tpcb->remote_port, err));
if (nc == NULL) {
tcp_abort(tpcb);
return ERR_ARG;
}
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
cs->err = err;
if (err == 0) mg_lwip_set_keepalive_params(nc, 60, 10, 6);
#ifdef SSL_KRYPTON
if (err == 0 && nc->ssl != NULL) {
SSL_set_fd(nc->ssl, (intptr_t) nc);
mg_lwip_ssl_do_hs(nc);
} else
#endif
{
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
}
return ERR_OK;
}
static void mg_lwip_tcp_error_cb(void *arg, err_t err) {
struct mg_connection *nc = (struct mg_connection *) arg;
DBG(("%p conn error %d", nc, err));
if (nc == NULL) return;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
cs->pcb.tcp = NULL; /* Has already been deallocated */
if (nc->flags & MG_F_CONNECTING) {
cs->err = err;
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
} else {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
}
static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb,
struct pbuf *p, err_t err) {
struct mg_connection *nc = (struct mg_connection *) arg;
DBG(("%p %p %u %d", nc, tpcb, (p != NULL ? p->tot_len : 0), err));
if (p == NULL) {
if (nc != NULL) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
} else {
/* Tombstoned connection, do nothing. */
}
return ERR_OK;
} else if (nc == NULL) {
tcp_abort(tpcb);
return ERR_ARG;
}
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
/*
* If we get a chain of more than one segment at once, we need to bump
* refcount on the subsequent bufs to make them independent.
*/
if (p->next != NULL) {
struct pbuf *q = p->next;
for (; q != NULL; q = q->next) pbuf_ref(q);
}
if (cs->rx_chain == NULL) {
cs->rx_chain = p;
cs->rx_offset = 0;
} else {
if (pbuf_clen(cs->rx_chain) >= 4) {
/* ESP SDK has a limited pool of 5 pbufs. We must not hog them all or RX
* will be completely blocked. We already have at least 4 in the chain,
* this one is, so we have to make a copy and release this one. */
struct pbuf *np = pbuf_alloc(PBUF_RAW, p->tot_len, PBUF_RAM);
if (np != NULL) {
pbuf_copy(np, p);
pbuf_free(p);
p = np;
}
}
pbuf_chain(cs->rx_chain, p);
}
#ifdef SSL_KRYPTON
if (nc->ssl != NULL) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
mg_lwip_ssl_recv(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
return ERR_OK;
}
#endif
while (cs->rx_chain != NULL) {
struct pbuf *seg = cs->rx_chain;
size_t len = (seg->len - cs->rx_offset);
char *data = (char *) malloc(len);
if (data == NULL) {
DBG(("OOM"));
return ERR_MEM;
}
pbuf_copy_partial(seg, data, len, cs->rx_offset);
mg_if_recv_tcp_cb(nc, data, len); /* callee takes over data */
cs->rx_offset += len;
if (cs->rx_offset == cs->rx_chain->len) {
cs->rx_chain = pbuf_dechain(cs->rx_chain);
pbuf_free(seg);
cs->rx_offset = 0;
}
}
if (nc->send_mbuf.len > 0) {
mg_lwip_mgr_schedule_poll(nc->mgr);
}
return ERR_OK;
}
static err_t mg_lwip_tcp_sent_cb(void *arg, struct tcp_pcb *tpcb,
u16_t num_sent) {
struct mg_connection *nc = (struct mg_connection *) arg;
DBG(("%p %p %u", nc, tpcb, num_sent));
if (nc == NULL) {
tcp_abort(tpcb);
return ERR_ABRT;
}
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
cs->num_sent += num_sent;
mg_lwip_post_signal(MG_SIG_SENT_CB, nc);
return ERR_OK;
}
void mg_if_connect_tcp(struct mg_connection *nc,
const union socket_address *sa) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct tcp_pcb *tpcb = tcp_new();
cs->pcb.tcp = tpcb;
ip_addr_t *ip = (ip_addr_t *) &sa->sin.sin_addr.s_addr;
u16_t port = ntohs(sa->sin.sin_port);
tcp_arg(tpcb, nc);
tcp_err(tpcb, mg_lwip_tcp_error_cb);
tcp_sent(tpcb, mg_lwip_tcp_sent_cb);
tcp_recv(tpcb, mg_lwip_tcp_recv_cb);
cs->err = tcp_bind(tpcb, IP_ADDR_ANY, 0 /* any port */);
DBG(("%p tcp_bind = %d", nc, cs->err));
if (cs->err != ERR_OK) {
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
return;
}
cs->err = tcp_connect(tpcb, ip, port, mg_lwip_tcp_conn_cb);
DBG(("%p tcp_connect %p = %d", nc, tpcb, cs->err));
if (cs->err != ERR_OK) {
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
return;
}
}
static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p,
ip_addr_t *addr, u16_t port) {
struct mg_connection *nc = (struct mg_connection *) arg;
size_t len = p->len;
char *data = (char *) malloc(len);
union socket_address sa;
(void) pcb;
DBG(("%p %s:%u %u", nc, ipaddr_ntoa(addr), port, p->len));
if (data == NULL) {
DBG(("OOM"));
pbuf_free(p);
return;
}
sa.sin.sin_addr.s_addr = addr->addr;
sa.sin.sin_port = htons(port);
pbuf_copy_partial(p, data, len, 0);
pbuf_free(p);
mg_if_recv_udp_cb(nc, data, len, &sa, sizeof(sa.sin));
}
void mg_if_connect_udp(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct udp_pcb *upcb = udp_new();
cs->err = udp_bind(upcb, IP_ADDR_ANY, 0 /* any port */);
DBG(("%p udp_bind %p = %d", nc, upcb, cs->err));
if (cs->err == ERR_OK) {
udp_recv(upcb, mg_lwip_udp_recv_cb, nc);
cs->pcb.udp = upcb;
} else {
udp_remove(upcb);
}
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
}
void mg_lwip_accept_conn(struct mg_connection *nc, struct tcp_pcb *tpcb) {
union socket_address sa;
sa.sin.sin_addr.s_addr = tpcb->remote_ip.addr;
sa.sin.sin_port = htons(tpcb->remote_port);
mg_if_accept_tcp_cb(nc, &sa, sizeof(sa.sin));
}
static err_t mg_lwip_accept_cb(void *arg, struct tcp_pcb *newtpcb, err_t err) {
struct mg_connection *lc = (struct mg_connection *) arg;
(void) err;
DBG(("%p conn %p from %s:%u", lc, newtpcb, ipaddr_ntoa(&newtpcb->remote_ip),
newtpcb->remote_port));
struct mg_connection *nc = mg_if_accept_new_conn(lc);
if (nc == NULL) {
tcp_abort(newtpcb);
return ERR_ABRT;
}
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
cs->pcb.tcp = newtpcb;
tcp_arg(newtpcb, nc);
tcp_err(newtpcb, mg_lwip_tcp_error_cb);
tcp_sent(newtpcb, mg_lwip_tcp_sent_cb);
tcp_recv(newtpcb, mg_lwip_tcp_recv_cb);
mg_lwip_set_keepalive_params(nc, 60, 10, 6);
#ifdef SSL_KRYPTON
if (lc->ssl_ctx != NULL) {
nc->ssl = SSL_new(lc->ssl_ctx);
if (nc->ssl == NULL || SSL_set_fd(nc->ssl, (intptr_t) nc) != 1) {
LOG(LL_ERROR, ("SSL error"));
tcp_close(newtpcb);
}
} else
#endif
{
mg_lwip_accept_conn(nc, newtpcb);
}
return ERR_OK;
}
int mg_if_listen_tcp(struct mg_connection *nc, union socket_address *sa) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct tcp_pcb *tpcb = tcp_new();
ip_addr_t *ip = (ip_addr_t *) &sa->sin.sin_addr.s_addr;
u16_t port = ntohs(sa->sin.sin_port);
cs->err = tcp_bind(tpcb, ip, port);
DBG(("%p tcp_bind(%s:%u) = %d", nc, ipaddr_ntoa(ip), port, cs->err));
if (cs->err != ERR_OK) {
tcp_close(tpcb);
return -1;
}
tcp_arg(tpcb, nc);
tpcb = tcp_listen(tpcb);
cs->pcb.tcp = tpcb;
tcp_accept(tpcb, mg_lwip_accept_cb);
return 0;
}
int mg_if_listen_udp(struct mg_connection *nc, union socket_address *sa) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct udp_pcb *upcb = udp_new();
ip_addr_t *ip = (ip_addr_t *) &sa->sin.sin_addr.s_addr;
u16_t port = ntohs(sa->sin.sin_port);
cs->err = udp_bind(upcb, ip, port);
DBG(("%p udb_bind(%s:%u) = %d", nc, ipaddr_ntoa(ip), port, cs->err));
if (cs->err != ERR_OK) {
udp_remove(upcb);
return -1;
}
udp_recv(upcb, mg_lwip_udp_recv_cb, nc);
cs->pcb.udp = upcb;
return 0;
}
int mg_lwip_tcp_write(struct mg_connection *nc, const void *data,
uint16_t len) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct tcp_pcb *tpcb = cs->pcb.tcp;
len = MIN(tpcb->mss, MIN(len, tpcb->snd_buf));
if (len == 0) {
DBG(("%p no buf avail %u %u %u %p %p", tpcb, tpcb->acked, tpcb->snd_buf,
tpcb->snd_queuelen, tpcb->unsent, tpcb->unacked));
tcp_output(tpcb);
return 0;
}
err_t err = tcp_write(tpcb, data, len, TCP_WRITE_FLAG_COPY);
tcp_output(tpcb);
DBG(("%p tcp_write %u = %d", tpcb, len, err));
if (err != ERR_OK) {
/*
* We ignore ERR_MEM because memory will be freed up when the data is sent
* and we'll retry.
*/
return (err == ERR_MEM ? 0 : -1);
}
return len;
}
static void mg_lwip_send_more(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (nc->sock == INVALID_SOCKET || cs->pcb.tcp == NULL) {
DBG(("%p invalid socket", nc));
return;
}
int num_written = mg_lwip_tcp_write(nc, nc->send_mbuf.buf, nc->send_mbuf.len);
DBG(("%p mg_lwip_tcp_write %u = %d", nc, nc->send_mbuf.len, num_written));
if (num_written == 0) return;
if (num_written < 0) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
mbuf_remove(&nc->send_mbuf, num_written);
mbuf_trim(&nc->send_mbuf);
}
void mg_if_tcp_send(struct mg_connection *nc, const void *buf, size_t len) {
mbuf_append(&nc->send_mbuf, buf, len);
mg_lwip_mgr_schedule_poll(nc->mgr);
}
void mg_if_udp_send(struct mg_connection *nc, const void *buf, size_t len) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (nc->sock == INVALID_SOCKET || cs->pcb.udp == NULL) {
/*
* In case of UDP, this usually means, what
* async DNS resolve is still in progress and connection
* is not ready yet
*/
DBG(("%p socket is not connected", nc));
return;
}
struct udp_pcb *upcb = cs->pcb.udp;
struct pbuf *p = pbuf_alloc(PBUF_TRANSPORT, len, PBUF_RAM);
ip_addr_t *ip = (ip_addr_t *) &nc->sa.sin.sin_addr.s_addr;
u16_t port = ntohs(nc->sa.sin.sin_port);
memcpy(p->payload, buf, len);
cs->err = udp_sendto(upcb, p, (ip_addr_t *) ip, port);
DBG(("%p udp_sendto = %d", nc, cs->err));
pbuf_free(p);
if (cs->err != ERR_OK) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
} else {
cs->num_sent += len;
mg_lwip_post_signal(MG_SIG_SENT_CB, nc);
}
}
void mg_if_recved(struct mg_connection *nc, size_t len) {
if (nc->flags & MG_F_UDP) return;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (nc->sock == INVALID_SOCKET || cs->pcb.tcp == NULL) {
DBG(("%p invalid socket", nc));
return;
}
DBG(("%p %p %u", nc, cs->pcb.tcp, len));
/* Currently SSL acknowledges data immediately.
* TODO(rojer): Find a way to propagate mg_if_recved. */
if (nc->ssl == NULL) {
tcp_recved(cs->pcb.tcp, len);
}
mbuf_trim(&nc->recv_mbuf);
}
int mg_if_create_conn(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs =
(struct mg_lwip_conn_state *) calloc(1, sizeof(*cs));
if (cs == NULL) return 0;
nc->sock = (intptr_t) cs;
return 1;
}
void mg_if_destroy_conn(struct mg_connection *nc) {
if (nc->sock == INVALID_SOCKET) return;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (!(nc->flags & MG_F_UDP)) {
struct tcp_pcb *tpcb = cs->pcb.tcp;
if (tpcb != NULL) {
tcp_arg(tpcb, NULL);
DBG(("%p tcp_close %p", nc, tpcb));
tcp_arg(tpcb, NULL);
tcp_close(tpcb);
}
while (cs->rx_chain != NULL) {
struct pbuf *seg = cs->rx_chain;
cs->rx_chain = pbuf_dechain(cs->rx_chain);
pbuf_free(seg);
}
memset(cs, 0, sizeof(*cs));
free(cs);
} else if (nc->listener == NULL) {
/* Only close outgoing UDP pcb or listeners. */
struct udp_pcb *upcb = cs->pcb.udp;
if (upcb != NULL) {
DBG(("%p udp_remove %p", nc, upcb));
udp_remove(upcb);
}
memset(cs, 0, sizeof(*cs));
free(cs);
}
nc->sock = INVALID_SOCKET;
}
void mg_if_get_conn_addr(struct mg_connection *nc, int remote,
union socket_address *sa) {
memset(sa, 0, sizeof(*sa));
if (nc->sock == INVALID_SOCKET) return;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (nc->flags & MG_F_UDP) {
struct udp_pcb *upcb = cs->pcb.udp;
if (remote) {
memcpy(sa, &nc->sa, sizeof(*sa));
} else {
sa->sin.sin_port = htons(upcb->local_port);
sa->sin.sin_addr.s_addr = upcb->local_ip.addr;
}
} else {
struct tcp_pcb *tpcb = cs->pcb.tcp;
if (remote) {
sa->sin.sin_port = htons(tpcb->remote_port);
sa->sin.sin_addr.s_addr = tpcb->remote_ip.addr;
} else {
sa->sin.sin_port = htons(tpcb->local_port);
sa->sin.sin_addr.s_addr = tpcb->local_ip.addr;
}
}
}
void mg_sock_set(struct mg_connection *nc, sock_t sock) {
nc->sock = sock;
}
#endif /* MG_NET_IF_LWIP */
#ifdef MG_MODULE_LINES
#line 1 "common/platforms/lwip/mg_lwip_ev_mgr.c"
#endif
/*
* Copyright (c) 2014-2016 Cesanta Software Limited
* All rights reserved
*/
#ifdef MG_NET_IF_LWIP
#ifndef MG_SIG_QUEUE_LEN
#define MG_SIG_QUEUE_LEN 16
#endif
struct mg_ev_mgr_lwip_signal {
int sig;
struct mg_connection *nc;
};
struct mg_ev_mgr_lwip_data {
struct mg_ev_mgr_lwip_signal sig_queue[MG_SIG_QUEUE_LEN];
int sig_queue_len;
int start_index;
};
void mg_lwip_post_signal(enum mg_sig_type sig, struct mg_connection *nc) {
struct mg_ev_mgr_lwip_data *md =
(struct mg_ev_mgr_lwip_data *) nc->mgr->mgr_data;
if (md->sig_queue_len >= MG_SIG_QUEUE_LEN) return;
int end_index = (md->start_index + md->sig_queue_len) % MG_SIG_QUEUE_LEN;
md->sig_queue[end_index].sig = sig;
md->sig_queue[end_index].nc = nc;
md->sig_queue_len++;
}
void mg_ev_mgr_lwip_process_signals(struct mg_mgr *mgr) {
struct mg_ev_mgr_lwip_data *md = (struct mg_ev_mgr_lwip_data *) mgr->mgr_data;
while (md->sig_queue_len > 0) {
struct mg_connection *nc = md->sig_queue[md->start_index].nc;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
switch (md->sig_queue[md->start_index].sig) {
case MG_SIG_CONNECT_RESULT: {
mg_if_connect_cb(nc, cs->err);
break;
}
case MG_SIG_CLOSE_CONN: {
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
mg_close_conn(nc);
break;
}
case MG_SIG_SENT_CB: {
if (cs->num_sent > 0) mg_if_sent_cb(nc, cs->num_sent);
cs->num_sent = 0;
break;
}
case MG_SIG_TOMBSTONE: {
break;
}
}
md->start_index = (md->start_index + 1) % MG_SIG_QUEUE_LEN;
md->sig_queue_len--;
}
}
void mg_ev_mgr_init(struct mg_mgr *mgr) {
LOG(LL_INFO, ("%p Mongoose init"));
mgr->mgr_data = MG_CALLOC(1, sizeof(struct mg_ev_mgr_lwip_data));
}
void mg_ev_mgr_free(struct mg_mgr *mgr) {
MG_FREE(mgr->mgr_data);
mgr->mgr_data = NULL;
}
void mg_ev_mgr_add_conn(struct mg_connection *nc) {
(void) nc;
}
void mg_ev_mgr_remove_conn(struct mg_connection *nc) {
struct mg_ev_mgr_lwip_data *md =
(struct mg_ev_mgr_lwip_data *) nc->mgr->mgr_data;
/* Walk the queue and null-out further signals for this conn. */
for (int i = 0; i < MG_SIG_QUEUE_LEN; i++) {
if (md->sig_queue[i].nc == nc) {
md->sig_queue[i].sig = MG_SIG_TOMBSTONE;
}
}
}
time_t mg_mgr_poll(struct mg_mgr *mgr, int timeout_ms) {
int n = 0;
double now = mg_time();
struct mg_connection *nc, *tmp;
double min_timer = 0;
int num_timers = 0;
DBG(("begin poll @%u", (unsigned int) (now * 1000)));
mg_ev_mgr_lwip_process_signals(mgr);
for (nc = mgr->active_connections; nc != NULL; nc = tmp) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
(void) cs;
tmp = nc->next;
n++;
if (nc->flags & MG_F_CLOSE_IMMEDIATELY) {
mg_close_conn(nc);
continue;
}
mg_if_poll(nc, now);
mg_if_timer(nc, now);
if (nc->send_mbuf.len == 0 && (nc->flags & MG_F_SEND_AND_CLOSE) &&
!(nc->flags & MG_F_WANT_WRITE)) {
mg_close_conn(nc);
continue;
}
#ifdef SSL_KRYPTON
if (nc->ssl != NULL && cs != NULL && cs->pcb.tcp != NULL &&
cs->pcb.tcp->state == ESTABLISHED) {
if (((nc->flags & MG_F_WANT_WRITE) || nc->send_mbuf.len > 0) &&
cs->pcb.tcp->snd_buf > 0) {
/* Can write more. */
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
if (!(nc->flags & MG_F_CONNECTING)) mg_lwip_ssl_send(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
}
if (cs->rx_chain != NULL || (nc->flags & MG_F_WANT_READ)) {
if (nc->flags & MG_F_SSL_HANDSHAKE_DONE) {
if (!(nc->flags & MG_F_CONNECTING)) mg_lwip_ssl_recv(nc);
} else {
mg_lwip_ssl_do_hs(nc);
}
}
} else
#endif /* SSL_KRYPTON */
{
if (!(nc->flags & (MG_F_CONNECTING | MG_F_UDP))) {
if (nc->send_mbuf.len > 0) mg_lwip_send_more(nc);
}
}
if (nc->ev_timer_time > 0) {
if (num_timers == 0 || nc->ev_timer_time < min_timer) {
min_timer = nc->ev_timer_time;
}
num_timers++;
}
}
DBG(("end poll @%u, %d conns, %d timers (min %u), next in %d ms",
(unsigned int) (now * 1000), n, num_timers,
(unsigned int) (min_timer * 1000), timeout_ms));
return now;
}
uint32_t mg_lwip_get_poll_delay_ms(struct mg_mgr *mgr) {
struct mg_connection *nc;
double now = mg_time();
double min_timer = 0;
int num_timers = 0;
mg_ev_mgr_lwip_process_signals(mgr);
for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
if (nc->ev_timer_time > 0) {
if (num_timers == 0 || nc->ev_timer_time < min_timer) {
min_timer = nc->ev_timer_time;
}
num_timers++;
}
}
uint32_t timeout_ms = ~0;
if (num_timers > 0) {
double timer_timeout_ms = (min_timer - now) * 1000 + 1 /* rounding */;
if (timer_timeout_ms < timeout_ms) {
timeout_ms = timer_timeout_ms;
}
}
return timeout_ms;
}
#endif /* MG_NET_IF_LWIP */
#ifdef MG_MODULE_LINES
#line 1 "common/platforms/lwip/mg_lwip_ssl_krypton.c"
#endif
/*
* Copyright (c) 2014-2016 Cesanta Software Limited
* All rights reserved
*/
#if defined(MG_NET_IF_LWIP) && defined(SSL_KRYPTON)
/* Amalgamated: #include "common/cs_dbg.h" */
#include <lwip/pbuf.h>
#include <lwip/tcp.h>
#ifndef MG_LWIP_SSL_IO_SIZE
#define MG_LWIP_SSL_IO_SIZE 1024
#endif
/*
* Stop processing incoming SSL traffic when recv_mbuf.size is this big.
* It'a a uick solution for SSL recv pushback.
*/
#ifndef MG_LWIP_SSL_RECV_MBUF_LIMIT
#define MG_LWIP_SSL_RECV_MBUF_LIMIT 3072
#endif
#define MIN(a, b) ((a) < (b) ? (a) : (b))
void mg_lwip_ssl_do_hs(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
int server_side = (nc->listener != NULL);
int ret = server_side ? SSL_accept(nc->ssl) : SSL_connect(nc->ssl);
int err = SSL_get_error(nc->ssl, ret);
DBG(("%s %d %d", (server_side ? "SSL_accept" : "SSL_connect"), ret, err));
if (ret <= 0) {
if (err == SSL_ERROR_WANT_WRITE) {
nc->flags |= MG_F_WANT_WRITE;
cs->err = 0;
} else if (err == SSL_ERROR_WANT_READ) {
/* Nothing, we are callback-driven. */
cs->err = 0;
} else {
cs->err = err;
LOG(LL_ERROR, ("SSL handshake error: %d", cs->err));
if (server_side) {
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
} else {
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
}
}
} else {
cs->err = 0;
nc->flags &= ~MG_F_WANT_WRITE;
/*
* Handshake is done. Schedule a read immediately to consume app data
* which may already be waiting.
*/
nc->flags |= (MG_F_SSL_HANDSHAKE_DONE | MG_F_WANT_READ);
if (server_side) {
mg_lwip_accept_conn(nc, cs->pcb.tcp);
} else {
mg_lwip_post_signal(MG_SIG_CONNECT_RESULT, nc);
}
}
}
void mg_lwip_ssl_send(struct mg_connection *nc) {
if (nc->sock == INVALID_SOCKET) {
DBG(("%p invalid socket", nc));
return;
}
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
/* It's ok if the buffer is empty. Return value of 0 may also be valid. */
int len = cs->last_ssl_write_size;
if (len == 0) {
len = MIN(MG_LWIP_SSL_IO_SIZE, nc->send_mbuf.len);
}
int ret = SSL_write(nc->ssl, nc->send_mbuf.buf, len);
int err = SSL_get_error(nc->ssl, ret);
DBG(("%p SSL_write %u = %d, %d", nc, len, ret, err));
if (ret > 0) {
mbuf_remove(&nc->send_mbuf, ret);
mbuf_trim(&nc->send_mbuf);
cs->last_ssl_write_size = 0;
} else if (ret < 0) {
/* This is tricky. We must remember the exact data we were sending to retry
* exactly the same send next time. */
cs->last_ssl_write_size = len;
}
if (err == SSL_ERROR_NONE) {
nc->flags &= ~MG_F_WANT_WRITE;
} else if (err == SSL_ERROR_WANT_WRITE) {
nc->flags |= MG_F_WANT_WRITE;
} else {
LOG(LL_ERROR, ("SSL write error: %d", err));
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
}
void mg_lwip_ssl_recv(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
/* Don't deliver data before connect callback */
if (nc->flags & MG_F_CONNECTING) return;
while (nc->recv_mbuf.len < MG_LWIP_SSL_RECV_MBUF_LIMIT) {
char *buf = (char *) malloc(MG_LWIP_SSL_IO_SIZE);
if (buf == NULL) return;
int ret = SSL_read(nc->ssl, buf, MG_LWIP_SSL_IO_SIZE);
int err = SSL_get_error(nc->ssl, ret);
DBG(("%p SSL_read %u = %d, %d", nc, MG_LWIP_SSL_IO_SIZE, ret, err));
if (ret <= 0) {
free(buf);
if (err == SSL_ERROR_WANT_WRITE) {
nc->flags |= MG_F_WANT_WRITE;
return;
} else if (err == SSL_ERROR_WANT_READ) {
/* Nothing, we are callback-driven. */
cs->err = 0;
return;
} else {
LOG(LL_ERROR, ("SSL read error: %d", err));
mg_lwip_post_signal(MG_SIG_CLOSE_CONN, nc);
}
} else {
mg_if_recv_tcp_cb(nc, buf, ret); /* callee takes over data */
}
}
if (nc->recv_mbuf.len >= MG_LWIP_SSL_RECV_MBUF_LIMIT) {
nc->flags |= MG_F_WANT_READ;
} else {
nc->flags &= ~MG_F_WANT_READ;
}
}
ssize_t kr_send(int fd, const void *buf, size_t len, int flags) {
struct mg_connection *nc = (struct mg_connection *) fd;
int ret = mg_lwip_tcp_write(nc, buf, len);
(void) flags;
DBG(("mg_lwip_tcp_write %u = %d", len, ret));
if (ret <= 0) {
errno = (ret == 0 ? EWOULDBLOCK : EIO);
ret = -1;
}
return ret;
}
ssize_t kr_recv(int fd, void *buf, size_t len, int flags) {
struct mg_connection *nc = (struct mg_connection *) fd;
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
struct pbuf *seg = cs->rx_chain;
(void) flags;
if (seg == NULL) {
DBG(("%u - nothing to read", len));
errno = EWOULDBLOCK;
return -1;
}
size_t seg_len = (seg->len - cs->rx_offset);
DBG(("%u %u %u %u", len, cs->rx_chain->len, seg_len, cs->rx_chain->tot_len));
len = MIN(len, seg_len);
pbuf_copy_partial(seg, buf, len, cs->rx_offset);
cs->rx_offset += len;
tcp_recved(cs->pcb.tcp, len);
if (cs->rx_offset == cs->rx_chain->len) {
cs->rx_chain = pbuf_dechain(cs->rx_chain);
pbuf_free(seg);
cs->rx_offset = 0;
}
return len;
}
#endif /* defined(MG_NET_IF_LWIP) && defined(SSL_KRYPTON) */
......@@ -433,6 +433,15 @@ typedef struct stat cs_stat_t;
unsigned long os_random(void);
#define random os_random
#ifndef RTOS_SDK
#define MG_NET_IF_LWIP
struct mg_mgr;
struct mg_connection;
uint32_t mg_lwip_get_poll_delay_ms(struct mg_mgr *mgr);
void mg_lwip_set_keepalive_params(struct mg_connection *nc, int idle,
int interval, int count);
#endif
#endif /* CS_PLATFORM == CS_P_ESP_LWIP */
#endif /* CS_COMMON_PLATFORMS_PLATFORM_ESP_LWIP_H_ */
#ifdef MG_MODULE_LINES
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment