diff --git a/mqtt/include/mqtt.h b/mqtt/include/mqtt.h index 9398cd4..862c481 100644 --- a/mqtt/include/mqtt.h +++ b/mqtt/include/mqtt.h @@ -140,6 +140,7 @@ void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback pu void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb); void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb); BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos); +BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe(MQTT_Client *client, char* topic); void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient); void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client *mqttClient); BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain); diff --git a/mqtt/mqtt.c b/mqtt/mqtt.c index eab8e4c..6043b52 100644 --- a/mqtt/mqtt.c +++ b/mqtt/mqtt.c @@ -142,7 +142,7 @@ mqtt_send_keepalive(MQTT_Client *client) client->keepAliveTick = 0; client->connState = MQTT_DATA; system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); - } + } else { client->connState = TCP_RECONNECT_DISCONNECTING; system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); @@ -158,7 +158,7 @@ void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client *mqttClient) { if (mqttClient->pCon != NULL) { - INFO("Free memory\r\n"); + INFO("Free memory\r\n"); espconn_delete(mqttClient->pCon); if (mqttClient->pCon->proto.tcp) os_free(mqttClient->pCon->proto.tcp); @@ -554,6 +554,32 @@ MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos) return TRUE; } +/** + * @brief MQTT un-subscibe function. + * @param client: MQTT_Client reference + * @param topic: String topic will un-subscribe + * @retval TRUE if success queue + */ +BOOL ICACHE_FLASH_ATTR +MQTT_UnSubscribe(MQTT_Client *client, char* topic) +{ + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection, + topic, + &client->mqtt_state.pending_msg_id); + INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id); + while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) { + INFO("MQTT: Queue full\r\n"); + if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + INFO("MQTT: Serious buffer error\r\n"); + return FALSE; + } + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + return TRUE; +} + /** * @brief MQTT ping function. * @param client: MQTT_Client reference @@ -611,14 +637,14 @@ MQTT_Task(os_event_t *e) } else { espconn_disconnect(client->pCon); - } + } break; case TCP_DISCONNECTED: INFO("MQTT: Disconnected\r\n"); mqtt_tcpclient_delete(client); break; case MQTT_DELETED: - INFO("MQTT: Deleted client\r\n"); + INFO("MQTT: Deleted client\r\n"); mqtt_client_delete(client); break; case MQTT_KEEPALIVE_SEND: @@ -698,15 +724,21 @@ MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_use os_strcpy(mqttClient->connect_info.client_id, client_id); mqttClient->connect_info.client_id[temp] = 0; - temp = os_strlen(client_user); - mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1); - os_strcpy(mqttClient->connect_info.username, client_user); - mqttClient->connect_info.username[temp] = 0; + if (client_user) + { + temp = os_strlen(client_user); + mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.username, client_user); + mqttClient->connect_info.username[temp] = 0; + } - temp = os_strlen(client_pass); - mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1); - os_strcpy(mqttClient->connect_info.password, client_pass); - mqttClient->connect_info.password[temp] = 0; + if (client_pass) + { + temp = os_strlen(client_pass); + mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.password, client_pass); + mqttClient->connect_info.password[temp] = 0; + } mqttClient->connect_info.keepalive = keepAliveTime; @@ -837,7 +869,7 @@ MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb) mqttClient->publishedCb = publishedCb; } -void ICACHE_FLASH_ATTR +void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb) { mqttClient->timeoutCb = timeoutCb;