diff --git a/mqtt/include/mqtt.h b/mqtt/include/mqtt.h index 6a8a56d..9398cd4 100644 --- a/mqtt/include/mqtt.h +++ b/mqtt/include/mqtt.h @@ -70,6 +70,7 @@ typedef enum { DNS_RESOLVE, TCP_DISCONNECTING, TCP_DISCONNECTED, + TCP_RECONNECT_DISCONNECTING, TCP_RECONNECT_REQ, TCP_RECONNECT, TCP_CONNECTING, @@ -80,6 +81,7 @@ typedef enum { MQTT_SUBSCIBE_SEND, MQTT_SUBSCIBE_SENDING, MQTT_DATA, + MQTT_KEEPALIVE_SEND, MQTT_PUBLISH_RECV, MQTT_PUBLISHING, MQTT_DELETING, diff --git a/mqtt/mqtt.c b/mqtt/mqtt.c index 2735bb7..2e646e5 100644 --- a/mqtt/mqtt.c +++ b/mqtt/mqtt.c @@ -113,6 +113,42 @@ deliver_publish(MQTT_Client* client, uint8_t* message, int length) } +void ICACHE_FLASH_ATTR +mqtt_send_keepalive(MQTT_Client *client) +{ + INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port); + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ; + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); + err_t result = ESPCONN_OK; + if (client->security) { +#ifdef MQTT_SSL_ENABLE + result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); +#else + INFO("TCP: Do not support SSL\r\n"); +#endif + } + else { + result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + + client->mqtt_state.outbound_message = NULL; + if(ESPCONN_OK == result) { + client->keepAliveTick = 0; + client->connState = MQTT_DATA; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + } + else { + client->connState = TCP_RECONNECT_DISCONNECTING; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + } +} + /** * @brief Delete tcp client and free all memory * @param mqttClient: The mqtt client which contain TCP client @@ -237,6 +273,7 @@ READPACKET: } break; case MQTT_DATA: + case MQTT_KEEPALIVE_SEND: client->mqtt_state.message_length_read = len; client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); @@ -338,7 +375,8 @@ mqtt_tcpclient_sent_cb(void *arg) MQTT_Client* client = (MQTT_Client *)pCon->reverse; INFO("TCP: Sent\r\n"); client->sendTimeout = 0; - if (client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) { + if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND) + && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) { if (client->publishedCb) client->publishedCb((uint32_t*)client); } @@ -352,37 +390,8 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg) if (client->connState == MQTT_DATA) { client->keepAliveTick ++; if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive) { - - INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port); - client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ; - client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); - client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); - - - client->sendTimeout = MQTT_SEND_TIMOUT; - INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); - err_t result = ESPCONN_OK; - if (client->security) { -#ifdef MQTT_SSL_ENABLE - result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); -#else - INFO("TCP: Do not support SSL\r\n"); -#endif - } - else { - result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); - } - - client->mqtt_state.outbound_message = NULL; - - if(ESPCONN_OK == result) { - client->keepAliveTick = 0; - system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); - } - else { - client->connState = TCP_RECONNECT_REQ; - } + client->connState = MQTT_KEEPALIVE_SEND; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); } } else if (client->connState == TCP_RECONNECT_REQ) { @@ -585,12 +594,14 @@ MQTT_Task(os_event_t *e) case TCP_RECONNECT_REQ: break; case TCP_RECONNECT: + mqtt_tcpclient_delete(client); MQTT_Connect(client); INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port); client->connState = TCP_CONNECTING; break; case MQTT_DELETING: case TCP_DISCONNECTING: + case TCP_RECONNECT_DISCONNECTING: if (client->security) { #ifdef MQTT_SSL_ENABLE espconn_secure_connect(client->pCon); @@ -610,6 +621,9 @@ MQTT_Task(os_event_t *e) INFO("MQTT: Deleted client\r\n"); mqtt_client_delete(client); break; + case MQTT_KEEPALIVE_SEND: + mqtt_send_keepalive(client); + break; case MQTT_DATA: if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) { break; @@ -737,7 +751,11 @@ MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, ui void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient) { - MQTT_Disconnect(mqttClient); + // Do not connect if this client is already connected otherwise the + // two espconn connections may interfere causing unexpected behaviour. + if (mqttClient->pCon) { + return; + } mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); mqttClient->pCon->type = ESPCONN_TCP; mqttClient->pCon->state = ESPCONN_NONE;