diff --git a/src/mqtt_server.c b/src/mqtt_server.c index 4e02371..e00cd70 100644 --- a/src/mqtt_server.c +++ b/src/mqtt_server.c @@ -341,24 +341,32 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns } MQTT_INFO("MQTT: TCP: data received %d bytes (State: %d)\r\n", len, clientcon->connState); + MQTT_INFO("MQTT: clientcon->mqtt_state.message_length_read %d\r\n", clientcon->mqtt_state.message_length_read); - // Expect minimum the full fixed size header - if (len + clientcon->mqtt_state.message_length_read > MQTT_BUF_SIZE || len < 2) { - MQTT_ERROR("MQTT: Message too short/long\r\n"); - MQTT_server_disconnectClientCon(clientcon); - //clientcon->mqtt_state.message_length_read = 0; - return; + // Do not exceed receive buffer + if (len + clientcon->mqtt_state.message_length_read > MQTT_BUF_SIZE) { + MQTT_ERROR("MQTT: Message too long: %d\r\n", len + clientcon->mqtt_state.message_length_read); + MQTT_server_disconnectClientCon(clientcon); + clientcon->mqtt_state.message_length_read = 0; + return; } - READPACKET: + os_memcpy(&clientcon->mqtt_state.in_buffer[clientcon->mqtt_state.message_length_read], pdata, len); clientcon->mqtt_state.message_length_read += len; - clientcon->mqtt_state.message_length = - mqtt_get_total_length(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.message_length_read); - MQTT_INFO("MQTT: total_len: %d\r\n", clientcon->mqtt_state.message_length); + READPACKET: + // Expect minimum the full fixed size header + if (len < 2) { + MQTT_INFO("MQTT: Partial message received (< 2 byte)\r\n"); + return; + } + clientcon->mqtt_state.message_length = + mqtt_get_total_length(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.message_length_read); + MQTT_INFO("MQTT: next message length: %d\r\n", clientcon->mqtt_state.message_length); if (clientcon->mqtt_state.message_length > clientcon->mqtt_state.message_length_read) { - MQTT_WARNING("MQTT: Partial message received\r\n"); - return; + MQTT_INFO("MQTT: Partial message received (%d vs. %d\r\n", clientcon->mqtt_state.message_length, + clientcon->mqtt_state.message_length_read); + return; } msg_type = mqtt_get_type(clientcon->mqtt_state.in_buffer); @@ -592,7 +600,6 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns if ((local_auth_cb != NULL) && local_auth_cb(clientcon->connect_info.username==NULL?"":clientcon->connect_info.username, clientcon->connect_info.password==NULL?"":clientcon->connect_info.password, - clientcon->connect_info.client_id, clientcon->pCon) == false) { MQTT_WARNING("MQTT: Authorization failed\r\n"); @@ -816,14 +823,12 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns clientcon->connectionTimeout = 2 * clientcon->connect_info.keepalive+10; // More than one MQTT command in the packet? - len = clientcon->mqtt_state.message_length_read; - if (clientcon->mqtt_state.message_length < len) { - len -= clientcon->mqtt_state.message_length; - pdata += clientcon->mqtt_state.message_length; - clientcon->mqtt_state.message_length_read = 0; - - MQTT_INFO("MQTT: Get another received message\r\n"); - goto READPACKET; + int diff = clientcon->mqtt_state.message_length_read - clientcon->mqtt_state.message_length; + if (diff > 0) { + MQTT_INFO("MQTT: %d bytes left in buffer, try next message\r\n", diff); + os_memcpy(clientcon->mqtt_state.in_buffer, &clientcon->mqtt_state.in_buffer[clientcon->mqtt_state.message_length], diff); + clientcon->mqtt_state.message_length_read = diff; + goto READPACKET; } clientcon->mqtt_state.message_length_read = 0; @@ -834,6 +839,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns } } + /* Called when a client has disconnected from the MQTT server */ static void ICACHE_FLASH_ATTR MQTT_ClientCon_discon_cb(void *arg) { struct espconn *pCon = (struct espconn *)arg;