kopia lustrzana https://github.com/martin-ger/esp_mqtt
remove old queue to get more space if the queue is full
rodzic
2b7be3cc8e
commit
466e6dcf4b
19
mqtt/mqtt.c
19
mqtt/mqtt.c
|
@ -391,6 +391,8 @@ mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
|
|||
BOOL ICACHE_FLASH_ATTR
|
||||
MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
|
||||
{
|
||||
uint8_t dataBuffer[MQTT_BUF_SIZE];
|
||||
uint16_t dataLen;
|
||||
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||
topic, data, data_length,
|
||||
qos, retain,
|
||||
|
@ -400,9 +402,12 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_
|
|||
return FALSE;
|
||||
}
|
||||
INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
return FALSE;
|
||||
if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
|
||||
INFO("MQTT: Serious buffer error\r\n");
|
||||
return FALSE;
|
||||
}
|
||||
}
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
return TRUE;
|
||||
|
@ -418,15 +423,19 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_
|
|||
BOOL ICACHE_FLASH_ATTR
|
||||
MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
|
||||
{
|
||||
|
||||
uint8_t dataBuffer[MQTT_BUF_SIZE];
|
||||
uint16_t dataLen;
|
||||
|
||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||
topic, 0,
|
||||
&client->mqtt_state.pending_msg_id);
|
||||
INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n",topic, client->mqtt_state.pending_msg_id);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
return FALSE;
|
||||
if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
|
||||
INFO("MQTT: Serious buffer error\r\n");
|
||||
return FALSE;
|
||||
}
|
||||
}
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
return TRUE;
|
||||
|
|
Ładowanie…
Reference in New Issue