kopia lustrzana https://github.com/martin-ger/esp_mqtt
commit
fb0a26b3ba
|
@ -70,6 +70,7 @@ typedef enum {
|
|||
DNS_RESOLVE,
|
||||
TCP_DISCONNECTING,
|
||||
TCP_DISCONNECTED,
|
||||
TCP_RECONNECT_DISCONNECTING,
|
||||
TCP_RECONNECT_REQ,
|
||||
TCP_RECONNECT,
|
||||
TCP_CONNECTING,
|
||||
|
@ -80,6 +81,7 @@ typedef enum {
|
|||
MQTT_SUBSCIBE_SEND,
|
||||
MQTT_SUBSCIBE_SENDING,
|
||||
MQTT_DATA,
|
||||
MQTT_KEEPALIVE_SEND,
|
||||
MQTT_PUBLISH_RECV,
|
||||
MQTT_PUBLISHING,
|
||||
MQTT_DELETING,
|
||||
|
|
84
mqtt/mqtt.c
84
mqtt/mqtt.c
|
@ -113,6 +113,42 @@ deliver_publish(MQTT_Client* client, uint8_t* message, int length)
|
|||
|
||||
}
|
||||
|
||||
void ICACHE_FLASH_ATTR
|
||||
mqtt_send_keepalive(MQTT_Client *client)
|
||||
{
|
||||
INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
||||
client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
|
||||
|
||||
client->sendTimeout = MQTT_SEND_TIMOUT;
|
||||
INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
||||
err_t result = ESPCONN_OK;
|
||||
if (client->security) {
|
||||
#ifdef MQTT_SSL_ENABLE
|
||||
result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
#else
|
||||
INFO("TCP: Do not support SSL\r\n");
|
||||
#endif
|
||||
}
|
||||
else {
|
||||
result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
}
|
||||
|
||||
client->mqtt_state.outbound_message = NULL;
|
||||
if(ESPCONN_OK == result) {
|
||||
client->keepAliveTick = 0;
|
||||
client->connState = MQTT_DATA;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
else {
|
||||
client->connState = TCP_RECONNECT_DISCONNECTING;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Delete tcp client and free all memory
|
||||
* @param mqttClient: The mqtt client which contain TCP client
|
||||
|
@ -237,6 +273,7 @@ READPACKET:
|
|||
}
|
||||
break;
|
||||
case MQTT_DATA:
|
||||
case MQTT_KEEPALIVE_SEND:
|
||||
client->mqtt_state.message_length_read = len;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
||||
|
||||
|
@ -338,7 +375,8 @@ mqtt_tcpclient_sent_cb(void *arg)
|
|||
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
||||
INFO("TCP: Sent\r\n");
|
||||
client->sendTimeout = 0;
|
||||
if (client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
||||
if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND)
|
||||
&& client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
|
||||
if (client->publishedCb)
|
||||
client->publishedCb((uint32_t*)client);
|
||||
}
|
||||
|
@ -352,37 +390,8 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
|
|||
if (client->connState == MQTT_DATA) {
|
||||
client->keepAliveTick ++;
|
||||
if (client->keepAliveTick > client->mqtt_state.connect_info->keepalive) {
|
||||
|
||||
INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
||||
client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
|
||||
|
||||
client->sendTimeout = MQTT_SEND_TIMOUT;
|
||||
INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
||||
err_t result = ESPCONN_OK;
|
||||
if (client->security) {
|
||||
#ifdef MQTT_SSL_ENABLE
|
||||
result = espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
#else
|
||||
INFO("TCP: Do not support SSL\r\n");
|
||||
#endif
|
||||
}
|
||||
else {
|
||||
result = espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
}
|
||||
|
||||
client->mqtt_state.outbound_message = NULL;
|
||||
|
||||
if(ESPCONN_OK == result) {
|
||||
client->keepAliveTick = 0;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
else {
|
||||
client->connState = TCP_RECONNECT_REQ;
|
||||
}
|
||||
client->connState = MQTT_KEEPALIVE_SEND;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
|
||||
} else if (client->connState == TCP_RECONNECT_REQ) {
|
||||
|
@ -585,12 +594,14 @@ MQTT_Task(os_event_t *e)
|
|||
case TCP_RECONNECT_REQ:
|
||||
break;
|
||||
case TCP_RECONNECT:
|
||||
mqtt_tcpclient_delete(client);
|
||||
MQTT_Connect(client);
|
||||
INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
|
||||
client->connState = TCP_CONNECTING;
|
||||
break;
|
||||
case MQTT_DELETING:
|
||||
case TCP_DISCONNECTING:
|
||||
case TCP_RECONNECT_DISCONNECTING:
|
||||
if (client->security) {
|
||||
#ifdef MQTT_SSL_ENABLE
|
||||
espconn_secure_connect(client->pCon);
|
||||
|
@ -610,6 +621,9 @@ MQTT_Task(os_event_t *e)
|
|||
INFO("MQTT: Deleted client\r\n");
|
||||
mqtt_client_delete(client);
|
||||
break;
|
||||
case MQTT_KEEPALIVE_SEND:
|
||||
mqtt_send_keepalive(client);
|
||||
break;
|
||||
case MQTT_DATA:
|
||||
if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
|
||||
break;
|
||||
|
@ -737,7 +751,11 @@ MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, ui
|
|||
void ICACHE_FLASH_ATTR
|
||||
MQTT_Connect(MQTT_Client *mqttClient)
|
||||
{
|
||||
MQTT_Disconnect(mqttClient);
|
||||
// Do not connect if this client is already connected otherwise the
|
||||
// two espconn connections may interfere causing unexpected behaviour.
|
||||
if (mqttClient->pCon) {
|
||||
return;
|
||||
}
|
||||
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
|
||||
mqttClient->pCon->type = ESPCONN_TCP;
|
||||
mqttClient->pCon->state = ESPCONN_NONE;
|
||||
|
|
Ładowanie…
Reference in New Issue