Merge pull request #112 from someburner/master

add wrapper for un-subscribe, check user/pass for NULL
develop
Tuan PM 2016-06-30 07:44:44 +07:00 zatwierdzone przez GitHub
commit 8933adb4d6
2 zmienionych plików z 46 dodań i 13 usunięć

Wyświetl plik

@ -140,6 +140,7 @@ void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback pu
void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb);
void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb);
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos);
BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe(MQTT_Client *client, char* topic);
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient);
void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client *mqttClient);
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain);

Wyświetl plik

@ -142,7 +142,7 @@ mqtt_send_keepalive(MQTT_Client *client)
client->keepAliveTick = 0;
client->connState = MQTT_DATA;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
}
}
else {
client->connState = TCP_RECONNECT_DISCONNECTING;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
@ -158,7 +158,7 @@ void ICACHE_FLASH_ATTR
mqtt_tcpclient_delete(MQTT_Client *mqttClient)
{
if (mqttClient->pCon != NULL) {
INFO("Free memory\r\n");
INFO("Free memory\r\n");
espconn_delete(mqttClient->pCon);
if (mqttClient->pCon->proto.tcp)
os_free(mqttClient->pCon->proto.tcp);
@ -554,6 +554,32 @@ MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
return TRUE;
}
/**
* @brief MQTT un-subscibe function.
* @param client: MQTT_Client reference
* @param topic: String topic will un-subscribe
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR
MQTT_UnSubscribe(MQTT_Client *client, char* topic)
{
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic,
&client->mqtt_state.pending_msg_id);
INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1) {
INFO("MQTT: Queue full\r\n");
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
INFO("MQTT: Serious buffer error\r\n");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
return TRUE;
}
/**
* @brief MQTT ping function.
* @param client: MQTT_Client reference
@ -611,14 +637,14 @@ MQTT_Task(os_event_t *e)
}
else {
espconn_disconnect(client->pCon);
}
}
break;
case TCP_DISCONNECTED:
INFO("MQTT: Disconnected\r\n");
mqtt_tcpclient_delete(client);
break;
case MQTT_DELETED:
INFO("MQTT: Deleted client\r\n");
INFO("MQTT: Deleted client\r\n");
mqtt_client_delete(client);
break;
case MQTT_KEEPALIVE_SEND:
@ -698,15 +724,21 @@ MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_use
os_strcpy(mqttClient->connect_info.client_id, client_id);
mqttClient->connect_info.client_id[temp] = 0;
temp = os_strlen(client_user);
mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.username, client_user);
mqttClient->connect_info.username[temp] = 0;
if (client_user)
{
temp = os_strlen(client_user);
mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.username, client_user);
mqttClient->connect_info.username[temp] = 0;
}
temp = os_strlen(client_pass);
mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.password, client_pass);
mqttClient->connect_info.password[temp] = 0;
if (client_pass)
{
temp = os_strlen(client_pass);
mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.password, client_pass);
mqttClient->connect_info.password[temp] = 0;
}
mqttClient->connect_info.keepalive = keepAliveTime;
@ -837,7 +869,7 @@ MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
mqttClient->publishedCb = publishedCb;
}
void ICACHE_FLASH_ATTR
void ICACHE_FLASH_ATTR
MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb)
{
mqttClient->timeoutCb = timeoutCb;