add wrapper for un-subscribe

develop
Jeff H 2016-06-13 01:05:36 -07:00
rodzic 368c1aad63
commit 53c913256d
2 zmienionych plików z 32 dodań i 5 usunięć

Wyświetl plik

@ -140,6 +140,7 @@ void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback pu
void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb);
void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb);
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos);
BOOL MQTT_UnSubscribe(MQTT_Client *client, char* topic);
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient);
void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client *mqttClient);
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain);

Wyświetl plik

@ -142,7 +142,7 @@ mqtt_send_keepalive(MQTT_Client *client)
client->keepAliveTick = 0;
client->connState = MQTT_DATA;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
}
}
else {
client->connState = TCP_RECONNECT_DISCONNECTING;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
@ -158,7 +158,7 @@ void ICACHE_FLASH_ATTR
mqtt_tcpclient_delete(MQTT_Client *mqttClient)
{
if (mqttClient->pCon != NULL) {
INFO("Free memory\r\n");
INFO("Free memory\r\n");
espconn_delete(mqttClient->pCon);
if (mqttClient->pCon->proto.tcp)
os_free(mqttClient->pCon->proto.tcp);
@ -554,6 +554,32 @@ MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
return TRUE;
}
/**
* @brief MQTT un-subscibe function.
* @param client: MQTT_Client reference
* @param topic: String topic will un-subscribe
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR
MQTT_UnSubscribe(MQTT_Client *client, char* topic)
{
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);
INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
INFO("MQTT: Queue full\r\n");
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
INFO("MQTT: Serious buffer error\r\n");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
return TRUE;
}
/**
* @brief MQTT ping function.
* @param client: MQTT_Client reference
@ -611,14 +637,14 @@ MQTT_Task(os_event_t *e)
}
else {
espconn_disconnect(client->pCon);
}
}
break;
case TCP_DISCONNECTED:
INFO("MQTT: Disconnected\r\n");
mqtt_tcpclient_delete(client);
break;
case MQTT_DELETED:
INFO("MQTT: Deleted client\r\n");
INFO("MQTT: Deleted client\r\n");
mqtt_client_delete(client);
break;
case MQTT_KEEPALIVE_SEND:
@ -837,7 +863,7 @@ MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
mqttClient->publishedCb = publishedCb;
}
void ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR
MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb)
{
mqttClient->timeoutCb = timeoutCb;