Commit 09e42547 authored by Our CI Bot's avatar Our CI Bot Committed by Cesanta Bot

Add mqtt_over_websocket_server example

Fixed bug in websocket handshake:
now adding Sec-WebSocket-Protocol header in handshake response.

PUBLISHED_FROM=e4a71ff9dc4aeec63db40cb7f356dc5b25c1215a
parent d4b55c4c
PROG = mqtt_over_websocket_server
MODULE_CFLAGS = -DMG_ENABLE_MQTT_BROKER=1
#SSL_LIB=mbedtls
include ../examples.mk
/*
* Copyright (c) 2014 Cesanta Software Limited
* All rights reserved
* This software is dual-licensed: you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation. For the terms of this
* license, see <http://www.gnu.org/licenses/>.
*
* You are free to use this software under the terms of the GNU General
* Public License, but WITHOUT ANY WARRANTY; without even the implied
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU General Public License for more details.
*
* Alternatively, you can license this software under a commercial
* license, as set out in <https://www.cesanta.com/license>.
*/
#include "mongoose.h"
static const char *s_mqtt_address = "0.0.0.0:1883";
static const char *s_http_address = "0.0.0.0:8080";
static void unproxy(struct mg_connection *c) {
struct mg_connection *pc = (struct mg_connection *) c->user_data;
if (pc != NULL) {
pc->flags |= MG_F_CLOSE_IMMEDIATELY;
pc->user_data = NULL;
c->user_data = NULL;
}
printf("Closing connection %p\n", c);
}
static void proxy_handler(struct mg_connection *c, int ev, void *ev_data) {
if (ev == MG_EV_POLL) return;
printf("%p %s EVENT %d %p\n", c, __func__, ev, ev_data);
switch (ev) {
case MG_EV_CLOSE: {
unproxy(c);
break;
}
case MG_EV_RECV: {
struct mg_connection *pc = (struct mg_connection *) c->user_data;
if (pc != NULL) {
mg_send_websocket_frame(pc, WEBSOCKET_OP_BINARY, c->recv_mbuf.buf,
c->recv_mbuf.len);
mbuf_remove(&c->recv_mbuf, c->recv_mbuf.len);
}
break;
}
}
}
static void http_handler(struct mg_connection *c, int ev, void *ev_data) {
struct mg_connection *pc = (struct mg_connection *) c->user_data;
if (ev == MG_EV_POLL) return;
printf("%p %s EVENT %d %p\n", c, __func__, ev, ev_data);
/* Do your custom event processing here */
switch (ev) {
case MG_EV_WEBSOCKET_HANDSHAKE_DONE: {
pc = mg_connect(c->mgr, s_mqtt_address, proxy_handler);
pc->user_data = c;
c->user_data = pc;
printf("Created proxy connection %p\n", pc);
break;
}
case MG_EV_WEBSOCKET_FRAME: {
struct websocket_message *wm = (struct websocket_message *) ev_data;
if (pc != NULL) {
printf("Forwarding %d bytes\n", (int) wm->size);
mg_send(pc, wm->data, wm->size);
}
break;
}
case MG_EV_CLOSE: {
unproxy(c);
break;
}
}
}
static void mqtt_handler(struct mg_connection *c, int ev, void *ev_data) {
if (ev == MG_EV_POLL) return;
printf("%p %s EVENT %d %p\n", c, __func__, ev, ev_data);
/* Do your custom event processing here */
switch (ev) {
case MG_EV_CLOSE:
printf("Closing MQTT connection %p\n", c);
break;
}
mg_mqtt_broker(c, ev, ev_data);
}
static void start_mqtt_server(struct mg_mgr *mgr, const char *addr) {
struct mg_connection *c;
static struct mg_mqtt_broker brk; // static is important - must not perish
if ((c = mg_bind(mgr, addr, mqtt_handler)) == NULL) {
fprintf(stderr, "Cannot start MQTT server on port [%s]\n", addr);
exit(EXIT_FAILURE);
}
mg_mqtt_broker_init(&brk, NULL);
c->user_data = &brk;
mg_set_protocol_mqtt(c);
printf("MQTT server started on %s\n", addr);
}
static void start_http_server(struct mg_mgr *mgr, const char *addr) {
struct mg_connection *c;
if ((c = mg_bind(mgr, addr, http_handler)) == NULL) {
fprintf(stderr, "Cannot start HTTP server on port [%s]\n", addr);
exit(EXIT_FAILURE);
}
mg_set_protocol_http_websocket(c);
printf("HTTP server started on %s\n", addr);
}
int main(void) {
struct mg_mgr mgr;
mg_mgr_init(&mgr, NULL);
start_http_server(&mgr, s_http_address);
start_mqtt_server(&mgr, s_mqtt_address);
for (;;) {
mg_mgr_poll(&mgr, 1000);
}
}
...@@ -34,9 +34,9 @@ ...@@ -34,9 +34,9 @@
#define MG_DISABLE_PFS #define MG_DISABLE_PFS
#endif #endif
/* Amalgamated: #include "mongoose/src/net.h" */
/* Amalgamated: #include "mongoose/src/http.h" */
/* Amalgamated: #include "common/cs_dbg.h" */ /* Amalgamated: #include "common/cs_dbg.h" */
/* Amalgamated: #include "mongoose/src/http.h" */
/* Amalgamated: #include "mongoose/src/net.h" */
#define MG_CTL_MSG_MESSAGE_SIZE 8192 #define MG_CTL_MSG_MESSAGE_SIZE 8192
...@@ -140,7 +140,8 @@ MG_INTERNAL void mg_handle_put(struct mg_connection *nc, const char *path, ...@@ -140,7 +140,8 @@ MG_INTERNAL void mg_handle_put(struct mg_connection *nc, const char *path,
MG_INTERNAL void mg_ws_handler(struct mg_connection *nc, int ev, MG_INTERNAL void mg_ws_handler(struct mg_connection *nc, int ev,
void *ev_data MG_UD_ARG(void *user_data)); void *ev_data MG_UD_ARG(void *user_data));
MG_INTERNAL void mg_ws_handshake(struct mg_connection *nc, MG_INTERNAL void mg_ws_handshake(struct mg_connection *nc,
const struct mg_str *key); const struct mg_str *key,
struct http_message *);
#endif #endif
#endif /* MG_ENABLE_HTTP */ #endif /* MG_ENABLE_HTTP */
...@@ -6021,7 +6022,7 @@ void mg_http_handler(struct mg_connection *nc, int ev, ...@@ -6021,7 +6022,7 @@ void mg_http_handler(struct mg_connection *nc, int ev,
hm); hm);
if (!(nc->flags & (MG_F_CLOSE_IMMEDIATELY | MG_F_SEND_AND_CLOSE))) { if (!(nc->flags & (MG_F_CLOSE_IMMEDIATELY | MG_F_SEND_AND_CLOSE))) {
if (nc->send_mbuf.len == 0) { if (nc->send_mbuf.len == 0) {
mg_ws_handshake(nc, vec); mg_ws_handshake(nc, vec, hm);
} }
mg_call(nc, nc->handler, nc->user_data, MG_EV_WEBSOCKET_HANDSHAKE_DONE, mg_call(nc, nc->handler, nc->user_data, MG_EV_WEBSOCKET_HANDSHAKE_DONE,
NULL); NULL);
...@@ -9164,8 +9165,9 @@ static int mg_deliver_websocket_data(struct mg_connection *nc) { ...@@ -9164,8 +9165,9 @@ static int mg_deliver_websocket_data(struct mg_connection *nc) {
unsigned char *p = (unsigned char *) nc->recv_mbuf.buf, *buf = p, unsigned char *p = (unsigned char *) nc->recv_mbuf.buf, *buf = p,
*e = p + buf_len; *e = p + buf_len;
unsigned *sizep = (unsigned *) &p[1]; /* Size ptr for defragmented frames */ unsigned *sizep = (unsigned *) &p[1]; /* Size ptr for defragmented frames */
int ok, reass = buf_len > 0 && mg_is_ws_fragment(p[0]) && int ok;
!(nc->flags & MG_F_WEBSOCKET_NO_DEFRAG); int reass = buf_len > 0 && mg_is_ws_fragment(p[0]) &&
!(nc->flags & MG_F_WEBSOCKET_NO_DEFRAG);
/* If that's a continuation frame that must be reassembled, handle it */ /* If that's a continuation frame that must be reassembled, handle it */
if (reass && !mg_is_ws_first_fragment(p[0]) && if (reass && !mg_is_ws_first_fragment(p[0]) &&
...@@ -9418,21 +9420,28 @@ extern void mg_hash_sha1_v(size_t num_msgs, const uint8_t *msgs[], ...@@ -9418,21 +9420,28 @@ extern void mg_hash_sha1_v(size_t num_msgs, const uint8_t *msgs[],
#endif #endif
MG_INTERNAL void mg_ws_handshake(struct mg_connection *nc, MG_INTERNAL void mg_ws_handshake(struct mg_connection *nc,
const struct mg_str *key) { const struct mg_str *key,
struct http_message *hm) {
static const char *magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; static const char *magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
const uint8_t *msgs[2] = {(const uint8_t *) key->p, (const uint8_t *) magic}; const uint8_t *msgs[2] = {(const uint8_t *) key->p, (const uint8_t *) magic};
const size_t msg_lens[2] = {key->len, 36}; const size_t msg_lens[2] = {key->len, 36};
unsigned char sha[20]; unsigned char sha[20];
char b64_sha[30]; char b64_sha[30];
struct mg_str *s;
mg_hash_sha1_v(2, msgs, msg_lens, sha); mg_hash_sha1_v(2, msgs, msg_lens, sha);
mg_base64_encode(sha, sizeof(sha), b64_sha); mg_base64_encode(sha, sizeof(sha), b64_sha);
mg_printf(nc, "%s%s%s", mg_printf(nc, "%s",
"HTTP/1.1 101 Switching Protocols\r\n" "HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n" "Upgrade: websocket\r\n"
"Connection: Upgrade\r\n" "Connection: Upgrade\r\n");
"Sec-WebSocket-Accept: ",
b64_sha, "\r\n\r\n"); s = mg_get_http_header(hm, "Sec-WebSocket-Protocol");
if (s != NULL) {
mg_printf(nc, "Sec-WebSocket-Protocol: %.*s\r\n", (int) s->len, s->p);
}
mg_printf(nc, "Sec-WebSocket-Accept: %s%s", b64_sha, "\r\n\r\n");
DBG(("%p %.*s %s", nc, (int) key->len, key->p, b64_sha)); DBG(("%p %.*s %s", nc, (int) key->len, key->p, b64_sha));
} }
...@@ -9996,7 +10005,8 @@ MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) { ...@@ -9996,7 +10005,8 @@ MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) {
break; break;
} }
return end - io->buf; mm->len = end - io->buf;
return mm->len;
} }
static void mqtt_handler(struct mg_connection *nc, int ev, static void mqtt_handler(struct mg_connection *nc, int ev,
...@@ -10008,6 +10018,9 @@ static void mqtt_handler(struct mg_connection *nc, int ev, ...@@ -10008,6 +10018,9 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
nc->handler(nc, ev, ev_data MG_UD_ARG(user_data)); nc->handler(nc, ev, ev_data MG_UD_ARG(user_data));
switch (ev) { switch (ev) {
case MG_EV_ACCEPT:
if (nc->proto_data == NULL) mg_set_protocol_mqtt(nc);
break;
case MG_EV_RECV: { case MG_EV_RECV: {
/* There can be multiple messages in the buffer, process them all. */ /* There can be multiple messages in the buffer, process them all. */
while (1) { while (1) {
...@@ -10027,6 +10040,7 @@ static void mqtt_handler(struct mg_connection *nc, int ev, ...@@ -10027,6 +10040,7 @@ static void mqtt_handler(struct mg_connection *nc, int ev,
LOG(LL_DEBUG, ("Send PINGREQ")); LOG(LL_DEBUG, ("Send PINGREQ"));
mg_mqtt_ping(nc); mg_mqtt_ping(nc);
} }
break;
} }
} }
} }
...@@ -10438,7 +10452,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) { ...@@ -10438,7 +10452,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
switch (ev) { switch (ev) {
case MG_EV_ACCEPT: case MG_EV_ACCEPT:
mg_set_protocol_mqtt(nc); if (nc->proto_data == NULL) mg_set_protocol_mqtt(nc);
nc->user_data = NULL; /* Clear up the inherited pointer to broker */ nc->user_data = NULL; /* Clear up the inherited pointer to broker */
break; break;
case MG_EV_MQTT_CONNECT: case MG_EV_MQTT_CONNECT:
......
...@@ -5087,6 +5087,7 @@ int mg_http_create_digest_auth_header(char *buf, size_t buf_len, ...@@ -5087,6 +5087,7 @@ int mg_http_create_digest_auth_header(char *buf, size_t buf_len,
struct mg_mqtt_message { struct mg_mqtt_message {
int cmd; int cmd;
int qos; int qos;
int len; /* message length in the IO buffer */
struct mg_str topic; struct mg_str topic;
struct mg_str payload; struct mg_str payload;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment