Optimize the code, fixes some issues for qos=1 and 2

develop
Tuan PM 2015-01-17 17:30:34 +07:00
rodzic d121eaf694
commit a8c335eb3d
3 zmienionych plików z 107 dodań i 56 usunięć

Wyświetl plik

@ -1,4 +1,5 @@
/* mqtt.c
* Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
@ -97,7 +98,7 @@ mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
LOCAL void ICACHE_FLASH_ATTR
deliver_publish(MQTT_Client* client, uint8_t* message, int length)
{
INFO("deliver_publish\r\n");
mqtt_event_data_t event_data;
event_data.topic_length = length;
@ -146,27 +147,9 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
INFO("TCP: data received\r\n");
if(len < MQTT_BUF_SIZE && len > 0){
memcpy(client->mqtt_state.in_buffer, pdata, len);
os_memcpy(client->mqtt_state.in_buffer, pdata, len);
switch(client->connState){
case MQTT_CONNECT_SENDING:
if(mqtt_get_type(client->mqtt_state.in_buffer) != MQTT_MSG_TYPE_CONNACK){
INFO("MQTT: Invalid packet\r\n");
if(client->security){
espconn_secure_disconnect(client->pCon);
}
else {
espconn_disconnect(client->pCon);
}
} else {
INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
client->connState = MQTT_DATA;
if(client->connectedCb)
client->connectedCb((uint32_t*)client);
}
break;
case MQTT_DATA:
client->mqtt_state.message_length_read = len;
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
@ -175,6 +158,21 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
switch(msg_type)
{
case MQTT_MSG_TYPE_CONNACK:
if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){
INFO("MQTT: Invalid packet\r\n");
if(client->security){
espconn_secure_disconnect(client->pCon);
}
else {
espconn_disconnect(client->pCon);
}
} else {
INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
if(client->connectedCb)
client->connectedCb((uint32_t*)client);
}
break;
case MQTT_MSG_TYPE_SUBACK:
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
INFO("MQTT: Subscribe successful to %s:%d\r\n", client->host, client->port);
@ -188,12 +186,17 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
else if(msg_qos == 2)
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
if(msg_qos == 1 || msg_qos == 2){
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
}
}
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){
INFO("MQTT: Publish successful\r\n");
INFO("MQTT: Publish successful qos = 1\r\n");
if(client->publishedCb)
client->publishedCb((uint32_t*)client);
}
@ -201,19 +204,28 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
break;
case MQTT_MSG_TYPE_PUBREC:
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
}
break;
case MQTT_MSG_TYPE_PUBREL:
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
}
break;
case MQTT_MSG_TYPE_PUBCOMP:
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){
INFO("MQTT: Public successful\r\n");
INFO("MQTT: Public successful qos = 2\r\n");
if(client->publishedCb)
client->publishedCb((uint32_t*)client);
}
break;
case MQTT_MSG_TYPE_PINGREQ:
client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
}
break;
case MQTT_MSG_TYPE_PINGRESP:
// Ignore
@ -266,7 +278,7 @@ mqtt_tcpclient_sent_cb(void *arg)
struct espconn *pCon = (struct espconn *)arg;
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
INFO("TCP: Sent\r\n");
if(client->connState == MQTT_DATA){
if(client->connState == MQTT_DATA && client->mqtt_state.pending_publish_qos == 0){
if(client->publishedCb)
client->publishedCb((uint32_t*)client);
}
@ -276,15 +288,14 @@ mqtt_tcpclient_sent_cb(void *arg)
void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
{
MQTT_Client* client = (MQTT_Client*)arg;
mqtt_message_t* outbound_message;
if(client->connState == MQTT_DATA){
client->keepAliveTick ++;
if(client->keepAliveTick > client->mqtt_state.connect_info->keepalive){
INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
if(QUEUE_Puts(&client->msgQueue, outbound_message->data, outbound_message->length) == -1){
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
}
@ -333,7 +344,14 @@ mqtt_tcpclient_connect_cb(void *arg)
espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);////////
espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);///////
INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
client->connState = MQTT_CONNECT_SEND;
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
}
client->mqtt_state.outbound_message = NULL;
client->connState = MQTT_DATA;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
}
@ -356,7 +374,16 @@ mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
}
/**
* @brief MQTT publish function.
* @param client: MQTT_Client reference
* @param topic: string topic will publish to
* @param data: buffer data send point to
* @param data_length: length of data
* @param qos: qos
* @param retain: retain
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR
MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
{
@ -376,18 +403,23 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_
return TRUE;
}
/**
* @brief MQTT subscibe function.
* @param client: MQTT_Client reference
* @param topic: string topic will subscribe
* @param qos: qos
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR
MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
{
mqtt_message_t* outbound_message;
INFO("MQTT: subscribe, topic\"%s\" at broker %s:%d\r\n", topic, client->host, client->port);
outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, 0,
&client->mqtt_state.pending_msg_id);
if(QUEUE_Puts(&client->msgQueue, outbound_message->data, outbound_message->length) == -1){
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
INFO("MQTT: Exceed the amount of queues\r\n");
return FALSE;
}
@ -410,25 +442,19 @@ MQTT_Task(os_event_t *e)
INFO("TCP:Reconect to: %s:%d\r\n", client->host, client->port);
client->connState = TCP_CONNECTING;
break;
case MQTT_CONNECT_SEND:
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
if(client->security){
espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
}
else
{
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
}
INFO("MQTT: Send mqtt connection info, to broker %s:%d\r\n", client->host, client->port);
client->connState = MQTT_CONNECT_SENDING;
client->mqtt_state.outbound_message = NULL;
break;
case MQTT_DATA:
if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0){
INFO("MQTT: Sending..\r\n");
uint8_t type = mqtt_get_type(dataBuffer);
client->mqtt_state.pending_msg_type = type;
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH)
client->mqtt_state.pending_publish_qos = mqtt_get_qos(dataBuffer);
else
client->mqtt_state.pending_publish_qos = 4;
client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
INFO("MQTT: Sending..type: %d,qos: %d\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_publish_qos);
if(client->security){
espconn_secure_sent(client->pCon, dataBuffer, dataLen);
}
@ -445,7 +471,14 @@ MQTT_Task(os_event_t *e)
}
}
/**
* @brief MQTT initialization connection function
* @param client: MQTT_Client reference
* @param host: Domain or IP string
* @param port: Port to connect
* @param security: 1 for ssl, 0 for none
* @retval None
*/
void MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t security)
{
INFO("MQTT_InitConnection\r\n");
@ -465,8 +498,15 @@ void MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, ui
espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);
}
/**
* @brief MQTT initialization mqtt client function
* @param client: MQTT_Client reference
* @param clientid: MQTT client id
* @param client_user:MQTT client user
* @param client_pass:MQTT client password
* @param client_pass:MQTT keep alive timer, in second
* @retval None
*/
void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime)
{
INFO("MQTT_InitClient\r\n");
@ -478,7 +518,6 @@ void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* clien
mqttClient->connect_info.keepalive = keepAliveTime;
mqttClient->connect_info.clean_session = 1;
//mqttClient->mqtt_state = (mqtt_state_t *)os_zalloc(sizeof(mqtt_state_t));
mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
@ -498,6 +537,11 @@ void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* clien
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
}
/**
* @brief Begin connect to MQTT broker
* @param client: MQTT_Client reference
* @retval None
*/
void MQTT_Connect(MQTT_Client *mqttClient)
{
if(UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) {
@ -515,18 +559,22 @@ void MQTT_Connect(MQTT_Client *mqttClient)
}
mqttClient->connState = TCP_CONNECTING;
}
void MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb)
{
mqttClient->connectedCb = connectedCb;
}
void MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb)
{
mqttClient->disconnectedCb = disconnectedCb;
}
void MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb)
{
mqttClient->dataCb = dataCb;
}
void MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
{
mqttClient->publishedCb = publishedCb;

Wyświetl plik

@ -58,6 +58,7 @@ typedef struct mqtt_state_t
mqtt_connection_t mqtt_connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
} mqtt_state_t;
typedef enum {

Wyświetl plik

@ -49,10 +49,12 @@ void mqttConnectedCb(uint32_t *args)
{
MQTT_Client* client = (MQTT_Client*)args;
INFO("MQTT: Connected\r\n");
MQTT_Subscribe(client, "/mqtt/topic/1", 0);
MQTT_Subscribe(client, "/mqtt/topic/2", 0);
MQTT_Publish(client, "/mqtt/topic/2", "hello2", 6, 0, 0);
MQTT_Publish(client, "/mqtt/topic/1", "hello1", 6, 0, 0);
MQTT_Subscribe(client, "/mqtt/topic/0", 0);
MQTT_Subscribe(client, "/mqtt/topic/1", 1);
MQTT_Subscribe(client, "/mqtt/topic/2", 2);
MQTT_Publish(client, "/mqtt/topic/0", "hello0", 6, 0, 0);
MQTT_Publish(client, "/mqtt/topic/1", "hello1", 6, 1, 0);
MQTT_Publish(client, "/mqtt/topic/2", "hello2", 6, 2, 0);
}
void mqttDisconnectedCb(uint32_t *args)