kopia lustrzana https://github.com/martin-ger/esp_mqtt
fixes error reconnect, thank @scragill reported
rodzic
d490bb15c6
commit
d810138858
82
mqtt/mqtt.c
82
mqtt/mqtt.c
|
@ -133,31 +133,37 @@ READPACKET:
|
|||
if(len < MQTT_BUF_SIZE && len > 0){
|
||||
os_memcpy(client->mqtt_state.in_buffer, pdata, len);
|
||||
|
||||
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
|
||||
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
|
||||
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
|
||||
switch(client->connState){
|
||||
case MQTT_DATA:
|
||||
client->mqtt_state.message_length_read = len;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
||||
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
|
||||
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
|
||||
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
|
||||
|
||||
switch(msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_CONNACK:
|
||||
if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){
|
||||
INFO("MQTT: Invalid packet\r\n");
|
||||
case MQTT_CONNECT_SENDING:
|
||||
if(msg_type == MQTT_MSG_TYPE_CONNACK){
|
||||
if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){
|
||||
INFO("MQTT: Invalid packet\r\n");
|
||||
if(client->security){
|
||||
espconn_secure_disconnect(client->pCon);
|
||||
}
|
||||
else {
|
||||
espconn_disconnect(client->pCon);
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
|
||||
client->connState = MQTT_DATA;
|
||||
if(client->connectedCb)
|
||||
client->connectedCb((uint32_t*)client);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
case MQTT_DATA:
|
||||
client->mqtt_state.message_length_read = len;
|
||||
client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
|
||||
|
||||
|
||||
switch(msg_type)
|
||||
{
|
||||
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
|
||||
INFO("MQTT: Subscribe successful\r\n");
|
||||
|
@ -269,9 +275,21 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
|
|||
|
||||
INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
|
||||
|
||||
client->sendTimeout = MQTT_SEND_TIMOUT;
|
||||
INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
||||
if(client->security){
|
||||
espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
}
|
||||
else{
|
||||
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
}
|
||||
|
||||
client->mqtt_state.outbound_message = NULL;
|
||||
|
||||
client->keepAliveTick = 0;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
|
@ -323,11 +341,21 @@ mqtt_tcpclient_connect_cb(void *arg)
|
|||
|
||||
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
|
||||
client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
|
||||
|
||||
client->sendTimeout = MQTT_SEND_TIMOUT;
|
||||
INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
||||
if(client->security){
|
||||
espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
}
|
||||
else{
|
||||
espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
|
||||
}
|
||||
|
||||
client->mqtt_state.outbound_message = NULL;
|
||||
client->connState = MQTT_DATA;
|
||||
client->connState = MQTT_CONNECT_SENDING;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
|
||||
|
@ -363,15 +391,16 @@ mqtt_tcpclient_recon_cb(void *arg, sint8 errType)
|
|||
BOOL ICACHE_FLASH_ATTR
|
||||
MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
|
||||
{
|
||||
mqtt_message_t* outbound_message;
|
||||
|
||||
|
||||
outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
|
||||
topic, data, data_length,
|
||||
qos, retain,
|
||||
&client->mqtt_state.pending_msg_id);
|
||||
INFO("MQTT: queuing publish, length: %d...\r\n", outbound_message->length);
|
||||
if(QUEUE_Puts(&client->msgQueue, outbound_message->data, outbound_message->length) == -1){
|
||||
if(client->mqtt_state.outbound_message->length == 0){
|
||||
INFO("MQTT: Queuing publish failed\r\n");
|
||||
return FALSE;
|
||||
}
|
||||
INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
return FALSE;
|
||||
}
|
||||
|
@ -516,6 +545,7 @@ void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* clien
|
|||
mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
|
||||
mqttClient->keepAliveTick = 0;
|
||||
mqttClient->reconnectTick = 0;
|
||||
mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length);
|
||||
|
||||
os_timer_disarm(&mqttClient->mqttTimer);
|
||||
os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);
|
||||
|
|
Ładowanie…
Reference in New Issue