kopia lustrzana https://github.com/martin-ger/esp_mqtt
commit
1843a1efb4
|
@ -70,6 +70,7 @@ typedef enum {
|
||||||
DNS_RESOLVE,
|
DNS_RESOLVE,
|
||||||
TCP_DISCONNECTING,
|
TCP_DISCONNECTING,
|
||||||
TCP_DISCONNECTED,
|
TCP_DISCONNECTED,
|
||||||
|
TCP_RECONNECT_DISCONNECTING,
|
||||||
TCP_RECONNECT_REQ,
|
TCP_RECONNECT_REQ,
|
||||||
TCP_RECONNECT,
|
TCP_RECONNECT,
|
||||||
TCP_CONNECTING,
|
TCP_CONNECTING,
|
||||||
|
@ -80,6 +81,7 @@ typedef enum {
|
||||||
MQTT_SUBSCIBE_SEND,
|
MQTT_SUBSCIBE_SEND,
|
||||||
MQTT_SUBSCIBE_SENDING,
|
MQTT_SUBSCIBE_SENDING,
|
||||||
MQTT_DATA,
|
MQTT_DATA,
|
||||||
|
MQTT_KEEPALIVE_SEND,
|
||||||
MQTT_PUBLISH_RECV,
|
MQTT_PUBLISH_RECV,
|
||||||
MQTT_PUBLISHING,
|
MQTT_PUBLISHING,
|
||||||
MQTT_DELETING,
|
MQTT_DELETING,
|
||||||
|
|
84
mqtt/mqtt.c
84
mqtt/mqtt.c
|
@ -113,6 +113,42 @@ deliver_publish(MQTT_Client* client, uint8_t* message, int length)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ICACHE_FLASH_ATTR
|
||||||
|
mqtt_send_keepalive(MQTT_Client *client)
|
||||||
|
{
|
||||||
|
INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
|
||||||
|
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
||||||
|
client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
|
||||||
|
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||||
|
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||||
|
|
||||||
|
|
||||||
|
client->sendTimeout = MQTT_SEND_TIMOUT;
|
||||||
|
INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
||||||
|
err_t result = ESPCONN_OK;
|
||||||
|
if (client->security) {
|
||||||
|
#ifdef MQTT_SSL_ENABLE
|
||||||
|
result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||||
|
#else
|
||||||
|
INFO("TCP: Do not support SSL\r\n");
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||||
|
}
|
||||||
|
|
||||||
|
client->mqtt_state.outbound_message = NULL;
|
||||||
|
if(ESPCONN_OK == result) {
|
||||||
|
client->keepAliveTick = 0;
|
||||||
|
client->connState = MQTT_DATA;
|
||||||
|
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
client->connState = TCP_RECONNECT_DISCONNECTING;
|
||||||
|
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Delete tcp client and free all memory
|
* @brief Delete tcp client and free all memory
|
||||||
* @param mqttClient: The mqtt client which contain TCP client
|
* @param mqttClient: The mqtt client which contain TCP client
|
||||||
|
@ -237,6 +273,7 @@ READPACKET:
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case MQTT_DATA:
|
case MQTT_DATA:
|
||||||
|
case MQTT_KEEPALIVE_SEND:
|
||||||
client->mqtt_state.message_length_read = len;
|
client->mqtt_state.message_length_read = len;
|
||||||
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
||||||
|
|
||||||
|
@ -338,7 +375,8 @@ mqtt_tcpclient_sent_cb(void *arg)
|
||||||
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
||||||
INFO("TCP: Sent\r\n");
|
INFO("TCP: Sent\r\n");
|
||||||
client->sendTimeout = 0;
|
client->sendTimeout = 0;
|
||||||
if (client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND)
|
||||||
|
&& client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
||||||
if (client->publishedCb)
|
if (client->publishedCb)
|
||||||
client->publishedCb((uint32_t*)client);
|
client->publishedCb((uint32_t*)client);
|
||||||
}
|
}
|
||||||
|
@ -352,37 +390,8 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
|
||||||
if (client->connState == MQTT_DATA) {
|
if (client->connState == MQTT_DATA) {
|
||||||
client->keepAliveTick ++;
|
client->keepAliveTick ++;
|
||||||
if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive) {
|
if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive) {
|
||||||
|
client->connState = MQTT_KEEPALIVE_SEND;
|
||||||
INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
|
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||||
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
|
||||||
client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
|
|
||||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
|
||||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
|
||||||
|
|
||||||
|
|
||||||
client->sendTimeout = MQTT_SEND_TIMOUT;
|
|
||||||
INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
|
||||||
err_t result = ESPCONN_OK;
|
|
||||||
if (client->security) {
|
|
||||||
#ifdef MQTT_SSL_ENABLE
|
|
||||||
result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
|
||||||
#else
|
|
||||||
INFO("TCP: Do not support SSL\r\n");
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
|
||||||
}
|
|
||||||
|
|
||||||
client->mqtt_state.outbound_message = NULL;
|
|
||||||
|
|
||||||
if(ESPCONN_OK == result) {
|
|
||||||
client->keepAliveTick = 0;
|
|
||||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
client->connState = TCP_RECONNECT_REQ;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (client->connState == TCP_RECONNECT_REQ) {
|
} else if (client->connState == TCP_RECONNECT_REQ) {
|
||||||
|
@ -585,12 +594,14 @@ MQTT_Task(os_event_t *e)
|
||||||
case TCP_RECONNECT_REQ:
|
case TCP_RECONNECT_REQ:
|
||||||
break;
|
break;
|
||||||
case TCP_RECONNECT:
|
case TCP_RECONNECT:
|
||||||
|
mqtt_tcpclient_delete(client);
|
||||||
MQTT_Connect(client);
|
MQTT_Connect(client);
|
||||||
INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
|
INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
|
||||||
client->connState = TCP_CONNECTING;
|
client->connState = TCP_CONNECTING;
|
||||||
break;
|
break;
|
||||||
case MQTT_DELETING:
|
case MQTT_DELETING:
|
||||||
case TCP_DISCONNECTING:
|
case TCP_DISCONNECTING:
|
||||||
|
case TCP_RECONNECT_DISCONNECTING:
|
||||||
if (client->security) {
|
if (client->security) {
|
||||||
#ifdef MQTT_SSL_ENABLE
|
#ifdef MQTT_SSL_ENABLE
|
||||||
espconn_secure_connect(client->pCon);
|
espconn_secure_connect(client->pCon);
|
||||||
|
@ -610,6 +621,9 @@ MQTT_Task(os_event_t *e)
|
||||||
INFO("MQTT: Deleted client\r\n");
|
INFO("MQTT: Deleted client\r\n");
|
||||||
mqtt_client_delete(client);
|
mqtt_client_delete(client);
|
||||||
break;
|
break;
|
||||||
|
case MQTT_KEEPALIVE_SEND:
|
||||||
|
mqtt_send_keepalive(client);
|
||||||
|
break;
|
||||||
case MQTT_DATA:
|
case MQTT_DATA:
|
||||||
if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
|
if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
|
||||||
break;
|
break;
|
||||||
|
@ -737,7 +751,11 @@ MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, ui
|
||||||
void ICACHE_FLASH_ATTR
|
void ICACHE_FLASH_ATTR
|
||||||
MQTT_Connect(MQTT_Client *mqttClient)
|
MQTT_Connect(MQTT_Client *mqttClient)
|
||||||
{
|
{
|
||||||
MQTT_Disconnect(mqttClient);
|
// Do not connect if this client is already connected otherwise the
|
||||||
|
// two espconn connections may interfere causing unexpected behaviour.
|
||||||
|
if (mqttClient->pCon) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
|
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
|
||||||
mqttClient->pCon->type = ESPCONN_TCP;
|
mqttClient->pCon->type = ESPCONN_TCP;
|
||||||
mqttClient->pCon->state = ESPCONN_NONE;
|
mqttClient->pCon->state = ESPCONN_NONE;
|
||||||
|
|
Ładowanie…
Reference in New Issue