From d8101388581e4da69ed5b84957ad7ac5f96f558b Mon Sep 17 00:00:00 2001 From: Tuan PM Date: Thu, 29 Jan 2015 09:57:15 +0700 Subject: [PATCH] fixes error reconnect, thank @scragill reported --- mqtt/mqtt.c | 82 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 26 deletions(-) diff --git a/mqtt/mqtt.c b/mqtt/mqtt.c index 684a771..f9529c8 100644 --- a/mqtt/mqtt.c +++ b/mqtt/mqtt.c @@ -133,31 +133,37 @@ READPACKET: if(len < MQTT_BUF_SIZE && len > 0){ os_memcpy(client->mqtt_state.in_buffer, pdata, len); + msg_type = mqtt_get_type(client->mqtt_state.in_buffer); + msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); + msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); switch(client->connState){ - case MQTT_DATA: - client->mqtt_state.message_length_read = len; - client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); - msg_type = mqtt_get_type(client->mqtt_state.in_buffer); - msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); - msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); - - switch(msg_type) - { - case MQTT_MSG_TYPE_CONNACK: - if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){ - INFO("MQTT: Invalid packet\r\n"); + case MQTT_CONNECT_SENDING: + if(msg_type == MQTT_MSG_TYPE_CONNACK){ + if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){ + INFO("MQTT: Invalid packet\r\n"); if(client->security){ espconn_secure_disconnect(client->pCon); } else { espconn_disconnect(client->pCon); } - } else { + } else { INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port); + client->connState = MQTT_DATA; if(client->connectedCb) client->connectedCb((uint32_t*)client); - } - break; + } + + } + break; + case MQTT_DATA: + client->mqtt_state.message_length_read = len; + client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + + + switch(msg_type) + { + case MQTT_MSG_TYPE_SUBACK: if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) INFO("MQTT: Subscribe successful\r\n"); @@ -269,9 +275,21 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg) INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port); client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); - if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ - INFO("MQTT: Exceed the amount of queues\r\n"); + client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ; + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); + if(client->security){ + espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); } + else{ + espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + + client->mqtt_state.outbound_message = NULL; client->keepAliveTick = 0; system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); @@ -323,11 +341,21 @@ mqtt_tcpclient_connect_cb(void *arg) mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length); client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info); - if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ - INFO("MQTT: Exceed the amount of queues\r\n"); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); + if(client->security){ + espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); } + else{ + espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + client->mqtt_state.outbound_message = NULL; - client->connState = MQTT_DATA; + client->connState = MQTT_CONNECT_SENDING; system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); } @@ -363,15 +391,16 @@ mqtt_tcpclient_recon_cb(void *arg, sint8 errType) BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain) { - mqtt_message_t* outbound_message; - - - outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, + client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, topic, data, data_length, qos, retain, &client->mqtt_state.pending_msg_id); - INFO("MQTT: queuing publish, length: %d...\r\n", outbound_message->length); - if(QUEUE_Puts(&client->msgQueue, outbound_message->data, outbound_message->length) == -1){ + if(client->mqtt_state.outbound_message->length == 0){ + INFO("MQTT: Queuing publish failed\r\n"); + return FALSE; + } + INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size); + if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ INFO("MQTT: Exceed the amount of queues\r\n"); return FALSE; } @@ -516,6 +545,7 @@ void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* clien mqttClient->mqtt_state.connect_info = &mqttClient->connect_info; mqttClient->keepAliveTick = 0; mqttClient->reconnectTick = 0; + mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length); os_timer_disarm(&mqttClient->mqttTimer); os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);