Commit 3a8f47be authored by Sergey Lyubka's avatar Sergey Lyubka Committed by Cesanta Bot

Mqtt: parsing auth info in the connect message

PUBLISHED_FROM=017e707ea2bd7b1d1972fbb92e5d98c74e17e9d0
parent a3e2a418
......@@ -20,7 +20,6 @@ items:
- { name: mg_send_mqtt_handshake.md }
- { name: mg_send_mqtt_handshake_opt.md }
- { name: mg_set_protocol_mqtt.md }
- { name: struct_mg_mqtt_topic_expression.md }
---
......
---
title: "struct mg_mqtt_topic_expression"
decl_name: "struct mg_mqtt_topic_expression"
symbol_kind: "struct"
signature: |
struct mg_mqtt_topic_expression {
const char *topic;
uint8_t qos;
};
---
puback
......@@ -63,10 +63,10 @@ static void ev_handler(struct mg_connection *nc, int ev, void *p) {
#if 0
char hex[1024] = {0};
mg_hexdump(nc->recv_mbuf.buf, msg->payload.len, hex, sizeof(hex));
printf("Got incoming message %s:\n%s", msg->topic, hex);
printf("Got incoming message %.*s:\n%s", (int)msg->topic.len, msg->topic.p, hex);
#else
printf("Got incoming message %s: %.*s\n", msg->topic,
(int) msg->payload.len, msg->payload.p);
printf("Got incoming message %.*s: %.*s\n", (int)msg->topic.len,
msg->topic.p, (int) msg->payload.len, msg->payload.p);
#endif
printf("Forwarding to /test\n");
......
......@@ -8246,72 +8246,100 @@ MG_INTERNAL int mg_get_errno(void) {
/* Amalgamated: #include "mongoose/src/internal.h" */
/* Amalgamated: #include "mongoose/src/mqtt.h" */
static const char *scanto(const char *p, struct mg_str *s) {
s->len = ntohs(*(uint16_t *) p);
s->p = p + 2;
return p + 2 + s->len;
}
MG_INTERNAL int parse_mqtt(struct mbuf *io, struct mg_mqtt_message *mm) {
uint8_t header;
int cmd;
size_t len = 0;
int var_len = 0;
char *vlen = &io->buf[1];
int cmd;
const char *p = &io->buf[1], *end;
if (io->len < 2) return -1;
header = io->buf[0];
cmd = header >> 4;
/* decode mqtt variable length */
do {
len += (*vlen & 127) << 7 * (vlen - &io->buf[1]);
} while ((*vlen++ & 128) != 0 && ((size_t)(vlen - io->buf) <= io->len));
len += (*p & 127) << 7 * (p - &io->buf[1]);
} while ((*p++ & 128) != 0 && ((size_t)(p - io->buf) <= io->len));
if (len != 0 && io->len < (size_t)(len - 1)) return -1;
end = p + len;
if (end > io->buf + io->len + 1) {
return -1;
}
mbuf_remove(io, 1 + (vlen - &io->buf[1]));
mm->cmd = cmd;
mm->qos = MG_MQTT_GET_QOS(header);
switch (cmd) {
case MG_MQTT_CMD_CONNECT:
/* TODO(mkm): parse keepalive and will */
case MG_MQTT_CMD_CONNECT: {
p = scanto(p, &mm->protocol_name);
mm->protocol_version = *(uint8_t *) p++;
mm->connect_flags = *(uint8_t *) p++;
mm->keep_alive_timer = ntohs(*(uint16_t *) p);
p += 2;
if (p < end) p = scanto(p, &mm->client_id);
if (p < end && (mm->connect_flags & MG_MQTT_HAS_WILL))
p = scanto(p, &mm->will_topic);
if (p < end && (mm->connect_flags & MG_MQTT_HAS_WILL))
p = scanto(p, &mm->will_message);
if (p < end && (mm->connect_flags & MG_MQTT_HAS_USER_NAME))
p = scanto(p, &mm->user_name);
if (p < end && (mm->connect_flags & MG_MQTT_HAS_PASSWORD))
p = scanto(p, &mm->password);
LOG(LL_DEBUG,
("%d %2x %d proto [%.*s] client_id [%.*s] will_topic [%.*s] "
"will_msg [%.*s] user_name [%.*s] password [%.*s]",
len, (int) mm->connect_flags, (int) mm->keep_alive_timer,
(int) mm->protocol_name.len, mm->protocol_name.p,
(int) mm->client_id.len, mm->client_id.p, (int) mm->will_topic.len,
mm->will_topic.p, (int) mm->will_message.len, mm->will_message.p,
(int) mm->user_name.len, mm->user_name.p, (int) mm->password.len,
mm->password.p));
break;
}
case MG_MQTT_CMD_CONNACK:
mm->connack_ret_code = io->buf[1];
var_len = 2;
mm->connack_ret_code = p[1];
break;
case MG_MQTT_CMD_PUBACK:
case MG_MQTT_CMD_PUBREC:
case MG_MQTT_CMD_PUBREL:
case MG_MQTT_CMD_PUBCOMP:
case MG_MQTT_CMD_SUBACK:
mm->message_id = ntohs(*(uint16_t *) io->buf);
var_len = 2;
mm->message_id = ntohs(*(uint16_t *) p);
break;
case MG_MQTT_CMD_PUBLISH: {
uint16_t topic_len = ntohs(*(uint16_t *) io->buf);
mm->topic = (char *) MG_MALLOC(topic_len + 1);
mm->topic[topic_len] = 0;
strncpy(mm->topic, io->buf + 2, topic_len);
var_len = topic_len + 2;
if (MG_MQTT_GET_QOS(header) > 0) {
mm->message_id = ntohs(*(uint16_t *) io->buf);
var_len += 2;
p += 2;
}
p = scanto(p, &mm->topic);
mm->payload.p = p;
mm->payload.len = end - p;
break;
}
} break;
case MG_MQTT_CMD_SUBSCRIBE:
mm->message_id = ntohs(*(uint16_t *) p);
p += 2;
/*
* topic expressions are left in the payload and can be parsed with
* `mg_mqtt_next_subscribe_topic`
*/
mm->message_id = ntohs(*(uint16_t *) io->buf);
var_len = 2;
mm->payload.p = p;
mm->payload.len = end - p;
break;
default:
/* Unhandled command */
break;
}
mbuf_remove(io, var_len);
return len - var_len;
return end - io->buf;
}
static void mqtt_handler(struct mg_connection *nc, int ev, void *ev_data) {
......@@ -8326,15 +8354,8 @@ static void mqtt_handler(struct mg_connection *nc, int ev, void *ev_data) {
case MG_EV_RECV:
len = parse_mqtt(io, &mm);
if (len == -1) break; /* not fully buffered */
mm.payload.p = io->buf;
mm.payload.len = len;
nc->handler(nc, MG_MQTT_EVENT_BASE + mm.cmd, &mm);
if (mm.topic) {
MG_FREE(mm.topic);
}
mbuf_remove(io, mm.payload.len);
mbuf_remove(io, len);
break;
}
}
......@@ -8463,6 +8484,7 @@ void mg_mqtt_subscribe(struct mg_connection *nc,
int mg_mqtt_next_subscribe_topic(struct mg_mqtt_message *msg,
struct mg_str *topic, uint8_t *qos, int pos) {
unsigned char *buf = (unsigned char *) msg->payload.p + pos;
if ((size_t) pos >= msg->payload.len) {
return -1;
}
......@@ -8610,7 +8632,7 @@ void mg_mqtt_broker_init(struct mg_mqtt_broker *brk, void *user_data) {
static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
struct mg_connection *nc) {
struct mg_mqtt_session *s = (struct mg_mqtt_session *) malloc(sizeof *s);
struct mg_mqtt_session *s = (struct mg_mqtt_session *) calloc(1, sizeof *s);
if (s == NULL) {
/* LCOV_EXCL_START */
mg_mqtt_connack(nc, MG_EV_MQTT_CONNACK_SERVER_UNAVAILABLE);
......@@ -8630,6 +8652,7 @@ static void mg_mqtt_broker_handle_connect(struct mg_mqtt_broker *brk,
static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
struct mg_mqtt_message *msg) {
struct mg_mqtt_session *ss = (struct mg_mqtt_session *) nc->user_data;
uint8_t qoss[512];
size_t qoss_len = 0;
......@@ -8643,6 +8666,7 @@ static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
qoss[qoss_len++] = qos;
}
ss->subscriptions = (struct mg_mqtt_topic_expression *) realloc(
ss->subscriptions, sizeof(*ss->subscriptions) * qoss_len);
for (pos = 0;
......@@ -8664,13 +8688,17 @@ static void mg_mqtt_broker_handle_subscribe(struct mg_connection *nc,
*
* Returns 1 if it matches; 0 otherwise.
*/
static int mg_mqtt_match_topic_expression(const char *exp, const char *topic) {
static int mg_mqtt_match_topic_expression(const char *exp,
const struct mg_str *topic) {
/* TODO(mkm): implement real matching */
int len = strlen(exp);
size_t len = strlen(exp);
if (strchr(exp, '#')) {
len -= 2;
if (topic->len < len) {
len = topic->len;
}
return strncmp(exp, topic, len) == 0;
}
return strncmp(topic->p, exp, len) == 0;
}
static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
......@@ -8681,9 +8709,16 @@ static void mg_mqtt_broker_handle_publish(struct mg_mqtt_broker *brk,
for (s = mg_mqtt_next(brk, NULL); s != NULL; s = mg_mqtt_next(brk, s)) {
for (i = 0; i < s->num_subscriptions; i++) {
if (mg_mqtt_match_topic_expression(s->subscriptions[i].topic,
msg->topic)) {
mg_mqtt_publish(s->nc, msg->topic, 0, 0, msg->payload.p,
msg->payload.len);
&msg->topic)) {
char buf[100], *p = buf;
mg_asprintf(&p, sizeof(buf), "%.*s", (int) msg->topic.len, msg->topic.p);
if (p == NULL) {
return;
}
mg_mqtt_publish(s->nc, p, 0, 0, msg->payload.p, msg->payload.len);
if (p != buf) {
MG_FREE(p);
}
break;
}
}
......@@ -8714,7 +8749,7 @@ void mg_mqtt_broker(struct mg_connection *nc, int ev, void *data) {
mg_mqtt_broker_handle_publish(brk, msg);
break;
case MG_EV_CLOSE:
if (nc->listener) {
if (nc->listener && nc->user_data != NULL) {
mg_mqtt_close_session((struct mg_mqtt_session *) nc->user_data);
}
break;
......
......@@ -3624,11 +3624,23 @@ int mg_http_create_digest_auth_header(char *buf, size_t buf_len,
struct mg_mqtt_message {
int cmd;
struct mg_str payload;
int qos;
struct mg_str topic;
struct mg_str payload;
uint8_t connack_ret_code; /* connack */
uint16_t message_id; /* puback */
char *topic;
/* connect */
uint8_t protocol_version;
uint8_t connect_flags;
uint16_t keep_alive_timer;
struct mg_str protocol_name;
struct mg_str client_id;
struct mg_str will_topic;
struct mg_str will_message;
struct mg_str user_name;
struct mg_str password;
};
struct mg_mqtt_topic_expression {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment