Commit 4e6c2867 authored by Deomid Ryabkov's avatar Deomid Ryabkov Committed by Cesanta Bot

UDP fixes: move recv callback into event loop, ...

...and fix a leak where UDP connections with SEND_AND_CLOSE set would not be closed

PUBLISHED_FROM=d60f1fc037e8e57671c7d3146fabc1f068bbb514
parent 54989e81
......@@ -13535,6 +13535,8 @@ struct mg_lwip_conn_state {
size_t rx_offset; /* Offset within the first pbuf (if partially consumed) */
/* Last SSL write size, for retries. */
int last_ssl_write_size;
int recv_pending; /* Whether MG_SIG_RECV is already pending for this
connection */
};
enum mg_sig_type {
......@@ -13611,6 +13613,8 @@ void mg_lwip_if_add_conn(struct mg_connection *nc);
void mg_lwip_if_remove_conn(struct mg_connection *nc);
time_t mg_lwip_if_poll(struct mg_iface *iface, int timeout_ms);
static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p);
#if LWIP_TCP_KEEPALIVE
void mg_lwip_set_keepalive_params(struct mg_connection *nc, int idle,
int interval, int count) {
......@@ -13688,27 +13692,23 @@ static err_t mg_lwip_tcp_recv_cb(void *arg, struct tcp_pcb *tpcb,
for (; q != NULL; q = q->next) pbuf_ref(q);
}
if (cs->rx_chain == NULL) {
cs->rx_chain = p;
cs->rx_offset = 0;
} else {
if (pbuf_clen(cs->rx_chain) >= 4) {
/* ESP SDK has a limited pool of 5 pbufs. We must not hog them all or RX
* will be completely blocked. We already have at least 4 in the chain,
* this one is, so we have to make a copy and release this one. */
struct pbuf *np = pbuf_alloc(PBUF_RAW, p->tot_len, PBUF_RAM);
if (np != NULL) {
pbuf_copy(np, p);
pbuf_free(p);
p = np;
}
}
pbuf_chain(cs->rx_chain, p);
}
mg_lwip_post_signal(MG_SIG_RECV, nc);
} else if (pbuf_clen(cs->rx_chain) >= 4) {
/* ESP SDK has a limited pool of 5 pbufs. We must not hog them all or RX
* will be completely blocked. We already have at least 4 in the chain,
* this one is, so we have to make a copy and release this one. */
struct pbuf *np = pbuf_alloc(PBUF_RAW, p->tot_len, PBUF_RAM);
if (np != NULL) {
pbuf_copy(np, p);
pbuf_free(p);
p = np;
}
}
mg_lwip_recv_common(nc, p);
return ERR_OK;
}
static void mg_lwip_handle_recv(struct mg_connection *nc) {
static void mg_lwip_handle_recv_tcp(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
#if MG_ENABLE_SSL
......@@ -13799,21 +13799,61 @@ static void mg_lwip_udp_recv_cb(void *arg, struct udp_pcb *pcb, struct pbuf *p,
#endif
{
struct mg_connection *nc = (struct mg_connection *) arg;
size_t len = p->len;
char *data = (char *) malloc(len);
union socket_address sa;
(void) pcb;
DBG(("%p %s:%u %u", nc, IPADDR_NTOA(addr), port, p->len));
if (data == NULL) {
DBG(("OOM"));
LOG(LL_INFO,
("%p %s:%u %p %u %u", nc, IPADDR_NTOA(addr), port, p, p->ref, p->len));
/* Put address in a separate pbuf and tack it onto the packet. */
struct pbuf *sap =
pbuf_alloc(PBUF_RAW, sizeof(union socket_address), PBUF_RAM);
if (sap == NULL) {
pbuf_free(p);
return;
}
sa.sin.sin_addr.s_addr = addr->addr;
sa.sin.sin_port = htons(port);
pbuf_copy_partial(p, data, len, 0);
pbuf_free(p);
mg_if_recv_udp_cb(nc, data, len, &sa, sizeof(sa.sin));
union socket_address *sa = (union socket_address *) sap->payload;
sa->sin.sin_addr.s_addr = addr->addr;
sa->sin.sin_port = htons(port);
/* Logic in the recv handler requires that there be exactly one data pbuf. */
p = pbuf_coalesce(p, PBUF_RAW);
pbuf_chain(sap, p);
mg_lwip_recv_common(nc, sap);
(void) pcb;
}
static void mg_lwip_recv_common(struct mg_connection *nc, struct pbuf *p) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
if (cs->rx_chain == NULL) {
cs->rx_chain = p;
} else {
pbuf_chain(cs->rx_chain, p);
}
if (!cs->recv_pending) {
cs->recv_pending = 1;
mg_lwip_post_signal(MG_SIG_RECV, nc);
}
}
static void mg_lwip_handle_recv_udp(struct mg_connection *nc) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
/*
* For UDP, RX chain consists of interleaved address and packet bufs:
* Address pbuf followed by exactly one data pbuf (recv_cb took care of that).
*/
while (cs->rx_chain != NULL) {
struct pbuf *sap = cs->rx_chain;
struct pbuf *p = sap->next;
cs->rx_chain = pbuf_dechain(p);
size_t data_len = p->len;
char *data = (char *) malloc(data_len);
if (data != NULL) {
pbuf_copy_partial(p, data, data_len, 0);
pbuf_free(p);
mg_if_recv_udp_cb(nc, data, data_len,
(union socket_address *) sap->payload, sap->len);
pbuf_free(sap);
} else {
pbuf_free(p);
pbuf_free(sap);
}
}
}
void mg_lwip_if_connect_udp(struct mg_connection *nc) {
......@@ -14168,7 +14208,12 @@ void mg_ev_mgr_lwip_process_signals(struct mg_mgr *mgr) {
break;
}
case MG_SIG_RECV: {
mg_lwip_handle_recv(nc);
cs->recv_pending = 0;
if (nc->flags & MG_F_UDP) {
mg_lwip_handle_recv_udp(nc);
} else {
mg_lwip_handle_recv_tcp(nc);
}
break;
}
case MG_SIG_SENT_CB: {
......@@ -14231,7 +14276,9 @@ time_t mg_lwip_if_poll(struct mg_iface *iface, int timeout_ms) {
struct mg_lwip_conn_state *cs = (struct mg_lwip_conn_state *) nc->sock;
tmp = nc->next;
n++;
if (nc->flags & MG_F_CLOSE_IMMEDIATELY) {
if ((nc->flags & MG_F_CLOSE_IMMEDIATELY) ||
((nc->flags & MG_F_SEND_AND_CLOSE) && (nc->flags & MG_F_UDP) &&
(nc->send_mbuf.len == 0))) {
mg_close_conn(nc);
continue;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment