kopia lustrzana https://github.com/martin-ger/esp_mqtt
rodzic
a8c335eb3d
commit
47bb4ad229
45
README.md
45
README.md
|
@ -1,12 +1,12 @@
|
|||
**esp_mqtt**
|
||||
==========
|
||||
This is MQTT client library for ESP8266, port from: [MQTT client library for Contiki](https://github.com/esar/contiki-mqtt)
|
||||
This is MQTT client library for ESP8266, port from: [MQTT client library for Contiki](https://github.com/esar/contiki-mqtt) (thanks)
|
||||
|
||||
**Features:**
|
||||
|
||||
* Support subscribing, publishing, authentication, will messages, keep alive pings and all 3 QoS levels (it should be a fully functional client).
|
||||
* Support multiple connection (to multiple hosts).
|
||||
* **Support SSL connection (max 1024 bit key size)**
|
||||
* Support SSL connection (max 1024 bit key size)
|
||||
* Easy to setup and use
|
||||
|
||||
|
||||
|
@ -22,6 +22,7 @@ This is MQTT client library for ESP8266, port from: [MQTT client library for Con
|
|||
#include "debug.h"
|
||||
#include "gpio.h"
|
||||
#include "user_interface.h"
|
||||
#include "mem.h"
|
||||
|
||||
MQTT_Client mqttClient;
|
||||
|
||||
|
@ -35,10 +36,14 @@ void mqttConnectedCb(uint32_t *args)
|
|||
{
|
||||
MQTT_Client* client = (MQTT_Client*)args;
|
||||
INFO("MQTT: Connected\r\n");
|
||||
MQTT_Subscribe(client, "/mqtt/topic/1", 0);
|
||||
MQTT_Subscribe(client, "/mqtt/topic/2", 0);
|
||||
MQTT_Publish(client, "/mqtt/topic/2", "hello2", 6, 0, 0);
|
||||
MQTT_Publish(client, "/mqtt/topic/1", "hello1", 6, 0, 0);
|
||||
MQTT_Subscribe(client, "/mqtt/topic/0", 0);
|
||||
MQTT_Subscribe(client, "/mqtt/topic/1", 1);
|
||||
MQTT_Subscribe(client, "/mqtt/topic/2", 2);
|
||||
|
||||
MQTT_Publish(client, "/mqtt/topic/0", "hello0", 6, 0, 0);
|
||||
MQTT_Publish(client, "/mqtt/topic/1", "hello1", 6, 1, 0);
|
||||
MQTT_Publish(client, "/mqtt/topic/2", "hello2", 6, 2, 0);
|
||||
|
||||
}
|
||||
|
||||
void mqttDisconnectedCb(uint32_t *args)
|
||||
|
@ -55,7 +60,9 @@ void mqttPublishedCb(uint32_t *args)
|
|||
|
||||
void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len)
|
||||
{
|
||||
char topicBuf[64], dataBuf[64];
|
||||
char *topicBuf = (char*)os_zalloc(topic_len+1),
|
||||
*dataBuf = (char*)os_zalloc(data_len+1);
|
||||
|
||||
MQTT_Client* client = (MQTT_Client*)args;
|
||||
|
||||
os_memcpy(topicBuf, topic, topic_len);
|
||||
|
@ -64,7 +71,9 @@ void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const cha
|
|||
os_memcpy(dataBuf, data, data_len);
|
||||
dataBuf[data_len] = 0;
|
||||
|
||||
INFO("MQTT topic: %s, data: %s \r\n", topicBuf, dataBuf);
|
||||
INFO("Receive topic: %s, data: %s \r\n", topicBuf, dataBuf);
|
||||
os_free(topicBuf);
|
||||
os_free(dataBuf);
|
||||
}
|
||||
|
||||
|
||||
|
@ -76,7 +85,12 @@ void user_init(void)
|
|||
CFG_Load();
|
||||
|
||||
MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);
|
||||
MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive);
|
||||
//MQTT_InitConnection(&mqttClient, "192.168.11.122", 1880, 0);
|
||||
|
||||
MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);
|
||||
//MQTT_InitClient(&mqttClient, "client_id", "user", "pass", 120, 1);
|
||||
|
||||
MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0);
|
||||
MQTT_OnConnected(&mqttClient, mqttConnectedCb);
|
||||
MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);
|
||||
MQTT_OnPublished(&mqttClient, mqttPublishedCb);
|
||||
|
@ -100,15 +114,12 @@ BOOL MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int
|
|||
```
|
||||
|
||||
**Already support LWT: (Last Will and Testament)***
|
||||
Setup in **MQTT_InitClient** file ***mqtt.c***
|
||||
```c
|
||||
char willTopic[] = "/lwt";
|
||||
char willMessage[] = "offline";
|
||||
|
||||
mqttClient->connect_info.will_topic = willTopic;
|
||||
mqttClient->connect_info.will_message = willMessage;
|
||||
mqttClient->connect_info.will_qos = 0;
|
||||
mqttClient->connect_info.will_retain = 0;
|
||||
```c
|
||||
|
||||
/* Broker will publish a message with qos = 0, retain = 0, data = "offline" to topic "/lwt" if client don't send keepalive packet */
|
||||
MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0);
|
||||
|
||||
```
|
||||
|
||||
**Default configuration**
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
#ifndef _USER_CONFIG_H_
|
||||
#define _USER_CONFIG_H_
|
||||
|
||||
#define CFG_HOLDER 0x00FF55A4
|
||||
#define CFG_HOLDER 0x00FF55A4 /* Change this value to load default configurations */
|
||||
#define CFG_LOCATION 0x3C /* Please don't change or if you know what you doing */
|
||||
|
||||
/*DEFAULT CONFIGURATIONS*/
|
||||
|
||||
#define MQTT_HOST "mqtt.yourdomain.com" //or "192.168.11.1"
|
||||
#define MQTT_HOST "192.168.11.122" //or "mqtt.yourdomain.com"
|
||||
#define MQTT_PORT 1880
|
||||
#define MQTT_BUF_SIZE 1024
|
||||
#define MQTT_KEEPALIVE 120 /*second*/
|
||||
|
@ -23,6 +23,7 @@
|
|||
#define MQTT_RECONNECT_TIMEOUT 5 /*second*/
|
||||
|
||||
#define CLIENT_SSL_ENABLE
|
||||
|
||||
#define DEFAULT_SECURITY 0
|
||||
|
||||
#define QUEUE_BUFFER_SIZE 2048
|
||||
|
|
112
user/mqtt.c
112
user/mqtt.c
|
@ -44,6 +44,7 @@
|
|||
|
||||
#define MQTT_TASK_PRIO 0
|
||||
#define MQTT_TASK_QUEUE_SIZE 1
|
||||
#define MQTT_SEND_TIMOUT 5
|
||||
|
||||
#ifndef QUEUE_BUFFER_SIZE
|
||||
#define QUEUE_BUFFER_SIZE 2048
|
||||
|
@ -98,7 +99,6 @@ mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
|
|||
LOCAL void ICACHE_FLASH_ATTR
|
||||
deliver_publish(MQTT_Client* client, uint8_t* message, int length)
|
||||
{
|
||||
INFO("deliver_publish\r\n");
|
||||
mqtt_event_data_t event_data;
|
||||
|
||||
event_data.topic_length = length;
|
||||
|
@ -111,21 +111,6 @@ deliver_publish(MQTT_Client* client, uint8_t* message, int length)
|
|||
|
||||
}
|
||||
|
||||
LOCAL void ICACHE_FLASH_ATTR
|
||||
deliver_publish_continuation(MQTT_Client* client, uint16_t offset, uint8_t* buffer, uint16_t length)
|
||||
{
|
||||
mqtt_event_data_t event_data;
|
||||
|
||||
event_data.type = MQTT_EVENT_TYPE_PUBLISH_CONTINUATION;
|
||||
event_data.topic_length = 0;
|
||||
event_data.topic = NULL;
|
||||
event_data.data_length = length;
|
||||
event_data.data_offset = offset;
|
||||
event_data.data = (char*)buffer;
|
||||
((char*)event_data.data)[event_data.data_length] = '\0';
|
||||
|
||||
//process_post_synch(state->calling_process, mqtt_event, &event_data);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Client received callback function.
|
||||
|
@ -156,6 +141,7 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
|
|||
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
|
||||
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
|
||||
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
|
||||
INFO("MQTT: type: %d, qos: %d, id: %04X\r\n", msg_type, msg_qos, msg_id);
|
||||
switch(msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_CONNACK:
|
||||
|
@ -175,7 +161,7 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
|
|||
break;
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
|
||||
INFO("MQTT: Subscribe successful to %s:%d\r\n", client->host, client->port);
|
||||
INFO("MQTT: Subscribe successful\r\n");
|
||||
break;
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
|
||||
|
@ -187,6 +173,7 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
|
|||
else if(msg_qos == 2)
|
||||
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
|
||||
if(msg_qos == 1 || msg_qos == 2){
|
||||
INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
}
|
||||
|
@ -196,9 +183,7 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
|
|||
break;
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){
|
||||
INFO("MQTT: Publish successful qos = 1\r\n");
|
||||
if(client->publishedCb)
|
||||
client->publishedCb((uint32_t*)client);
|
||||
INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -216,9 +201,7 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
|
|||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){
|
||||
INFO("MQTT: Public successful qos = 2\r\n");
|
||||
if(client->publishedCb)
|
||||
client->publishedCb((uint32_t*)client);
|
||||
INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGREQ:
|
||||
|
@ -252,17 +235,15 @@ mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len)
|
|||
if(client->mqtt_state.message_length_read < client->mqtt_state.message_length)
|
||||
{
|
||||
//client->connState = MQTT_PUBLISH_RECV;
|
||||
//Not Implement yet
|
||||
INFO("We have more data, read: %d, total: %d\r\n", client->mqtt_state.message_length_read, client->mqtt_state.message_length);
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
case MQTT_PUBLISH_RECV:
|
||||
/*
|
||||
* Long publish message, not implement yet
|
||||
* TODO: Implement method used deliver_publish_continuation
|
||||
*/
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
INFO("ERROR: Too long message\r\n");
|
||||
}
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
|
@ -278,7 +259,8 @@ mqtt_tcpclient_sent_cb(void *arg)
|
|||
struct espconn *pCon = (struct espconn *)arg;
|
||||
MQTT_Client* client = (MQTT_Client *)pCon->reverse;
|
||||
INFO("TCP: Sent\r\n");
|
||||
if(client->connState == MQTT_DATA && client->mqtt_state.pending_publish_qos == 0){
|
||||
client->sendTimeout = 0;
|
||||
if(client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH){
|
||||
if(client->publishedCb)
|
||||
client->publishedCb((uint32_t*)client);
|
||||
}
|
||||
|
@ -311,6 +293,8 @@ void ICACHE_FLASH_ATTR mqtt_timer(void *arg)
|
|||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);
|
||||
}
|
||||
}
|
||||
if(client->sendTimeout > 0)
|
||||
client->sendTimeout --;
|
||||
}
|
||||
|
||||
void ICACHE_FLASH_ATTR
|
||||
|
@ -413,12 +397,12 @@ MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_
|
|||
BOOL ICACHE_FLASH_ATTR
|
||||
MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
|
||||
{
|
||||
INFO("MQTT: subscribe, topic\"%s\" at broker %s:%d\r\n", topic, client->host, client->port);
|
||||
|
||||
|
||||
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
|
||||
topic, 0,
|
||||
&client->mqtt_state.pending_msg_id);
|
||||
|
||||
INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n",topic, client->mqtt_state.pending_msg_id);
|
||||
if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
|
||||
INFO("MQTT: Exceed the amount of queues\r\n");
|
||||
return FALSE;
|
||||
|
@ -443,18 +427,16 @@ MQTT_Task(os_event_t *e)
|
|||
client->connState = TCP_CONNECTING;
|
||||
break;
|
||||
case MQTT_DATA:
|
||||
|
||||
if(QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
|
||||
break;
|
||||
}
|
||||
if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0){
|
||||
uint8_t type = mqtt_get_type(dataBuffer);
|
||||
client->mqtt_state.pending_msg_type = type;
|
||||
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH)
|
||||
client->mqtt_state.pending_publish_qos = mqtt_get_qos(dataBuffer);
|
||||
else
|
||||
client->mqtt_state.pending_publish_qos = 4;
|
||||
client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
|
||||
client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
|
||||
|
||||
INFO("MQTT: Sending..type: %d,qos: %d\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_publish_qos);
|
||||
|
||||
client->sendTimeout = MQTT_SEND_TIMOUT;
|
||||
INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
|
||||
if(client->security){
|
||||
espconn_secure_sent(client->pCon, dataBuffer, dataLen);
|
||||
}
|
||||
|
@ -463,8 +445,6 @@ MQTT_Task(os_event_t *e)
|
|||
}
|
||||
|
||||
client->mqtt_state.outbound_message = NULL;
|
||||
if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == 0)
|
||||
INFO("MQTT: Publish message is done!\r\n");
|
||||
break;
|
||||
}
|
||||
break;
|
||||
|
@ -481,6 +461,7 @@ MQTT_Task(os_event_t *e)
|
|||
*/
|
||||
void MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t security)
|
||||
{
|
||||
uint32_t temp;
|
||||
INFO("MQTT_InitConnection\r\n");
|
||||
os_memset(mqttClient, 0, sizeof(MQTT_Client));
|
||||
|
||||
|
@ -490,7 +471,10 @@ void MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, ui
|
|||
mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
|
||||
mqttClient->pCon->proto.tcp->local_port = espconn_port();
|
||||
mqttClient->pCon->proto.tcp->remote_port = port;
|
||||
mqttClient->host = host;
|
||||
temp = os_strlen(host);
|
||||
mqttClient->host = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->host, host);
|
||||
mqttClient->host[temp] = 0;
|
||||
mqttClient->port = port;
|
||||
mqttClient->security = security;
|
||||
mqttClient->pCon->reverse = mqttClient;
|
||||
|
@ -507,17 +491,31 @@ void MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, ui
|
|||
* @param client_pass:MQTT keep alive timer, in second
|
||||
* @retval None
|
||||
*/
|
||||
void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime)
|
||||
void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
|
||||
{
|
||||
uint32_t temp;
|
||||
INFO("MQTT_InitClient\r\n");
|
||||
|
||||
os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
|
||||
mqttClient->connect_info.client_id = client_id;
|
||||
mqttClient->connect_info.username = client_user;
|
||||
mqttClient->connect_info.password = client_pass;
|
||||
|
||||
temp = os_strlen(client_id);
|
||||
mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.client_id, client_id);
|
||||
mqttClient->connect_info.client_id[temp] = 0;
|
||||
|
||||
temp = os_strlen(client_user);
|
||||
mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.username, client_user);
|
||||
mqttClient->connect_info.username[temp] = 0;
|
||||
|
||||
temp = os_strlen(client_pass);
|
||||
mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.password, client_pass);
|
||||
mqttClient->connect_info.password[temp] = 0;
|
||||
|
||||
|
||||
mqttClient->connect_info.keepalive = keepAliveTime;
|
||||
mqttClient->connect_info.clean_session = 1;
|
||||
mqttClient->connect_info.clean_session = cleanSession;
|
||||
|
||||
mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
|
||||
mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
|
||||
|
@ -536,7 +534,23 @@ void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* clien
|
|||
system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);
|
||||
}
|
||||
void MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain)
|
||||
{
|
||||
uint32_t temp;
|
||||
temp = os_strlen(will_topic);
|
||||
mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.will_topic, will_topic);
|
||||
mqttClient->connect_info.will_topic[temp] = 0;
|
||||
|
||||
temp = os_strlen(will_msg);
|
||||
mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.will_message, will_msg);
|
||||
mqttClient->connect_info.will_message[temp] = 0;
|
||||
|
||||
|
||||
mqttClient->connect_info.will_qos = will_qos;
|
||||
mqttClient->connect_info.will_retain = will_retain;
|
||||
}
|
||||
/**
|
||||
* @brief Begin connect to MQTT broker
|
||||
* @param client: MQTT_Client reference
|
||||
|
|
|
@ -100,6 +100,7 @@ typedef struct {
|
|||
ETSTimer mqttTimer;
|
||||
uint32_t keepAliveTick;
|
||||
uint32_t reconnectTick;
|
||||
uint32_t sendTimeout;
|
||||
tConnState connState;
|
||||
QUEUE msgQueue;
|
||||
} MQTT_Client;
|
||||
|
@ -122,7 +123,8 @@ typedef struct {
|
|||
#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION 8
|
||||
|
||||
void MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t security);
|
||||
void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime);
|
||||
void MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession);
|
||||
void MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain);
|
||||
void MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb);
|
||||
void MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb);
|
||||
void MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb);
|
||||
|
|
|
@ -253,7 +253,9 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
|
|||
{
|
||||
if(i + 2 >= length)
|
||||
return 0;
|
||||
i += 2;
|
||||
//i += 2;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return (buffer[i] << 8) | buffer[i + 1];
|
||||
|
@ -264,6 +266,7 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
|
|||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
case MQTT_MSG_TYPE_SUBACK:
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
case MQTT_MSG_TYPE_SUBSCRIBE:
|
||||
{
|
||||
// This requires the remaining length to be encoded in 1 byte,
|
||||
// which it should be.
|
||||
|
@ -272,6 +275,7 @@ uint16_t mqtt_get_id(uint8_t* buffer, uint16_t length)
|
|||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -84,11 +84,11 @@ typedef struct mqtt_connection
|
|||
|
||||
typedef struct mqtt_connect_info
|
||||
{
|
||||
const char* client_id;
|
||||
const char* username;
|
||||
const char* password;
|
||||
const char* will_topic;
|
||||
const char* will_message;
|
||||
char* client_id;
|
||||
char* username;
|
||||
char* password;
|
||||
char* will_topic;
|
||||
char* will_message;
|
||||
int keepalive;
|
||||
int will_qos;
|
||||
int will_retain;
|
||||
|
|
|
@ -49,3 +49,9 @@ int32_t QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen
|
|||
return PROTO_ParseRb(&queue->rb, buffer, len, maxLen);
|
||||
}
|
||||
|
||||
BOOL QUEUE_IsEmpty(QUEUE *queue)
|
||||
{
|
||||
if(queue->rb.fill_cnt<=0)
|
||||
return TRUE;
|
||||
return FALSE;
|
||||
}
|
||||
|
|
|
@ -40,5 +40,5 @@ typedef struct {
|
|||
void QUEUE_Init(QUEUE *queue, int bufferSize);
|
||||
int32_t QUEUE_Puts(QUEUE *queue, uint8_t* buffer, uint16_t len);
|
||||
int32_t QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen);
|
||||
int32_t QUEUE_IsEmpty(QUEUE *queue);
|
||||
BOOL QUEUE_IsEmpty(QUEUE *queue);
|
||||
#endif /* USER_QUEUE_H_ */
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include "debug.h"
|
||||
#include "gpio.h"
|
||||
#include "user_interface.h"
|
||||
#include "mem.h"
|
||||
|
||||
MQTT_Client mqttClient;
|
||||
|
||||
|
@ -52,9 +53,11 @@ void mqttConnectedCb(uint32_t *args)
|
|||
MQTT_Subscribe(client, "/mqtt/topic/0", 0);
|
||||
MQTT_Subscribe(client, "/mqtt/topic/1", 1);
|
||||
MQTT_Subscribe(client, "/mqtt/topic/2", 2);
|
||||
|
||||
MQTT_Publish(client, "/mqtt/topic/0", "hello0", 6, 0, 0);
|
||||
MQTT_Publish(client, "/mqtt/topic/1", "hello1", 6, 1, 0);
|
||||
MQTT_Publish(client, "/mqtt/topic/2", "hello2", 6, 2, 0);
|
||||
|
||||
}
|
||||
|
||||
void mqttDisconnectedCb(uint32_t *args)
|
||||
|
@ -71,7 +74,9 @@ void mqttPublishedCb(uint32_t *args)
|
|||
|
||||
void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t data_len)
|
||||
{
|
||||
char topicBuf[64], dataBuf[64];
|
||||
char *topicBuf = (char*)os_zalloc(topic_len+1),
|
||||
*dataBuf = (char*)os_zalloc(data_len+1);
|
||||
|
||||
MQTT_Client* client = (MQTT_Client*)args;
|
||||
|
||||
os_memcpy(topicBuf, topic, topic_len);
|
||||
|
@ -80,7 +85,9 @@ void mqttDataCb(uint32_t *args, const char* topic, uint32_t topic_len, const cha
|
|||
os_memcpy(dataBuf, data, data_len);
|
||||
dataBuf[data_len] = 0;
|
||||
|
||||
INFO("MQTT topic: %s, data: %s \r\n", topicBuf, dataBuf);
|
||||
INFO("Receive topic: %s, data: %s \r\n", topicBuf, dataBuf);
|
||||
os_free(topicBuf);
|
||||
os_free(dataBuf);
|
||||
}
|
||||
|
||||
|
||||
|
@ -92,7 +99,12 @@ void user_init(void)
|
|||
CFG_Load();
|
||||
|
||||
MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);
|
||||
MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive);
|
||||
//MQTT_InitConnection(&mqttClient, "192.168.11.122", 1880, 0);
|
||||
|
||||
MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);
|
||||
//MQTT_InitClient(&mqttClient, "client_id", "user", "pass", 120, 1);
|
||||
|
||||
MQTT_InitLWT(&mqttClient, "/lwt", "offline", 0, 0);
|
||||
MQTT_OnConnected(&mqttClient, mqttConnectedCb);
|
||||
MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);
|
||||
MQTT_OnPublished(&mqttClient, mqttPublishedCb);
|
||||
|
|
Ładowanie…
Reference in New Issue