From 466e6dcf4bb7aecf0e828a5de606413e79e2830e Mon Sep 17 00:00:00 2001 From: Tuan PM Date: Mon, 2 Feb 2015 19:46:46 +0700 Subject: [PATCH] remove old queue to get more space if the queue is full --- mqtt/mqtt.c | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/mqtt/mqtt.c b/mqtt/mqtt.c index 8665248..b1d407e 100644 --- a/mqtt/mqtt.c +++ b/mqtt/mqtt.c @@ -391,6 +391,8 @@ mqtt_tcpclient_recon_cb(void *arg, sint8 errType) BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain) { + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, topic, data, data_length, qos, retain, @@ -400,9 +402,12 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_ return FALSE; } INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size); - if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ INFO("MQTT: Exceed the amount of queues\r\n"); - return FALSE; + if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + INFO("MQTT: Serious buffer error\r\n"); + return FALSE; + } } system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); return TRUE; @@ -418,15 +423,19 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_ BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos) { - + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, topic, 0, &client->mqtt_state.pending_msg_id); INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n",topic, client->mqtt_state.pending_msg_id); - if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ INFO("MQTT: Exceed the amount of queues\r\n"); - return FALSE; + if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + INFO("MQTT: Serious buffer error\r\n"); + return FALSE; + } } system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); return TRUE;