moved uMQTTBroker lib to a separate repository

master
Martin Ger 2017-11-14 18:54:49 +01:00
rodzic 15945718fb
commit 373fdca6a2
38 zmienionych plików z 19 dodań i 4147 usunięć

3
.gitmodules vendored 100644
Wyświetl plik

@ -0,0 +1,3 @@
[submodule "uMQTTBroker"]
path = uMQTTBroker
url = https://github.com/martin-ger/uMQTTBroker

Wyświetl plik

@ -1,40 +0,0 @@
#ifndef _MQTT_SERVER_H_
#define _MQTT_SERVER_H_
#include "user_interface.h"
extern "C" {
// Interface for starting the broker
bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);
// Callbacks for message reception, username/password authentication, and client connection
typedef void (*MqttDataCallback)(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh);
typedef bool (*MqttAuthCallback)(const char* username, const char *password, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn, uint16_t client_count);
void MQTT_server_onData(MqttDataCallback dataCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
// Interface for local pub/sub interaction with the broker
bool MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t* topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t* topic);
// Interface to cleanup after STA disconnect
void MQTT_server_cleanupClientCons();
// Interface for persistence of retained topics
// Topics can be serialized to a buffer and reinitialized later after reboot
// Application is responsible for saving and restoring that buffer (i.e. to/from flash)
void clear_retainedtopics();
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);
}
#endif /* _MQTT_SERVER_H_ */

Wyświetl plik

@ -1,91 +0,0 @@
/*
* esp_uMQTT_broker demo for Arduino
*
* The program starts a broker, subscribes to anything and publishs a topic every second.
* Try to connect from a remote client and publish something - the console will show this as well.
*/
#include <ESP8266WiFi.h>
#include "mqtt_server.h"
/*
* Your WiFi config here
*/
char ssid[] = "MySSID"; // your network SSID (name)
char pass[] = "MyPassword"; // your network password
unsigned int mqttPort = 1883; // the standard MQTT broker port
unsigned int max_subscriptions = 30;
unsigned int max_retained_topics = 30;
void data_callback(uint32_t *client /* we can ignore this */, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh) {
char topic_str[topic_len+1];
os_memcpy(topic_str, topic, topic_len);
topic_str[topic_len] = '\0';
char data_str[lengh+1];
os_memcpy(data_str, data, lengh);
data_str[lengh] = '\0';
Serial.print("received topic '");
Serial.print(topic_str);
Serial.print("' with data '");
Serial.print(data_str);
Serial.println("'");
}
void setup()
{
Serial.begin(115200);
Serial.println();
Serial.println();
// We start by connecting to a WiFi network
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, pass);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
/*
* Register the callback
*/
MQTT_server_onData(data_callback);
/*
* Start the broker
*/
Serial.println("Starting MQTT broker");
MQTT_server_start(mqttPort, max_subscriptions, max_retained_topics);
/*
* Subscribe to anything
*/
MQTT_local_subscribe((unsigned char *)"#", 0);
}
int counter = 0;
void loop()
{
String myData(counter++);
/*
* Publish the counter value as String
*/
MQTT_local_publish((unsigned char *)"/MyBroker/count", (unsigned char *)myData.c_str(), myData.length(), 0, 0);
// wait a second
delay(1000);
}

Wyświetl plik

@ -12,7 +12,7 @@
# - 2014-11-23: Updated for SDK 0.9.3
# - 2014-12-25: Replaced esptool by esptool.py
BUILD_AREA = /home/martin/github
BUILD_AREA = /home/mfg/github
# Output directors to store intermediate compiled files
# relative to the project directory
@ -33,7 +33,7 @@ ESPPORT ?= /dev/ttyUSB0
TARGET = app
# which modules (subdirectories) of the project to include in compiling
MODULES = driver user mqtt ntp easygpio pwm httpclient adc
MODULES = driver user uMQTTBroker/src ntp easygpio pwm httpclient adc
#EXTRA_INCDIR = $(BUILD_AREA)/esp-open-sdk/esp-open-lwip/include include
EXTRA_INCDIR = include
@ -141,4 +141,3 @@ clean:
$(Q) rm -rf $(FW_BASE) $(BUILD_BASE)
$(foreach bdir,$(BUILD_DIR),$(eval $(call compile-objects,$(bdir))))

Wyświetl plik

@ -36,12 +36,12 @@ TARGET ?= esp_mqtt
TARGET_LIB ?= libmqtt.a
# which modules (subdirectories) of the project to include in compiling
USER_MODULES = user driver mqtt modules
USER_MODULES = user driver uMQTTBroker/src modules
USER_INC = include
USER_LIB =
# which modules (subdirectories) of the project to include when compiling as library
LIB_MODULES = mqtt
LIB_MODULES = uMQTTBroker/src
SDK_LIBDIR = lib
SDK_LIBS = c gcc phy pp net80211 wpa main lwip

Wyświetl plik

@ -209,23 +209,8 @@ The broker does not yet support:
- TLS
"
# Using the esp_uMQTT_broker in an Arduino project
There is a quick-and-dirty hack to add the pure broker functionality (not the CLI and the scripting) to any ESP Arduino project:
You can use the pure broker functionality (not the CLI and the scripting) in any ESP Arduino project by using https://github.com/martin-ger/uMQTTBroker . Just clone (or download the zip-file and extract it) into the libraries directory of your Arduino ESP8266 installation.
- Go to the install directory of the ESP8266 support package (something like: "[yourArduinoDir]/hardware/esp8266com/esp8266")
- Look for the file "platform.txt"
- Search for the line with "compiler.c.elf.libs"
- Add "-lmqtt" to the libs. Now it should look like:
```
compiler.c.elf.libs=-lm -lgcc -lhal -lphy -lpp -lnet80211 -lwpa -lcrypto -lmain -lwps -laxtls -lsmartconfig -lmesh -lwpa2 -lmqtt {build.lwip_lib} -lstdc++
```
- From this directory go to "cd tools/sdk/lib".
- Copy "libmqtt.a" from the "firmware" directory of this repository into that location (where the other C-libs of the SDK are).
- From this directory go to "cd ../include".
- Copy "mqtt_server.h" from the "Arduino" directory of this repository into that location (where the other include files of the SDK are).
- Now you can use it in your sketch. Just include
```c
#include "mqtt_server.h"
```
Now you can use the API as described in the next subsection.
Sample: in the Arduino setup() initialize the WiFi connection (client or SoftAP, whatever you need) and somewhere at the end add these line:
@ -235,7 +220,7 @@ MQTT_server_start(1883, 30, 30);
The MQTT server will now run in the background and you can connect with any MQTT client. Your Arduino project might do other application logic in its loop.
You can find a sample sketch in the "Arduino" directory.
You can find a sample sketch in the examples.
# Using the Source Code
The complete broker functionality is included in the mqtt directory and can be integrated into any NONOS SDK (or ESP Arduino) program ("make -f Makefile.orig lib" will build the mqtt code as a C library). You can find a minimal demo in the directory "user_basic". Rename it to "user", adapt "user_config.h", and do the "make" to build a small demo that just starts an MQTT broker without any additional logic.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -1,2 +1,2 @@
2e1def6f5b226e294097069debab364588066cf6 0x00000.bin
d58a90297c78ac2106abb8ea7f0864927b75727a 0x10000.bin
3fa016262b70ebe0dd23d7e99c34df7d7e6ae5b7 0x00000.bin
bfe86bb5e16d735add5eda99d77c90598d8b59e3 0x10000.bin

Wyświetl plik

@ -1,45 +0,0 @@
#############################################################
# Required variables for each makefile
# Discard this section from all parent makefiles
# Expected variables (with automatic defaults):
# CSRCS (all "C" files in the dir)
# SUBDIRS (all subdirs with a Makefile)
# GEN_LIBS - list of libs to be generated ()
# GEN_IMAGES - list of images to be generated ()
# COMPONENTS_xxx - a list of libs/objs in the form
# subdir/lib to be extracted and rolled up into
# a generated lib/image xxx.a ()
#
ifndef PDIR
GEN_LIBS = libmqtt.a
endif
#############################################################
# Configuration i.e. compile options etc.
# Target specific stuff (defines etc.) goes in here!
# Generally values applying to a tree are captured in the
# makefile at its root level - these are then overridden
# for a subtree within the makefile rooted therein
#
#DEFINES +=
#############################################################
# Recursion Magic - Don't touch this!!
#
# Each subtree potentially has an include directory
# corresponding to the common APIs applicable to modules
# rooted at that subtree. Accordingly, the INCLUDE PATH
# of a module can only contain the include directories up
# its parent path, and not its siblings
#
# Required for each makefile to inherit from the parent
#
INCLUDES := $(INCLUDES) -I $(PDIR)include
INCLUDES += -I ./
PDIR := ../$(PDIR)
sinclude $(PDIR)Makefile

Wyświetl plik

@ -1,18 +0,0 @@
/*
* debug.h
*
* Created on: Dec 4, 2014
* Author: Minh
*/
#ifndef USER_DEBUG_H_
#define USER_DEBUG_H_
#if defined(MQTT_DEBUG_ON)
#define MQTT_INFO( format, ... ) os_printf( format, ## __VA_ARGS__ )
#else
#define MQTT_INFO( format, ... )
#endif
#endif /* USER_DEBUG_H_ */

Wyświetl plik

@ -1,149 +0,0 @@
/* mqtt.h
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef USER_AT_MQTT_H_
#define USER_AT_MQTT_H_
#include "user_config.h"
#include "mqtt_msg.h"
#include "user_interface.h"
#include "queue.h"
typedef struct mqtt_event_data_t
{
uint8_t type;
const char* topic;
const char* data;
uint16_t topic_length;
uint16_t data_length;
uint16_t data_offset;
} mqtt_event_data_t;
typedef struct mqtt_state_t
{
uint16_t port;
int auto_reconnect;
mqtt_connect_info_t* connect_info;
uint8_t* in_buffer;
uint8_t* out_buffer;
int in_buffer_length;
int out_buffer_length;
uint16_t message_length;
uint16_t message_length_read;
mqtt_message_t* outbound_message;
mqtt_connection_t mqtt_connection;
uint16_t pending_msg_id;
int pending_msg_type;
int pending_publish_qos;
} mqtt_state_t;
typedef enum {
WIFI_INIT,
WIFI_CONNECTING,
WIFI_CONNECTING_ERROR,
WIFI_CONNECTED,
DNS_RESOLVE,
TCP_DISCONNECTING,
TCP_DISCONNECTED,
TCP_DISCONNECT,
TCP_RECONNECT_DISCONNECTING,
TCP_RECONNECT_REQ,
TCP_RECONNECT,
TCP_CONNECTING,
TCP_CONNECTING_ERROR,
TCP_CONNECTED,
MQTT_CONNECT_SEND,
MQTT_CONNECT_SENDING,
MQTT_SUBSCIBE_SEND,
MQTT_SUBSCIBE_SENDING,
MQTT_DATA,
MQTT_KEEPALIVE_SEND,
MQTT_PUBLISH_RECV,
MQTT_PUBLISHING,
MQTT_DELETING,
MQTT_DELETED,
} tConnState;
typedef void (*MqttCallback)(uint32_t *args);
typedef void (*MqttDataCallback)(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh);
typedef struct {
struct espconn *pCon;
uint8_t security;
uint8_t* host;
uint32_t port;
ip_addr_t ip;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
MqttCallback connectedCb;
MqttCallback disconnectedCb;
MqttCallback publishedCb;
MqttCallback timeoutCb;
MqttDataCallback dataCb;
ETSTimer mqttTimer;
uint32_t keepAliveTick;
uint32_t reconnectTick;
uint32_t sendTimeout;
tConnState connState;
QUEUE msgQueue;
void* user_data;
} MQTT_Client;
#define SEC_NONSSL 0
#define SEC_SSL 1
#define MQTT_FLAG_CONNECTED 1
#define MQTT_FLAG_READY 2
#define MQTT_FLAG_EXIT 4
#define MQTT_EVENT_TYPE_NONE 0
#define MQTT_EVENT_TYPE_CONNECTED 1
#define MQTT_EVENT_TYPE_DISCONNECTED 2
#define MQTT_EVENT_TYPE_SUBSCRIBED 3
#define MQTT_EVENT_TYPE_UNSUBSCRIBED 4
#define MQTT_EVENT_TYPE_PUBLISH 5
#define MQTT_EVENT_TYPE_PUBLISHED 6
#define MQTT_EVENT_TYPE_EXITED 7
#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION 8
void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32_t port, uint8_t security);
BOOL ICACHE_FLASH_ATTR MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession);
void ICACHE_FLASH_ATTR MQTT_DeleteClient(MQTT_Client *mqttClient);
void ICACHE_FLASH_ATTR MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain);
void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb);
void ICACHE_FLASH_ATTR MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb);
void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb);
void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb);
void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb);
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos);
BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe(MQTT_Client *client, char* topic);
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient);
void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client *mqttClient);
BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain);
#endif /* USER_AT_MQTT_H_ */

Wyświetl plik

@ -1,194 +0,0 @@
/*
* File: mqtt_msg.h
* Author: Minh Tuan
*
* Created on July 12, 2014, 1:05 PM
*/
#ifndef MQTT_MSG_H
#define MQTT_MSG_H
#include "user_config.h"
#include "c_types.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
* Copyright (c) 2014, Stephen Robinson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
/* 7 6 5 4 3 2 1 0*/
/*| --- Message Type---- | DUP Flag | QoS Level | Retain |
/* Remaining Length */
enum mqtt_message_type
{
MQTT_MSG_TYPE_CONNECT = 1,
MQTT_MSG_TYPE_CONNACK = 2,
MQTT_MSG_TYPE_PUBLISH = 3,
MQTT_MSG_TYPE_PUBACK = 4,
MQTT_MSG_TYPE_PUBREC = 5,
MQTT_MSG_TYPE_PUBREL = 6,
MQTT_MSG_TYPE_PUBCOMP = 7,
MQTT_MSG_TYPE_SUBSCRIBE = 8,
MQTT_MSG_TYPE_SUBACK = 9,
MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
MQTT_MSG_TYPE_UNSUBACK = 11,
MQTT_MSG_TYPE_PINGREQ = 12,
MQTT_MSG_TYPE_PINGRESP = 13,
MQTT_MSG_TYPE_DISCONNECT = 14
};
enum mqtt_connect_return_code
{
CONNECTION_ACCEPTED = 0,
CONNECTION_REFUSE_PROTOCOL,
CONNECTION_REFUSE_ID_REJECTED,
CONNECTION_REFUSE_SERVER_UNAVAILABLE,
CONNECTION_REFUSE_BAD_USERNAME,
CONNECTION_REFUSE_NOT_AUTHORIZED
};
typedef struct mqtt_message
{
uint8_t* data;
uint16_t length;
} mqtt_message_t;
typedef struct mqtt_connection
{
mqtt_message_t message;
uint16_t message_id;
uint8_t* buffer;
uint16_t buffer_length;
} mqtt_connection_t;
typedef struct mqtt_connect_info
{
char* client_id;
char* username;
char* password;
char* will_topic;
char* will_data;
uint16_t will_data_len;
uint32_t keepalive;
int will_qos;
int will_retain;
int clean_session;
} mqtt_connect_info_t;
#define MQTT_MAX_FIXED_HEADER_SIZE 3
enum mqtt_connect_flag
{
MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
MQTT_CONNECT_FLAG_WILL = 1 << 2,
MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
};
struct __attribute((__packed__)) mqtt_connect_variable_header
{
uint8_t lengthMsb;
uint8_t lengthLsb;
#if defined(PROTOCOL_NAMEv31)
uint8_t magic[6];
#elif defined(PROTOCOL_NAMEv311)
uint8_t magic[4];
#else
#error "Please define protocol name"
#endif
uint8_t version;
uint8_t flags;
uint8_t keepaliveMsb;
uint8_t keepaliveLsb;
};
struct __attribute((__packed__)) mqtt_connect_variable_header3
{
uint8_t lengthMsb;
uint8_t lengthLsb;
uint8_t magic[6];
uint8_t version;
uint8_t flags;
uint8_t keepaliveMsb;
uint8_t keepaliveLsb;
};
struct __attribute((__packed__)) mqtt_connect_variable_header4
{
uint8_t lengthMsb;
uint8_t lengthLsb;
uint8_t magic[4];
uint8_t version;
uint8_t flags;
uint8_t keepaliveMsb;
uint8_t keepaliveLsb;
};
static inline int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; }
static inline int ICACHE_FLASH_ATTR mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; }
static inline int ICACHE_FLASH_ATTR mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length);
char* ICACHE_FLASH_ATTR mqtt_get_str(uint8_t* buffer, uint16_t* length);
const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length);
const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length);
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connack(mqtt_connection_t* connection, enum mqtt_connect_return_code retcode);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_suback(mqtt_connection_t* connection, uint8_t *ret_codes, uint8_t ret_codes_len, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsuback(mqtt_connection_t* connection, uint16_t message_id);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection);
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection);
#ifdef __cplusplus
}
#endif
#endif /* MQTT_MSG_H */

Wyświetl plik

@ -1,28 +0,0 @@
#ifndef _MQTT_RETAINEDLIST_H_
#define _MQTT_RETAINEDLIST_H_
#include "mqtt_server.h"
typedef struct _retained_entry {
uint8_t *topic;
uint8_t *data;
uint16_t data_len;
uint8_t qos;
} retained_entry;
typedef bool (*iterate_retainedtopic_cb)(retained_entry *topic, void *user_data);
typedef bool (*find_retainedtopic_cb)(retained_entry *topic, void *user_data);
typedef void (*on_retainedtopic_cb)(retained_entry *topic);
bool create_retainedlist(uint16_t num_entires);
void clear_retainedtopics();
bool update_retainedtopic(uint8_t *topic, uint8_t *data, uint16_t data_len, uint8_t qos);
bool find_retainedtopic(uint8_t *topic, find_retainedtopic_cb cb, void *user_data);
void iterate_retainedtopics(iterate_retainedtopic_cb cb, void *user_data);
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);
void set_on_retainedtopic_cb(on_retainedtopic_cb cb);
#endif /* _MQTT_RETAINEDLIST_H_ */

Wyświetl plik

@ -1,57 +0,0 @@
#ifndef _MQTT_SERVER_H_
#define _MQTT_SERVER_H_
#include "c_types.h"
#include "osapi.h"
#include "os_type.h"
#include "ip_addr.h"
#include "espconn.h"
//#include "lwip/ip.h"
//#include "lwip/app/espconn.h"
//#include "lwip/app/espconn_tcp.h"
#include "mqtt.h"
#define LOCAL_MQTT_CLIENT ((void*)-1)
typedef bool (*MqttAuthCallback)(const char* username, const char *password, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn, uint16_t client_count);
typedef struct _MQTT_ClientCon {
struct espconn *pCon;
// uint8_t security;
// uint32_t port;
// ip_addr_t ip;
mqtt_state_t mqtt_state;
mqtt_connect_info_t connect_info;
// MqttCallback connectedCb;
// MqttCallback disconnectedCb;
// MqttCallback publishedCb;
// MqttCallback timeoutCb;
// MqttDataCallback dataCb;
ETSTimer mqttTimer;
uint32_t sendTimeout;
tConnState connState;
QUEUE msgQueue;
uint8_t protocolVersion;
void* user_data;
struct _MQTT_ClientCon *next;
} MQTT_ClientCon;
extern MQTT_ClientCon *clientcon_list;
uint16_t MQTT_server_countClientCon();
void MQTT_server_disconnectClientCon(MQTT_ClientCon *mqttClientCon);
bool MQTT_server_deleteClientCon(MQTT_ClientCon *mqttClientCon);
void MQTT_server_cleanupClientCons();
bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onData(MqttDataCallback dataCb);
bool MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t* topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t* topic);
#endif /* _MQTT_SERVER_H_ */

Wyświetl plik

@ -1,21 +0,0 @@
#ifndef _MQTT_TOPICLIST_H_
#define _MQTT_TOPICLIST_H_
#include "mqtt_server.h"
typedef struct _topic_entry {
MQTT_ClientCon *clientcon;
uint8_t *topic;
uint8_t qos;
} topic_entry;
typedef bool (*iterate_topic_cb)(topic_entry *topic, void *user_data);
typedef bool (*find_topic_cb)(topic_entry *topic_e, uint8_t *topic, uint8_t *data, uint16_t data_len);
bool create_topiclist(uint16_t num_entires);
bool add_topic(MQTT_ClientCon *clientcon, uint8_t *topic, uint8_t qos);
bool delete_topic(MQTT_ClientCon *clientcon, uint8_t *topic);
bool find_topic(uint8_t *topic, find_topic_cb cb, uint8_t *data, uint16_t data_len);
void iterate_topics(iterate_topic_cb cb, void *user_data);
#endif /* _MQTT_TOPICLIST_H_ */

Wyświetl plik

@ -1,32 +0,0 @@
/*******************************************************************************
* Copyright (c) 2007, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTT_TOPICS_H)
#define MQTT_TOPICS_H
#define TOPIC_LEVEL_SEPARATOR "/"
#define SINGLE_LEVEL_WILDCARD "+"
#define MULTI_LEVEL_WILDCARD "#"
int Topics_isValidName(char* aName);
int Topics_hasWildcards(char* topic);
int Topics_matches(char* wildTopic, int wildcards, char* topic);
#endif /* MQTT_TOPICS_H */

Wyświetl plik

@ -1,32 +0,0 @@
/*
* File: proto.h
* Author: ThuHien
*
* Created on November 23, 2012, 8:57 AM
*/
#ifndef _PROTO_H_
#define _PROTO_H_
#include <stdlib.h>
#include "typedef.h"
#include "ringbuf_mqtt.h"
typedef void(PROTO_PARSE_CALLBACK)();
typedef struct {
U8 *buf;
U16 bufSize;
U16 dataLen;
U8 isEsc;
U8 isBegin;
PROTO_PARSE_CALLBACK* callback;
} PROTO_PARSER;
I8 ICACHE_FLASH_ATTR PROTO_Init(PROTO_PARSER *parser, PROTO_PARSE_CALLBACK *completeCallback, U8 *buf, U16 bufSize);
I8 ICACHE_FLASH_ATTR PROTO_Parse(PROTO_PARSER *parser, U8 *buf, U16 len);
I16 ICACHE_FLASH_ATTR PROTO_Add(U8 *buf, const U8 *packet, I16 bufSize);
I16 ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF *rb, const U8 *packet, I16 len);
I8 ICACHE_FLASH_ATTR PROTO_ParseByte(PROTO_PARSER *parser, U8 value);
I16 ICACHE_FLASH_ATTR PROTO_ParseRb(RINGBUF *rb, U8 *bufOut, U16* len, U16 maxBufLen);
#endif

Wyświetl plik

@ -1,44 +0,0 @@
/* str_queue.h --
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef USER_QUEUE_H_
#define USER_QUEUE_H_
#include "os_type.h"
#include "ringbuf_mqtt.h"
typedef struct {
uint8_t *buf;
RINGBUF rb;
} QUEUE;
void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize);
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t* buffer, uint16_t len);
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen);
BOOL ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE *queue);
#endif /* USER_QUEUE_H_ */

Wyświetl plik

@ -1,19 +0,0 @@
#ifndef _RING_BUF_MQTT_H_
#define _RING_BUF_MQTT_H_
#include <os_type.h>
#include <stdlib.h>
#include "typedef.h"
typedef struct {
U8* p_o; /**< Original pointer */
U8* volatile p_r; /**< Read pointer */
U8* volatile p_w; /**< Write pointer */
volatile I32 fill_cnt; /**< Number of filled slots */
I32 size; /**< Buffer size */
} RINGBUF;
I16 ICACHE_FLASH_ATTR RINGBUF_Init(RINGBUF *r, U8* buf, I32 size);
I16 ICACHE_FLASH_ATTR RINGBUF_Put(RINGBUF *r, U8 c);
I16 ICACHE_FLASH_ATTR RINGBUF_Get(RINGBUF *r, U8* c);
#endif

Wyświetl plik

@ -1,17 +0,0 @@
/**
* \file
* Standard Types definition
*/
#ifndef _TYPE_DEF_H_
#define _TYPE_DEF_H_
typedef char I8;
typedef unsigned char U8;
typedef short I16;
typedef unsigned short U16;
typedef long I32;
typedef unsigned long U32;
typedef unsigned long long U64;
#endif

Wyświetl plik

@ -1,9 +0,0 @@
#ifndef _UTILS_H_
#define _UTILS_H_
#include "c_types.h"
uint32_t ICACHE_FLASH_ATTR UTILS_Atoh(const int8_t *s);
uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t* str, void *ip);
uint8_t ICACHE_FLASH_ATTR UTILS_IsIPV4 (int8_t *str);
#endif

Wyświetl plik

@ -1,943 +0,0 @@
/* mqtt.c
* Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "user_interface.h"
#include "osapi.h"
#include "espconn.h"
#include "os_type.h"
#include "mem.h"
#include "mqtt_msg.h"
#include "debug.h"
#include "user_config.h"
#include "mqtt.h"
#include "queue.h"
#define MQTT_TASK_PRIO 2
#define MQTT_TASK_QUEUE_SIZE 1
#define MQTT_SEND_TIMOUT 5
#ifndef MQTT_SSL_SIZE
#define MQTT_SSL_SIZE 5120
#endif
#ifndef QUEUE_BUFFER_SIZE
#define QUEUE_BUFFER_SIZE 2048
#endif
unsigned char *default_certificate;
unsigned int default_certificate_len = 0;
unsigned char *default_private_key;
unsigned int default_private_key_len = 0;
os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE];
#ifdef PROTOCOL_NAMEv311
LOCAL uint8_t zero_len_id[2] = { 0, 0 };
#endif
LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t * ipaddr, void *arg) {
struct espconn *pConn = (struct espconn *)arg;
MQTT_Client *client = (MQTT_Client *) pConn->reverse;
if (ipaddr == NULL) {
MQTT_INFO("DNS: Found, but got no ip, try to reconnect\r\n");
client->connState = TCP_RECONNECT_REQ;
return;
}
MQTT_INFO("DNS: found ip %d.%d.%d.%d\n",
*((uint8 *) & ipaddr->addr),
*((uint8 *) & ipaddr->addr + 1), *((uint8 *) & ipaddr->addr + 2), *((uint8 *) & ipaddr->addr + 3));
if (client->ip.addr == 0 && ipaddr->addr != 0) {
os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);
if (client->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_set_size(ESPCONN_CLIENT, MQTT_SSL_SIZE);
espconn_secure_connect(client->pCon);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_connect(client->pCon);
}
client->connState = TCP_CONNECTING;
MQTT_INFO("TCP: connecting...\r\n");
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
LOCAL void ICACHE_FLASH_ATTR deliver_publish(MQTT_Client * client, uint8_t * message, int length) {
mqtt_event_data_t event_data;
event_data.topic_length = length;
event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);
event_data.data_length = length;
event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
if (client->dataCb)
client->dataCb((uint32_t *) client, event_data.topic, event_data.topic_length, event_data.data,
event_data.data_length);
}
void ICACHE_FLASH_ATTR mqtt_send_keepalive(MQTT_Client * client) {
MQTT_INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id =
mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
client->sendTimeout = MQTT_SEND_TIMOUT;
MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
err_t result = ESPCONN_OK;
if (client->security) {
#ifdef MQTT_SSL_ENABLE
result =
espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
result =
espconn_send(client->pCon, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
}
client->mqtt_state.outbound_message = NULL;
if (ESPCONN_OK == result) {
client->keepAliveTick = 0;
client->connState = MQTT_DATA;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
} else {
client->connState = TCP_RECONNECT_DISCONNECTING;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
}
/**
* @brief Delete tcp client and free all memory
* @param mqttClient: The mqtt client which contain TCP client
* @retval None
*/
void ICACHE_FLASH_ATTR mqtt_tcpclient_delete(MQTT_Client * mqttClient) {
if (mqttClient->pCon != NULL) {
MQTT_INFO("TCP: Free memory\r\n");
// Force abort connections
espconn_abort(mqttClient->pCon);
// Delete connections
espconn_delete(mqttClient->pCon);
if (mqttClient->pCon->proto.tcp) {
os_free(mqttClient->pCon->proto.tcp);
mqttClient->pCon->proto.tcp = NULL;
}
os_free(mqttClient->pCon);
mqttClient->pCon = NULL;
}
}
/**
* @brief Delete MQTT client and free all memory
* @param mqttClient: The mqtt client
* @retval None
*/
void ICACHE_FLASH_ATTR mqtt_client_delete(MQTT_Client * mqttClient) {
if (mqttClient == NULL)
return;
if (mqttClient->pCon != NULL) {
mqtt_tcpclient_delete(mqttClient);
}
if (mqttClient->host != NULL) {
os_free(mqttClient->host);
mqttClient->host = NULL;
}
if (mqttClient->user_data != NULL) {
os_free(mqttClient->user_data);
mqttClient->user_data = NULL;
}
if (mqttClient->mqtt_state.in_buffer != NULL) {
os_free(mqttClient->mqtt_state.in_buffer);
mqttClient->mqtt_state.in_buffer = NULL;
}
if (mqttClient->mqtt_state.out_buffer != NULL) {
os_free(mqttClient->mqtt_state.out_buffer);
mqttClient->mqtt_state.out_buffer = NULL;
}
if (mqttClient->mqtt_state.outbound_message != NULL) {
if (mqttClient->mqtt_state.outbound_message->data != NULL) {
os_free(mqttClient->mqtt_state.outbound_message->data);
mqttClient->mqtt_state.outbound_message->data = NULL;
}
}
if (mqttClient->mqtt_state.mqtt_connection.buffer != NULL) {
// Already freed but not NULL
mqttClient->mqtt_state.mqtt_connection.buffer = NULL;
}
if (mqttClient->connect_info.client_id != NULL) {
#ifdef PROTOCOL_NAMEv311
/* Don't attempt to free if it's the zero_len array */
if (((uint8_t *) mqttClient->connect_info.client_id) != zero_len_id)
os_free(mqttClient->connect_info.client_id);
#else
os_free(mqttClient->connect_info.client_id);
#endif
mqttClient->connect_info.client_id = NULL;
}
if (mqttClient->connect_info.username != NULL) {
os_free(mqttClient->connect_info.username);
mqttClient->connect_info.username = NULL;
}
if (mqttClient->connect_info.password != NULL) {
os_free(mqttClient->connect_info.password);
mqttClient->connect_info.password = NULL;
}
if (mqttClient->connect_info.will_topic != NULL) {
os_free(mqttClient->connect_info.will_topic);
mqttClient->connect_info.will_topic = NULL;
}
if (mqttClient->connect_info.will_data != NULL) {
os_free(mqttClient->connect_info.will_data);
mqttClient->connect_info.will_data = NULL;
}
if (mqttClient->msgQueue.buf != NULL) {
os_free(mqttClient->msgQueue.buf);
mqttClient->msgQueue.buf = NULL;
}
// Initialize state
mqttClient->connState = WIFI_INIT;
// Clear callback functions to avoid abnormal callback
mqttClient->connectedCb = NULL;
mqttClient->disconnectedCb = NULL;
mqttClient->publishedCb = NULL;
mqttClient->timeoutCb = NULL;
mqttClient->dataCb = NULL;
MQTT_INFO("MQTT: client already deleted\r\n");
}
/**
* @brief Client received callback function.
* @param arg: contain the ip link information
* @param pdata: received data
* @param len: the lenght of received data
* @retval None
*/
void ICACHE_FLASH_ATTR mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len) {
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
uint8_t msg_conn_ret;
struct espconn *pCon = (struct espconn *)arg;
MQTT_Client *client = (MQTT_Client *) pCon->reverse;
//client->keepAliveTick = 0;
READPACKET:
MQTT_INFO("TCP: data received %d bytes\r\n", len);
// MQTT_INFO("STATE: %d\r\n", client->connState);
if (len < MQTT_BUF_SIZE && len > 0) {
os_memcpy(client->mqtt_state.in_buffer, pdata, len);
msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length);
switch (client->connState) {
case MQTT_CONNECT_SENDING:
if (msg_type == MQTT_MSG_TYPE_CONNACK) {
if (client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT) {
MQTT_INFO("MQTT: Invalid packet\r\n");
if (client->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_disconnect(client->pCon);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_disconnect(client->pCon);
}
} else {
msg_conn_ret = mqtt_get_connect_return_code(client->mqtt_state.in_buffer);
switch (msg_conn_ret) {
case CONNECTION_ACCEPTED:
MQTT_INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
client->connState = MQTT_DATA;
if (client->connectedCb)
client->connectedCb((uint32_t *) client);
break;
case CONNECTION_REFUSE_PROTOCOL:
case CONNECTION_REFUSE_SERVER_UNAVAILABLE:
case CONNECTION_REFUSE_BAD_USERNAME:
case CONNECTION_REFUSE_NOT_AUTHORIZED:
MQTT_INFO("MQTT: Connection refuse, reason code: %d\r\n", msg_conn_ret);
default:
if (client->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_disconnect(client->pCon);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_disconnect(client->pCon);
}
}
}
}
break;
case MQTT_DATA:
case MQTT_KEEPALIVE_SEND:
client->mqtt_state.message_length_read = len;
client->mqtt_state.message_length =
mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
switch (msg_type) {
case MQTT_MSG_TYPE_SUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE
&& client->mqtt_state.pending_msg_id == msg_id)
MQTT_INFO("MQTT: Subscribe successful\r\n");
break;
case MQTT_MSG_TYPE_UNSUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE
&& client->mqtt_state.pending_msg_id == msg_id)
MQTT_INFO("MQTT: UnSubscribe successful\r\n");
break;
case MQTT_MSG_TYPE_PUBLISH:
if (msg_qos == 1)
client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
else if (msg_qos == 2)
client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
if (msg_qos == 1 || msg_qos == 2) {
MQTT_INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
if (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
}
}
deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
break;
case MQTT_MSG_TYPE_PUBACK:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH
&& client->mqtt_state.pending_msg_id == msg_id) {
MQTT_INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
}
break;
case MQTT_MSG_TYPE_PUBREC:
client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
if (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_PUBREL:
client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
if (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_PUBCOMP:
if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH
&& client->mqtt_state.pending_msg_id == msg_id) {
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
}
break;
case MQTT_MSG_TYPE_PINGREQ:
client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
if (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_PINGRESP:
// Ignore
break;
}
// NOTE: this is done down here and not in the switch case above
// because the PSOCK_READBUF_LEN() won't work inside a switch
// statement due to the way protothreads resume.
if (msg_type == MQTT_MSG_TYPE_PUBLISH) {
len = client->mqtt_state.message_length_read;
if (client->mqtt_state.message_length < client->mqtt_state.message_length_read) {
//client->connState = MQTT_PUBLISH_RECV;
//Not Implement yet
len -= client->mqtt_state.message_length;
pdata += client->mqtt_state.message_length;
MQTT_INFO("Get another published message\r\n");
goto READPACKET;
}
}
break;
}
} else {
MQTT_INFO("ERROR: Message too long\r\n");
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
/**
* @brief Client send over callback function.
* @param arg: contain the ip link information
* @retval None
*/
void ICACHE_FLASH_ATTR mqtt_tcpclient_sent_cb(void *arg) {
struct espconn *pCon = (struct espconn *)arg;
MQTT_Client *client = (MQTT_Client *) pCon->reverse;
MQTT_INFO("TCP: Sent\r\n");
client->sendTimeout = 0;
client->keepAliveTick = 0;
if ((client->connState == MQTT_DATA || client->connState == MQTT_KEEPALIVE_SEND)
&& client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH) {
if (client->publishedCb)
client->publishedCb((uint32_t *) client);
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
void ICACHE_FLASH_ATTR mqtt_timer(void *arg) {
MQTT_Client *client = (MQTT_Client *) arg;
if (client->connState == MQTT_DATA) {
client->keepAliveTick++;
if (client->keepAliveTick > (client->mqtt_state.connect_info->keepalive / 2)) {
client->connState = MQTT_KEEPALIVE_SEND;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
} else if (client->connState == TCP_RECONNECT_REQ) {
client->reconnectTick++;
if (client->reconnectTick > MQTT_RECONNECT_TIMEOUT) {
client->reconnectTick = 0;
client->connState = TCP_RECONNECT;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
if (client->timeoutCb)
client->timeoutCb((uint32_t *) client);
}
}
if (client->sendTimeout > 0)
client->sendTimeout--;
}
void ICACHE_FLASH_ATTR mqtt_tcpclient_discon_cb(void *arg) {
struct espconn *pespconn = (struct espconn *)arg;
MQTT_Client *client = (MQTT_Client *) pespconn->reverse;
MQTT_INFO("TCP: Disconnected callback\r\n");
if (TCP_DISCONNECTING == client->connState) {
client->connState = TCP_DISCONNECTED;
} else if (MQTT_DELETING == client->connState) {
client->connState = MQTT_DELETED;
} else {
client->connState = TCP_RECONNECT_REQ;
}
if (client->disconnectedCb)
client->disconnectedCb((uint32_t *) client);
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
/**
* @brief Tcp client connect success callback function.
* @param arg: contain the ip link information
* @retval None
*/
void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg) {
struct espconn *pCon = (struct espconn *)arg;
MQTT_Client *client = (MQTT_Client *) pCon->reverse;
espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);
espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv); ////////
espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb); ///////
MQTT_INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer,
client->mqtt_state.out_buffer_length);
client->mqtt_state.outbound_message =
mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
client->mqtt_state.pending_msg_id =
mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
client->sendTimeout = MQTT_SEND_TIMOUT;
MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
if (client->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_send(client->pCon, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length);
}
client->mqtt_state.outbound_message = NULL;
client->connState = MQTT_CONNECT_SENDING;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
/**
* @brief Tcp client connect repeat callback function.
* @param arg: contain the ip link information
* @retval None
*/
void ICACHE_FLASH_ATTR mqtt_tcpclient_recon_cb(void *arg, sint8 errType) {
struct espconn *pCon = (struct espconn *)arg;
MQTT_Client *client = (MQTT_Client *) pCon->reverse;
MQTT_INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port);
client->connState = TCP_RECONNECT_REQ;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
}
/**
* @brief MQTT publish function.
* @param client: MQTT_Client reference
* @param topic: string topic will publish to
* @param data: buffer data send point to
* @param data_length: length of data
* @param qos: qos
* @param retain: retain
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR
MQTT_Publish(MQTT_Client * client, const char *topic, const char *data, int data_length, int qos, int retain) {
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
topic, data, data_length,
qos, retain, &client->mqtt_state.pending_msg_id);
if (client->mqtt_state.outbound_message->length == 0) {
MQTT_INFO("MQTT: Queuing publish failed\r\n");
return FALSE;
}
MQTT_INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length,
client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
while (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
MQTT_INFO("MQTT: Serious buffer error\r\n");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
return TRUE;
}
/**
* @brief MQTT subscibe function.
* @param client: MQTT_Client reference
* @param topic: string topic will subscribe
* @param qos: qos
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client * client, char *topic, uint8_t qos) {
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
topic, qos, &client->mqtt_state.pending_msg_id);
MQTT_INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
while (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
MQTT_INFO("MQTT: Serious buffer error\r\n");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
return TRUE;
}
/**
* @brief MQTT un-subscibe function.
* @param client: MQTT_Client reference
* @param topic: String topic will un-subscribe
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR MQTT_UnSubscribe(MQTT_Client * client, char *topic) {
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
client->mqtt_state.outbound_message = mqtt_msg_unsubscribe(&client->mqtt_state.mqtt_connection,
topic, &client->mqtt_state.pending_msg_id);
MQTT_INFO("MQTT: queue un-subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
while (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
MQTT_INFO("MQTT: Serious buffer error\r\n");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
return TRUE;
}
/**
* @brief MQTT ping function.
* @param client: MQTT_Client reference
* @retval TRUE if success queue
*/
BOOL ICACHE_FLASH_ATTR MQTT_Ping(MQTT_Client * client) {
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
if (client->mqtt_state.outbound_message->length == 0) {
MQTT_INFO("MQTT: Queuing publish failed\r\n");
return FALSE;
}
MQTT_INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length,
client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
while (QUEUE_Puts
(&client->msgQueue, client->mqtt_state.outbound_message->data,
client->mqtt_state.outbound_message->length) == -1) {
MQTT_INFO("MQTT: Queue full\r\n");
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
MQTT_INFO("MQTT: Serious buffer error\r\n");
return FALSE;
}
}
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) client);
return TRUE;
}
void ICACHE_FLASH_ATTR MQTT_Task(os_event_t * e) {
MQTT_Client *client = (MQTT_Client *) e->par;
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
if (e->par == 0)
return;
MQTT_INFO("MQTT: Client task activated - state %d\r\n", client->connState);
switch (client->connState) {
case TCP_RECONNECT_REQ:
break;
case TCP_RECONNECT:
mqtt_tcpclient_delete(client);
MQTT_Connect(client);
MQTT_INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
client->connState = TCP_CONNECTING;
break;
case MQTT_DELETING:
case TCP_DISCONNECTING:
case TCP_RECONNECT_DISCONNECTING:
if (client->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_disconnect(client->pCon);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_disconnect(client->pCon);
}
break;
case TCP_DISCONNECTED:
MQTT_INFO("MQTT: Disconnected\r\n");
mqtt_tcpclient_delete(client);
break;
case MQTT_DELETED:
MQTT_INFO("MQTT: Deleted client\r\n");
mqtt_client_delete(client);
break;
case MQTT_KEEPALIVE_SEND:
mqtt_send_keepalive(client);
break;
case MQTT_DATA:
if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
break;
}
if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) {
client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
client->sendTimeout = MQTT_SEND_TIMOUT;
MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type,
client->mqtt_state.pending_msg_id);
client->keepAliveTick = 0;
if (client->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_send(client->pCon, dataBuffer, dataLen);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_send(client->pCon, dataBuffer, dataLen);
}
client->mqtt_state.outbound_message = NULL;
break;
}
break;
}
}
/**
* @brief MQTT initialization connection function
* @param client: MQTT_Client reference
* @param host: Domain or IP string
* @param port: Port to connect
* @param security: 1 for ssl, 0 for none
* @retval None
*/
void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client * mqttClient, uint8_t * host, uint32_t port, uint8_t security) {
uint32_t temp;
MQTT_INFO("MQTT:InitConnection\r\n");
os_memset(mqttClient, 0, sizeof(MQTT_Client));
temp = os_strlen(host);
mqttClient->host = (uint8_t *) os_zalloc(temp + 1);
os_strcpy(mqttClient->host, host);
mqttClient->host[temp] = 0;
mqttClient->port = port;
mqttClient->security = security;
}
/**
* @brief MQTT initialization mqtt client function
* @param client: MQTT_Client reference
* @param clientid: MQTT client id
* @param client_user:MQTT client user
* @param client_pass:MQTT client password
* @param client_pass:MQTT keep alive timer, in second
* @retval None
*/
BOOL ICACHE_FLASH_ATTR
MQTT_InitClient(MQTT_Client * mqttClient, uint8_t * client_id, uint8_t * client_user, uint8_t * client_pass,
uint32_t keepAliveTime, uint8_t cleanSession) {
uint32_t temp;
MQTT_INFO("MQTT:InitClient\r\n");
os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
if (!client_id) {
/* Should be allowed by broker, but clean session flag must be set. */
#ifdef PROTOCOL_NAMEv311
if (cleanSession) {
mqttClient->connect_info.client_id = zero_len_id;
} else {
MQTT_INFO("cleanSession must be set to use 0 length client_id\r\n");
return false;
}
/* Not supported. Return. */
#else
MQTT_INFO("Client ID required for MQTT < 3.1.1!\r\n");
return false;
#endif
}
/* If connect_info's client_id is still NULL and we get here, we can *
* assume the passed client_id is non-NULL. */
if (!(mqttClient->connect_info.client_id)) {
temp = os_strlen(client_id);
mqttClient->connect_info.client_id = (uint8_t *) os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.client_id, client_id);
mqttClient->connect_info.client_id[temp] = 0;
}
if (client_user) {
temp = os_strlen(client_user);
mqttClient->connect_info.username = (uint8_t *) os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.username, client_user);
mqttClient->connect_info.username[temp] = 0;
}
if (client_pass) {
temp = os_strlen(client_pass);
mqttClient->connect_info.password = (uint8_t *) os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.password, client_pass);
mqttClient->connect_info.password[temp] = 0;
}
mqttClient->connect_info.keepalive = keepAliveTime;
mqttClient->connect_info.clean_session = cleanSession;
mqttClient->mqtt_state.in_buffer = (uint8_t *) os_zalloc(MQTT_BUF_SIZE);
mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
mqttClient->mqtt_state.out_buffer = (uint8_t *) os_zalloc(MQTT_BUF_SIZE);
mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer,
mqttClient->mqtt_state.out_buffer_length);
QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);
system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) mqttClient);
return true;
}
void ICACHE_FLASH_ATTR
MQTT_InitLWT(MQTT_Client * mqttClient, uint8_t * will_topic, uint8_t * will_msg, uint8_t will_qos, uint8_t will_retain)
{
uint32_t temp;
temp = os_strlen(will_topic);
mqttClient->connect_info.will_topic = (uint8_t *) os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.will_topic, will_topic);
mqttClient->connect_info.will_topic[temp] = 0;
temp = os_strlen(will_msg);
mqttClient->connect_info.will_data = (uint8_t *) os_zalloc(temp + 1);
os_strcpy(mqttClient->connect_info.will_data, will_msg);
mqttClient->connect_info.will_data[temp] = 0;
mqttClient->connect_info.will_qos = will_qos;
mqttClient->connect_info.will_retain = will_retain;
}
/**
* @brief Begin connect to MQTT broker
* @param client: MQTT_Client reference
* @retval None
*/
void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client * mqttClient) {
if (mqttClient->pCon) {
// Clean up the old connection forcefully - using MQTT_Disconnect
// does not actually release the old connection until the
// disconnection callback is invoked.
mqtt_tcpclient_delete(mqttClient);
}
mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
mqttClient->pCon->type = ESPCONN_TCP;
mqttClient->pCon->state = ESPCONN_NONE;
mqttClient->pCon->proto.tcp = (esp_tcp *) os_zalloc(sizeof(esp_tcp));
mqttClient->pCon->proto.tcp->local_port = espconn_port();
mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;
mqttClient->pCon->reverse = mqttClient;
espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb);
espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);
mqttClient->keepAliveTick = 0;
mqttClient->reconnectTick = 0;
os_timer_disarm(&mqttClient->mqttTimer);
os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *) mqtt_timer, mqttClient);
os_timer_arm(&mqttClient->mqttTimer, 1000, 1);
if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) {
MQTT_INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port);
if (mqttClient->security) {
#ifdef MQTT_SSL_ENABLE
espconn_secure_set_size(ESPCONN_CLIENT, MQTT_SSL_SIZE);
espconn_secure_connect(mqttClient->pCon);
#else
MQTT_INFO("TCP: Do not support SSL\r\n");
#endif
} else {
espconn_connect(mqttClient->pCon);
}
} else {
MQTT_INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
}
mqttClient->connState = TCP_CONNECTING;
}
void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client * mqttClient) {
mqttClient->connState = TCP_DISCONNECTING;
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) mqttClient);
os_timer_disarm(&mqttClient->mqttTimer);
}
void ICACHE_FLASH_ATTR MQTT_DeleteClient(MQTT_Client * mqttClient) {
if (NULL == mqttClient)
return;
mqttClient->connState = MQTT_DELETED;
// if(TCP_DISCONNECTED == mqttClient->connState) {
// mqttClient->connState = MQTT_DELETED;
// } else if(MQTT_DELETED != mqttClient->connState) {
// mqttClient->connState = MQTT_DELETING;
// }
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t) mqttClient);
os_timer_disarm(&mqttClient->mqttTimer);
}
void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client * mqttClient, MqttCallback connectedCb) {
mqttClient->connectedCb = connectedCb;
}
void ICACHE_FLASH_ATTR MQTT_OnDisconnected(MQTT_Client * mqttClient, MqttCallback disconnectedCb) {
mqttClient->disconnectedCb = disconnectedCb;
}
void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client * mqttClient, MqttDataCallback dataCb) {
mqttClient->dataCb = dataCb;
}
void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client * mqttClient, MqttCallback publishedCb) {
mqttClient->publishedCb = publishedCb;
}
void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client * mqttClient, MqttCallback timeoutCb) {
mqttClient->timeoutCb = timeoutCb;
}

Wyświetl plik

@ -1,475 +0,0 @@
/*
* Copyright (c) 2014, Stephen Robinson
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#include "c_types.h"
#include "ets_sys.h"
#include "osapi.h"
#include "os_type.h"
#include <string.h>
#include "mqtt_msg.h"
#include "user_config.h"
static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t * connection, const char *string, int len) {
if (connection->message.length + len + 2 > connection->buffer_length)
return -1;
connection->buffer[connection->message.length++] = len >> 8;
connection->buffer[connection->message.length++] = len & 0xff;
os_memcpy(connection->buffer + connection->message.length, string, len);
connection->message.length += len;
return len + 2;
}
static uint16_t ICACHE_FLASH_ATTR append_message_id(mqtt_connection_t * connection, uint16_t message_id) {
// If message_id is zero then we should assign one, otherwise
// we'll use the one supplied by the caller
while (message_id == 0)
message_id = ++connection->message_id;
if (connection->message.length + 2 > connection->buffer_length)
return 0;
connection->buffer[connection->message.length++] = message_id >> 8;
connection->buffer[connection->message.length++] = message_id & 0xff;
return message_id;
}
static int ICACHE_FLASH_ATTR init_message(mqtt_connection_t * connection) {
connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE;
return MQTT_MAX_FIXED_HEADER_SIZE;
}
static mqtt_message_t *ICACHE_FLASH_ATTR fail_message(mqtt_connection_t * connection) {
connection->message.data = connection->buffer;
connection->message.length = 0;
return &connection->message;
}
static mqtt_message_t *ICACHE_FLASH_ATTR fini_message(mqtt_connection_t * connection, int type, int dup, int qos,
int retain) {
int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;
if (remaining_length > 127) {
connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[1] = 0x80 | (remaining_length % 128);
connection->buffer[2] = remaining_length / 128;
connection->message.length = remaining_length + 3;
connection->message.data = connection->buffer;
} else {
connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1);
connection->buffer[2] = remaining_length;
connection->message.length = remaining_length + 2;
connection->message.data = connection->buffer + 1;
}
return &connection->message;
}
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t * connection, uint8_t * buffer, uint16_t buffer_length) {
os_memset(connection, 0, sizeof(mqtt_connection_t));
connection->buffer = buffer;
connection->buffer_length = buffer_length;
}
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t * buffer, uint16_t length) {
int i;
int totlen = 0;
for (i = 1; i < length; ++i) {
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
totlen += i;
return totlen;
}
char *ICACHE_FLASH_ATTR mqtt_get_str(uint8_t * buffer, uint16_t * length) {
int i = 0;
int topiclen;
if (i + 2 >= *length)
return NULL;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen > *length)
return NULL;
*length = topiclen;
return buffer + i;
}
const char *ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t * buffer, uint16_t * length) {
int i;
int totlen = 0;
int topiclen;
for (i = 1; i < *length; ++i) {
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
totlen += i;
if (i + 2 >= *length)
return NULL;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen > *length)
return NULL;
*length = topiclen;
return (const char *)(buffer + i);
}
const char *ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t * buffer, uint16_t * length) {
int i;
int totlen = 0;
int topiclen;
int blength = *length;
*length = 0;
for (i = 1; i < blength; ++i) {
totlen += (buffer[i] & 0x7f) << (7 * (i - 1));
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
totlen += i;
if (i + 2 >= blength)
return NULL;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen >= blength)
return NULL;
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
if (i + 2 >= blength)
return NULL;
i += 2;
}
if (totlen < i)
return NULL;
if (totlen <= blength)
*length = totlen - i;
else
*length = blength - i;
return (const char *)(buffer + i);
}
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t * buffer, uint16_t length) {
if (length < 1)
return 0;
switch (mqtt_get_type(buffer)) {
case MQTT_MSG_TYPE_PUBLISH:
{
int i;
int topiclen;
for (i = 1; i < length; ++i) {
if ((buffer[i] & 0x80) == 0) {
++i;
break;
}
}
if (i + 2 >= length)
return 0;
topiclen = buffer[i++] << 8;
topiclen |= buffer[i++];
if (i + topiclen >= length)
return 0;
i += topiclen;
if (mqtt_get_qos(buffer) > 0) {
if (i + 2 >= length)
return 0;
//i += 2;
} else {
return 0;
}
return (buffer[i] << 8) | buffer[i + 1];
}
case MQTT_MSG_TYPE_PUBACK:
case MQTT_MSG_TYPE_PUBREC:
case MQTT_MSG_TYPE_PUBREL:
case MQTT_MSG_TYPE_PUBCOMP:
case MQTT_MSG_TYPE_SUBACK:
case MQTT_MSG_TYPE_UNSUBACK:
case MQTT_MSG_TYPE_SUBSCRIBE:
case MQTT_MSG_TYPE_UNSUBSCRIBE:
{
// This requires the remaining length to be encoded in 1 byte,
// which it should be.
if (length >= 4 && (buffer[1] & 0x80) == 0)
return (buffer[2] << 8) | buffer[3];
else
return 0;
}
default:
return 0;
}
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t * connection, mqtt_connect_info_t * info) {
struct mqtt_connect_variable_header *variable_header;
init_message(connection);
if (connection->message.length + sizeof(*variable_header) > connection->buffer_length)
return fail_message(connection);
variable_header = (void *)(connection->buffer + connection->message.length);
connection->message.length += sizeof(*variable_header);
variable_header->lengthMsb = 0;
#if defined(PROTOCOL_NAMEv31)
variable_header->lengthLsb = 6;
os_memcpy(variable_header->magic, "MQIsdp", 6);
variable_header->version = 3;
#elif defined(PROTOCOL_NAMEv311)
variable_header->lengthLsb = 4;
os_memcpy(variable_header->magic, "MQTT", 4);
variable_header->version = 4;
#else
#error "Please define protocol name"
#endif
variable_header->flags = 0;
variable_header->keepaliveMsb = info->keepalive >> 8;
variable_header->keepaliveLsb = info->keepalive & 0xff;
if (info->clean_session)
variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
if (info->client_id == NULL) {
/* Never allowed */
return fail_message(connection);
} else if (info->client_id[0] == '\0') {
#ifdef PROTOCOL_NAMEv311
/* Allowed. Format 0 Length ID */
append_string(connection, info->client_id, 2);
#else
/* 0 Length not allowed */
return fail_message(connection);
#endif
} else {
/* No 0 data and at least 1 long. Good to go. */
if (append_string(connection, info->client_id, os_strlen(info->client_id)) < 0)
return fail_message(connection);
}
if (info->will_topic != NULL && info->will_topic[0] != '\0') {
if (append_string(connection, info->will_topic, os_strlen(info->will_topic)) < 0)
return fail_message(connection);
if (append_string(connection, info->will_data, os_strlen(info->will_data)) < 0)
return fail_message(connection);
variable_header->flags |= MQTT_CONNECT_FLAG_WILL;
if (info->will_retain)
variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
variable_header->flags |= (info->will_qos & 3) << 3;
}
if (info->username != NULL && info->username[0] != '\0') {
if (append_string(connection, info->username, os_strlen(info->username)) < 0)
return fail_message(connection);
variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;
}
if (info->password != NULL && info->password[0] != '\0') {
if (append_string(connection, info->password, os_strlen(info->password)) < 0)
return fail_message(connection);
variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;
}
return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_connack(mqtt_connection_t * connection,
enum mqtt_connect_return_code retcode) {
init_message(connection);
connection->buffer[connection->message.length++] = 0; // Connect Acknowledge Flags
connection->buffer[connection->message.length++] = retcode; // Connect Return code
return fini_message(connection, MQTT_MSG_TYPE_CONNACK, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t * connection, const char *topic, const char *data,
int data_length, int qos, int retain, uint16_t * message_id) {
init_message(connection);
if (topic == NULL || topic[0] == '\0')
return fail_message(connection);
if (append_string(connection, topic, os_strlen(topic)) < 0)
return fail_message(connection);
if (qos > 0) {
if ((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
} else
*message_id = 0;
if (connection->message.length + data_length > connection->buffer_length)
return fail_message(connection);
os_memcpy(connection->buffer + connection->message.length, data, data_length);
connection->message.length += data_length;
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t * connection, uint16_t message_id) {
init_message(connection);
if (append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t * connection, uint16_t message_id) {
init_message(connection);
if (append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t * connection, uint16_t message_id) {
init_message(connection);
if (append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t * connection, uint16_t message_id) {
init_message(connection);
if (append_message_id(connection, message_id) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t * connection, const char *topic, int qos,
uint16_t * message_id) {
init_message(connection);
if (topic == NULL || topic[0] == '\0')
return fail_message(connection);
if ((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
if (append_string(connection, topic, os_strlen(topic)) < 0)
return fail_message(connection);
if (connection->message.length + 1 > connection->buffer_length)
return fail_message(connection);
connection->buffer[connection->message.length++] = qos;
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_suback(mqtt_connection_t * connection, uint8_t * ret_codes,
uint8_t ret_codes_len, uint16_t message_id) {
uint8_t i;
init_message(connection);
if ((append_message_id(connection, message_id)) == 0)
return fail_message(connection);
for (i = 0; i < ret_codes_len; i++)
connection->buffer[connection->message.length++] = ret_codes[i];
return fini_message(connection, MQTT_MSG_TYPE_SUBACK, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t * connection, const char *topic,
uint16_t * message_id) {
init_message(connection);
if (topic == NULL || topic[0] == '\0')
return fail_message(connection);
if ((*message_id = append_message_id(connection, 0)) == 0)
return fail_message(connection);
if (append_string(connection, topic, os_strlen(topic)) < 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_unsuback(mqtt_connection_t * connection, uint16_t message_id) {
uint8_t i;
init_message(connection);
if ((append_message_id(connection, message_id)) == 0)
return fail_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_UNSUBACK, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t * connection) {
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t * connection) {
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0);
}
mqtt_message_t *ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t * connection) {
init_message(connection);
return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0);
}

Wyświetl plik

@ -1,191 +0,0 @@
#include "c_types.h"
#include "mem.h"
#include "ets_sys.h"
#include "osapi.h"
#include "os_type.h"
#include <string.h>
//#include "user_config.h"
#include "mqtt_retainedlist.h"
#include "mqtt_topics.h"
static retained_entry *retained_list = NULL;
static uint16_t max_entry;
static on_retainedtopic_cb retained_cb = NULL;
bool ICACHE_FLASH_ATTR create_retainedlist(uint16_t num_entires) {
max_entry = num_entires;
retained_list = (retained_entry *) os_zalloc(num_entires * sizeof(retained_entry));
retained_cb = NULL;
return retained_list != NULL;
}
bool ICACHE_FLASH_ATTR update_retainedtopic(uint8_t * topic, uint8_t * data, uint16_t data_len, uint8_t qos) {
uint16_t i;
if (retained_list == NULL)
return false;
// look for topic in list
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic != NULL && os_strcmp(retained_list[i].topic, topic) == 0)
break;
}
// not yet in list
if (i >= max_entry) {
// if empty new data - no entry required
if (data_len == 0)
return true;
// find free
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic == NULL)
break;
}
if (i >= max_entry) {
// list full
return false;
}
retained_list[i].topic = (uint8_t *) os_malloc(os_strlen(topic) + 1);
if (retained_list[i].topic == NULL) {
// out of mem
return false;
}
os_strcpy(retained_list[i].topic, topic);
}
// if empty new data - delete
if (data_len == 0) {
os_free(retained_list[i].topic);
retained_list[i].topic = NULL;
os_free(retained_list[i].data);
retained_list[i].data = NULL;
retained_list[i].data_len = 0;
if (retained_cb != NULL)
retained_cb(NULL);
return true;
}
if (retained_list[i].data == NULL) {
// no data till now, new memory allocation
retained_list[i].data = (uint8_t *) os_malloc(data_len);
} else {
if (data_len != retained_list[i].data_len) {
// not same size as before, new memory allocation
os_free(retained_list[i].data);
retained_list[i].data = (uint8_t *) os_malloc(data_len);
}
}
if (retained_list[i].data == NULL) {
// out of mem
os_free(retained_list[i].topic);
retained_list[i].topic = NULL;
retained_list[i].data_len = 0;
return false;
}
os_memcpy(retained_list[i].data, data, data_len);
retained_list[i].data_len = data_len;
retained_list[i].qos = qos;
if (retained_cb != NULL)
retained_cb(&retained_list[i]);
return true;
}
bool ICACHE_FLASH_ATTR find_retainedtopic(uint8_t * topic, find_retainedtopic_cb cb, void *user_data) {
uint16_t i;
bool retval = false;
if (retained_list == NULL)
return false;
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic != NULL) {
if (Topics_matches(topic, 1, retained_list[i].topic)) {
(*cb) (&retained_list[i], user_data);
retval = true;
}
}
}
return retval;
}
void ICACHE_FLASH_ATTR iterate_retainedtopics(iterate_retainedtopic_cb cb, void *user_data) {
uint16_t i;
if (retained_list == NULL)
return;
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic != NULL) {
if ((*cb) (&retained_list[i], user_data) == true)
return;
}
}
}
bool ICACHE_FLASH_ATTR clear_cb(retained_entry *entry, void *user_data) {
update_retainedtopic(entry->topic, "", 0, entry->qos);
return false;
}
void ICACHE_FLASH_ATTR clear_retainedtopics() {
iterate_retainedtopics(clear_cb, NULL);
}
int ICACHE_FLASH_ATTR serialize_retainedtopics(char *buf, int len) {
uint16_t i;
uint16_t pos = 0;
if (retained_list == NULL)
return 0;
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic != NULL) {
uint16_t data_len = retained_list[i].data_len;
if (pos + os_strlen(retained_list[i].topic) + 4 + data_len + 1 >= len-1)
return 0;
os_strcpy(&buf[pos], retained_list[i].topic);
pos += os_strlen(retained_list[i].topic) + 1;
buf[pos++] = data_len & 0xff;
buf[pos++] = (data_len >> 8) & 0xff;
os_memcpy(&buf[pos], retained_list[i].data, data_len);
pos += data_len;
buf[pos++] = retained_list[i].qos;
buf[pos] = '\0';
}
}
if (pos == 0) {
buf[pos++] = '\0';
}
return pos;
}
bool ICACHE_FLASH_ATTR deserialize_retainedtopics(char *buf, int len) {
uint16_t pos = 0;
while (pos < len && buf[pos] != '\0') {
uint8_t *topic = &buf[pos];
pos += os_strlen(topic) + 1;
if (pos >= len) return false;
uint16_t data_len = buf[pos++] + (buf[pos++] << 8);
uint8_t *data = &buf[pos];
pos += data_len;
if (pos >= len) return false;
uint8_t qos = buf[pos++];
if (update_retainedtopic(topic, data, data_len, qos) == false)
return false;
}
return true;
}
void ICACHE_FLASH_ATTR set_on_retainedtopic_cb(on_retainedtopic_cb cb) {
retained_cb = cb;
}

Wyświetl plik

@ -1,946 +0,0 @@
#include "user_interface.h"
#include "mem.h"
#include "mqtt_server.h"
#include "mqtt_topics.h"
#include "mqtt_topiclist.h"
#include "mqtt_retainedlist.h"
#include "debug.h"
/* Mem Debug
#undef os_free
#define os_free(x) {os_printf("F:%d-> %x\r\n", __LINE__,(x));vPortFree(x, "", 0);}
int my_os_zalloc(int len, int line) {
int _v = pvPortZalloc(len, "", 0);
os_printf("A:%d-> %x (%d)\r\n", line, _v, len);
return _v;
}
#undef os_zalloc
#define os_zalloc(x) my_os_zalloc(x, __LINE__)
#undef os_malloc
#define os_malloc(x) my_os_zalloc(x, __LINE__)
*/
#define MAX_SUBS_PER_REQ 16
#define MQTT_SERVER_TASK_PRIO 1
#define MQTT_TASK_QUEUE_SIZE 1
#define MQTT_SEND_TIMOUT 5
os_event_t mqtt_procServerTaskQueue[MQTT_TASK_QUEUE_SIZE];
LOCAL uint8_t zero_len_id[2] = { 0, 0 };
MQTT_ClientCon *clientcon_list;
LOCAL MqttDataCallback local_data_cb = NULL;
LOCAL MqttConnectCallback local_connect_cb = NULL;
LOCAL MqttAuthCallback local_auth_cb = NULL;
//#undef MQTT_INFO
//#define MQTT_INFO os_printf
#define MQTT_WARNING os_printf
#define MQTT_ERROR os_printf
bool ICACHE_FLASH_ATTR print_topic(topic_entry * topic, void *user_data) {
if (topic->clientcon == LOCAL_MQTT_CLIENT) {
MQTT_INFO("MQTT: Client: LOCAL Topic: \"%s\" QoS: %d\r\n", topic->topic, topic->qos);
} else {
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", topic->clientcon->connect_info.client_id, topic->topic,
topic->qos);
}
return false;
}
bool ICACHE_FLASH_ATTR publish_topic(topic_entry * topic_e, uint8_t * topic, uint8_t * data, uint16_t data_len) {
MQTT_ClientCon *clientcon = topic_e->clientcon;
uint16_t message_id = 0;
if (topic_e->clientcon == LOCAL_MQTT_CLIENT) {
MQTT_INFO("MQTT: Client: LOCAL Topic: \"%s\" QoS: %d\r\n", topic_e->topic, topic_e->qos);
if (local_data_cb != NULL)
local_data_cb(NULL, topic, os_strlen(topic), data, data_len);
return true;
}
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", clientcon->connect_info.client_id, topic_e->topic,
topic_e->qos);
clientcon->mqtt_state.outbound_message =
mqtt_msg_publish(&clientcon->mqtt_state.mqtt_connection, topic, data, data_len, topic_e->qos, 0, &message_id);
if (QUEUE_Puts
(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data,
clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
return false;
}
return true;
}
bool ICACHE_FLASH_ATTR publish_retainedtopic(retained_entry * entry, void* user_data) {
uint16_t message_id = 0;
MQTT_ClientCon *clientcon = (MQTT_ClientCon *)user_data;
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", clientcon->connect_info.client_id, entry->topic,
entry->qos);
clientcon->mqtt_state.outbound_message =
mqtt_msg_publish(&clientcon->mqtt_state.mqtt_connection, entry->topic, entry->data, entry->data_len, entry->qos,
1, &message_id);
if (QUEUE_Puts
(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data,
clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
return false;
}
return true;
}
bool ICACHE_FLASH_ATTR delete_client_by_id(const uint8_t *id) {
MQTT_ClientCon *clientcon = clientcon_list;
for (clientcon = clientcon_list; clientcon != NULL; clientcon = clientcon->next) {
if (os_strcmp(id, clientcon->connect_info.client_id) == 0) {
MQTT_INFO("MQTT: Disconnect client: %s\r\n", clientcon->connect_info.client_id);
clientcon->connState = TCP_DISCONNECT;
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) clientcon);
return true;
}
}
return true;
}
bool ICACHE_FLASH_ATTR activate_next_client() {
MQTT_ClientCon *clientcon = clientcon_list;
for (clientcon = clientcon_list; clientcon != NULL; clientcon = clientcon->next) {
if ((!QUEUE_IsEmpty(&clientcon->msgQueue)) && clientcon->pCon->state != ESPCONN_CLOSE) {
MQTT_INFO("MQTT: Next message to client: %s\r\n", clientcon->connect_info.client_id);
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) clientcon);
return true;
}
}
return true;
}
static uint8_t shared_out_buffer[MQTT_BUF_SIZE];
bool ICACHE_FLASH_ATTR MQTT_server_initClientCon(MQTT_ClientCon * mqttClientCon) {
uint32_t temp;
MQTT_INFO("MQTT:InitClientCon\r\n");
mqttClientCon->connState = TCP_CONNECTED;
os_memset(&mqttClientCon->connect_info, 0, sizeof(mqtt_connect_info_t));
mqttClientCon->connect_info.client_id = zero_len_id;
mqttClientCon->protocolVersion = 0;
mqttClientCon->mqtt_state.in_buffer = (uint8_t *) os_zalloc(MQTT_BUF_SIZE);
mqttClientCon->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
// mqttClientCon->mqtt_state.out_buffer = (uint8_t *) os_zalloc(MQTT_BUF_SIZE);
mqttClientCon->mqtt_state.out_buffer = shared_out_buffer;
mqttClientCon->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
mqttClientCon->mqtt_state.connect_info = &mqttClientCon->connect_info;
mqtt_msg_init(&mqttClientCon->mqtt_state.mqtt_connection, mqttClientCon->mqtt_state.out_buffer,
mqttClientCon->mqtt_state.out_buffer_length);
QUEUE_Init(&mqttClientCon->msgQueue, QUEUE_BUFFER_SIZE);
mqttClientCon->next = clientcon_list;
clientcon_list = mqttClientCon;
return true;
}
uint16_t ICACHE_FLASH_ATTR MQTT_server_countClientCon() {
MQTT_ClientCon *p;
uint16_t count = 0;
for (p = clientcon_list; p != NULL; p = p->next, count++);
return count;
}
bool ICACHE_FLASH_ATTR MQTT_server_deleteClientCon(MQTT_ClientCon * mqttClientCon) {
MQTT_INFO("MQTT:DeleteClientCon\r\n");
if (mqttClientCon == NULL)
return;
os_timer_disarm(&mqttClientCon->mqttTimer);
if (mqttClientCon->pCon != NULL) {
espconn_delete(mqttClientCon->pCon);
}
MQTT_ClientCon **p = &clientcon_list;
while (*p != mqttClientCon && *p != NULL) {
p = &((*p)->next);
}
if (*p == mqttClientCon)
*p = (*p)->next;
if (mqttClientCon->user_data != NULL) {
os_free(mqttClientCon->user_data);
mqttClientCon->user_data = NULL;
}
if (mqttClientCon->mqtt_state.in_buffer != NULL) {
os_free(mqttClientCon->mqtt_state.in_buffer);
mqttClientCon->mqtt_state.in_buffer = NULL;
}
/* We use one static buffer for all connections
// if (mqttClientCon->mqtt_state.out_buffer != NULL) {
// os_free(mqttClientCon->mqtt_state.out_buffer);
// mqttClientCon->mqtt_state.out_buffer = NULL;
// }
if (mqttClientCon->mqtt_state.outbound_message != NULL) {
if (mqttClientCon->mqtt_state.outbound_message->data != NULL) {
// Don't think, this is has ever been allocated separately
// os_free(mqttClientCon->mqtt_state.outbound_message->data);
mqttClientCon->mqtt_state.outbound_message->data = NULL;
}
}
*/
if (mqttClientCon->mqtt_state.mqtt_connection.buffer != NULL) {
// Already freed but not NULL
mqttClientCon->mqtt_state.mqtt_connection.buffer = NULL;
}
if (mqttClientCon->connect_info.client_id != NULL) {
/* Don't attempt to free if it's the zero_len array */
if (((uint8_t *) mqttClientCon->connect_info.client_id) != zero_len_id)
os_free(mqttClientCon->connect_info.client_id);
mqttClientCon->connect_info.client_id = NULL;
}
if (mqttClientCon->connect_info.username != NULL) {
os_free(mqttClientCon->connect_info.username);
mqttClientCon->connect_info.username = NULL;
}
if (mqttClientCon->connect_info.password != NULL) {
os_free(mqttClientCon->connect_info.password);
mqttClientCon->connect_info.password = NULL;
}
if (mqttClientCon->connect_info.will_topic != NULL) {
// Publish the LWT
find_topic(mqttClientCon->connect_info.will_topic, publish_topic,
mqttClientCon->connect_info.will_data, mqttClientCon->connect_info.will_data_len);
activate_next_client();
if (mqttClientCon->connect_info.will_retain) {
update_retainedtopic(mqttClientCon->connect_info.will_topic, mqttClientCon->connect_info.will_data,
mqttClientCon->connect_info.will_data_len, mqttClientCon->connect_info.will_qos);
}
os_free(mqttClientCon->connect_info.will_topic);
mqttClientCon->connect_info.will_topic = NULL;
}
if (mqttClientCon->connect_info.will_data != NULL) {
os_free(mqttClientCon->connect_info.will_data);
mqttClientCon->connect_info.will_data = NULL;
}
if (mqttClientCon->msgQueue.buf != NULL) {
os_free(mqttClientCon->msgQueue.buf);
mqttClientCon->msgQueue.buf = NULL;
}
delete_topic(mqttClientCon, 0);
os_free(mqttClientCon);
return true;
}
void ICACHE_FLASH_ATTR MQTT_server_cleanupClientCons() {
MQTT_ClientCon *clientcon, *clientcon_tmp;
for (clientcon = clientcon_list; clientcon != NULL; ) {
clientcon_tmp = clientcon;
clientcon = clientcon->next;
if (clientcon_tmp->pCon->state == ESPCONN_CLOSE) {
MQTT_server_deleteClientCon(clientcon_tmp);
}
}
}
void ICACHE_FLASH_ATTR MQTT_server_disconnectClientCon(MQTT_ClientCon * mqttClientCon) {
MQTT_INFO("MQTT:ServerDisconnect\r\n");
mqttClientCon->mqtt_state.message_length_read = 0;
mqttClientCon->connState = TCP_DISCONNECT;
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) mqttClientCon);
os_timer_disarm(&mqttClientCon->mqttTimer);
}
void ICACHE_FLASH_ATTR mqtt_server_timer(void *arg) {
MQTT_ClientCon *clientcon = (MQTT_ClientCon *) arg;
if (clientcon->sendTimeout > 0)
clientcon->sendTimeout--;
}
static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, unsigned short len) {
uint8_t msg_type;
uint8_t msg_qos;
uint16_t msg_id;
enum mqtt_connect_flag msg_conn_ret;
uint16_t topic_index;
uint16_t topic_len;
uint8_t *topic_str;
uint8_t topic_buffer[MQTT_BUF_SIZE];
uint16_t data_len;
uint8_t *data;
struct espconn *pCon = (struct espconn *)arg;
MQTT_INFO("MQTT_ClientCon_recv_cb(): %d bytes of data received\n", len);
MQTT_ClientCon *clientcon = (MQTT_ClientCon *) pCon->reverse;
if (clientcon == NULL) {
MQTT_ERROR("ERROR: No client status\r\n");
return;
}
MQTT_INFO("MQTT: TCP: data received %d bytes (State: %d)\r\n", len, clientcon->connState);
// Expect minimum the full fixed size header
if (len + clientcon->mqtt_state.message_length_read > MQTT_BUF_SIZE || len < 2) {
MQTT_ERROR("MQTT: Message too short/long\r\n");
clientcon->mqtt_state.message_length_read = 0;
return;
}
READPACKET:
os_memcpy(&clientcon->mqtt_state.in_buffer[clientcon->mqtt_state.message_length_read], pdata, len);
clientcon->mqtt_state.message_length_read += len;
clientcon->mqtt_state.message_length =
mqtt_get_total_length(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.message_length_read);
MQTT_INFO("MQTT: total_len: %d\r\n", clientcon->mqtt_state.message_length);
if (clientcon->mqtt_state.message_length > clientcon->mqtt_state.message_length_read) {
MQTT_WARNING("MQTT: Partial message received\r\n");
return;
}
msg_type = mqtt_get_type(clientcon->mqtt_state.in_buffer);
MQTT_INFO("MQTT: message_type: %d\r\n", msg_type);
//msg_qos = mqtt_get_qos(clientcon->mqtt_state.in_buffer);
switch (clientcon->connState) {
case TCP_CONNECTED:
switch (msg_type) {
case MQTT_MSG_TYPE_CONNECT:
MQTT_INFO("MQTT: Connect received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
if (clientcon->mqtt_state.message_length < sizeof(struct mqtt_connect_variable_header) + 3) {
MQTT_ERROR("MQTT: Too short Connect message\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
struct mqtt_connect_variable_header4 *variable_header =
(struct mqtt_connect_variable_header4 *)&clientcon->mqtt_state.in_buffer[2];
uint16_t var_header_len = sizeof(struct mqtt_connect_variable_header4);
// We check MQTT v3.11 (version 4)
if ((variable_header->lengthMsb << 8) + variable_header->lengthLsb == 4 &&
variable_header->version == 4 && os_strncmp(variable_header->magic, "MQTT", 4) == 0) {
clientcon->protocolVersion = 4;
} else {
struct mqtt_connect_variable_header3 *variable_header3 =
(struct mqtt_connect_variable_header3 *)&clientcon->mqtt_state.in_buffer[2];
var_header_len = sizeof(struct mqtt_connect_variable_header3);
// We check MQTT v3.1 (version 3)
if ((variable_header3->lengthMsb << 8) + variable_header3->lengthLsb == 6 &&
variable_header3->version == 3 && os_strncmp(variable_header3->magic, "MQIsdp", 6) == 0) {
clientcon->protocolVersion = 3;
// adapt the remaining header fields (dirty as we overlay the two structs of different length)
variable_header->version = variable_header3->version;
variable_header->flags = variable_header3->flags;
variable_header->keepaliveMsb = variable_header3->keepaliveMsb;
variable_header->keepaliveLsb = variable_header3->keepaliveLsb;
} else {
// Neither found
MQTT_WARNING("MQTT: Wrong protocoll version\r\n");
msg_conn_ret = CONNECTION_REFUSE_PROTOCOL;
clientcon->connState = TCP_DISCONNECTING;
break;
}
}
uint16_t msg_used_len = var_header_len;
MQTT_INFO("MQTT: Connect flags %x\r\n", variable_header->flags);
clientcon->connect_info.clean_session = (variable_header->flags & MQTT_CONNECT_FLAG_CLEAN_SESSION) != 0;
clientcon->connect_info.keepalive = (variable_header->keepaliveMsb << 8) + variable_header->keepaliveLsb;
espconn_regist_time(clientcon->pCon, 2 * clientcon->connect_info.keepalive, 1);
MQTT_INFO("MQTT: Keepalive %d\r\n", clientcon->connect_info.keepalive);
// Get the client id
uint16_t id_len = clientcon->mqtt_state.message_length - (2 + msg_used_len);
const char *client_id = mqtt_get_str(&clientcon->mqtt_state.in_buffer[2 + msg_used_len], &id_len);
if (client_id == NULL || id_len > 80) {
MQTT_WARNING("MQTT: Client Id invalid\r\n");
msg_conn_ret = CONNECTION_REFUSE_ID_REJECTED;
clientcon->connState = TCP_DISCONNECTING;
break;
}
if (id_len == 0) {
if (clientcon->protocolVersion == 3) {
MQTT_WARNING("MQTT: Empty client Id in MQTT 3.1\r\n");
msg_conn_ret = CONNECTION_REFUSE_ID_REJECTED;
clientcon->connState = TCP_DISCONNECTING;
break;
}
if (!clientcon->connect_info.clean_session) {
MQTT_WARNING("MQTT: Null client Id and NOT cleansession\r\n");
msg_conn_ret = CONNECTION_REFUSE_ID_REJECTED;
clientcon->connState = TCP_DISCONNECTING;
break;
}
clientcon->connect_info.client_id = zero_len_id;
} else {
uint8_t *new_id = (char *)os_zalloc(id_len + 1);
if (new_id == NULL) {
MQTT_ERROR("MQTT: Out of mem\r\n");
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
clientcon->connState = TCP_DISCONNECTING;
break;
}
os_memcpy(new_id, client_id, id_len);
new_id[id_len] = '\0';
// Delete any existing status for that id
delete_client_by_id(client_id);
clientcon->connect_info.client_id = new_id;
MQTT_INFO("MQTT: Client id %s\r\n", clientcon->connect_info.client_id);
}
msg_used_len += 2 + id_len;
// Get the LWT
clientcon->connect_info.will_retain = (variable_header->flags & MQTT_CONNECT_FLAG_WILL_RETAIN) != 0;
clientcon->connect_info.will_qos = (variable_header->flags & 0x18) >> 3;
if (!(variable_header->flags & MQTT_CONNECT_FLAG_WILL)) {
// Must be all 0 if no lwt is given
if (clientcon->connect_info.will_retain || clientcon->connect_info.will_qos) {
MQTT_WARNING("MQTT: Last Will flags invalid\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
} else {
uint16_t lw_topic_len = clientcon->mqtt_state.message_length - (2 + msg_used_len);
const char *lw_topic =
mqtt_get_str(&clientcon->mqtt_state.in_buffer[2 + msg_used_len], &lw_topic_len);
if (lw_topic == NULL) {
MQTT_WARNING("MQTT: Last Will topic invalid\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
clientcon->connect_info.will_topic = (char *)os_zalloc(lw_topic_len + 1);
if (clientcon->connect_info.will_topic != NULL) {
os_memcpy(clientcon->connect_info.will_topic, lw_topic, lw_topic_len);
clientcon->connect_info.will_topic[lw_topic_len] = 0;
if (Topics_hasWildcards(clientcon->connect_info.will_topic)) {
MQTT_WARNING("MQTT: Last Will topic has wildcards\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
MQTT_INFO("MQTT: LWT topic %s\r\n", clientcon->connect_info.will_topic);
} else {
MQTT_ERROR("MQTT: Out of mem\r\n");
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
clientcon->connState = TCP_DISCONNECTING;
break;
}
msg_used_len += 2 + lw_topic_len;
uint16_t lw_data_len =
clientcon->mqtt_state.message_length - (2 + msg_used_len);
const char *lw_data =
mqtt_get_str(&clientcon->mqtt_state.in_buffer[2 + msg_used_len],
&lw_data_len);
if (lw_data == NULL) {
MQTT_WARNING("MQTT: Last Will data invalid\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
clientcon->connect_info.will_data = (char *)os_zalloc(lw_data_len);
clientcon->connect_info.will_data_len = lw_data_len;
if (clientcon->connect_info.will_data != NULL) {
os_memcpy(clientcon->connect_info.will_data, lw_data, lw_data_len);
MQTT_INFO("MQTT: %d bytes of LWT data\r\n", clientcon->connect_info.will_data_len);
} else {
MQTT_ERROR("MQTT: Out of mem\r\n");
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
clientcon->connState = TCP_DISCONNECTING;
break;
}
msg_used_len += 2 + lw_data_len;
}
// Get the username
if ((variable_header->flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
uint16_t username_len = clientcon->mqtt_state.message_length - (2 + msg_used_len);
const char *username =
mqtt_get_str(&clientcon->mqtt_state.in_buffer[2 + msg_used_len], &username_len);
if (username == NULL) {
MQTT_WARNING("MQTT: Username invalid\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
clientcon->connect_info.username = (char *)os_zalloc(username_len+1);
if (clientcon->connect_info.username != NULL) {
os_memcpy(clientcon->connect_info.username, username, username_len);
clientcon->connect_info.username[username_len] = '\0';
MQTT_INFO("MQTT: Username %s\r\n", clientcon->connect_info.username);
} else {
MQTT_ERROR("MQTT: Out of mem\r\n");
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
clientcon->connState = TCP_DISCONNECTING;
break;
}
msg_used_len += 2 + username_len;
}
// Get the password
if ((variable_header->flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
if ((variable_header->flags & MQTT_CONNECT_FLAG_USERNAME) == 0) {
MQTT_WARNING("MQTT: Password without username\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
uint16_t password_len = clientcon->mqtt_state.message_length - (2 + msg_used_len);
const char *password =
mqtt_get_str(&clientcon->mqtt_state.in_buffer[2 + msg_used_len], &password_len);
clientcon->connect_info.password = (char *)os_zalloc(password_len+1);
if (clientcon->connect_info.password != NULL) {
os_memcpy(clientcon->connect_info.password, password, password_len);
clientcon->connect_info.password[password_len] = '\0';
MQTT_INFO("MQTT: Password %s\r\n", clientcon->connect_info.password);
} else {
MQTT_ERROR("MQTT: Out of mem\r\n");
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
clientcon->connState = TCP_DISCONNECTING;
break;
}
msg_used_len += 2 + password_len;
}
// Check Auth
if ((local_auth_cb != NULL) &&
local_auth_cb(clientcon->connect_info.username==NULL?"":clientcon->connect_info.username,
clientcon->connect_info.password==NULL?"":clientcon->connect_info.password,
clientcon->pCon) == false) {
MQTT_WARNING("MQTT: Authorization failed\r\n");
if (clientcon->connect_info.will_topic != NULL) {
os_free(clientcon->connect_info.will_topic);
clientcon->connect_info.will_topic = NULL;
}
msg_conn_ret = CONNECTION_REFUSE_NOT_AUTHORIZED;
clientcon->connState = TCP_DISCONNECTING;
break;
}
msg_conn_ret = CONNECTION_ACCEPTED;
clientcon->connState = MQTT_DATA;
break;
default:
MQTT_WARNING("MQTT: Invalid message\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
clientcon->mqtt_state.outbound_message = mqtt_msg_connack(&clientcon->mqtt_state.mqtt_connection, msg_conn_ret);
if (QUEUE_Puts
(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data,
clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
break;
case MQTT_DATA:
switch (msg_type) {
uint8_t ret_codes[MAX_SUBS_PER_REQ];
uint8_t num_subs;
case MQTT_MSG_TYPE_SUBSCRIBE:
MQTT_INFO("MQTT: Subscribe received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
// 2B fixed header + 2B variable header + 2 len + 1 char + 1 QoS
if (clientcon->mqtt_state.message_length < 8) {
MQTT_ERROR("MQTT: Too short Subscribe message\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
msg_id = mqtt_get_id(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.in_buffer_length);
MQTT_INFO("MQTT: Message id %d\r\n", msg_id);
topic_index = 4;
num_subs = 0;
while (topic_index < clientcon->mqtt_state.message_length && num_subs < MAX_SUBS_PER_REQ) {
topic_len = clientcon->mqtt_state.message_length - topic_index;
topic_str = mqtt_get_str(&clientcon->mqtt_state.in_buffer[topic_index], &topic_len);
if (topic_str == NULL) {
MQTT_WARNING("MQTT: Subscribe topic invalid\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
topic_index += 2 + topic_len;
if (topic_index >= clientcon->mqtt_state.message_length) {
MQTT_WARNING("MQTT: Subscribe QoS missing\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
uint8_t topic_QoS = clientcon->mqtt_state.in_buffer[topic_index++];
os_memcpy(topic_buffer, topic_str, topic_len);
topic_buffer[topic_len] = 0;
MQTT_INFO("MQTT: Subscribed topic %s QoS %d\r\n", topic_buffer, topic_QoS);
// the return codes, one per topic
// For now we always give back error or QoS = 0 !!
ret_codes[num_subs++] = add_topic(clientcon, topic_buffer, 0) ? 0 : 0x80;
//iterate_topics(print_topic, 0);
}
MQTT_INFO("MQTT: Subscribe successful\r\n");
clientcon->mqtt_state.outbound_message =
mqtt_msg_suback(&clientcon->mqtt_state.mqtt_connection, ret_codes, num_subs, msg_id);
if (QUEUE_Puts
(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data,
clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
find_retainedtopic(topic_buffer, publish_retainedtopic, clientcon);
break;
case MQTT_MSG_TYPE_UNSUBSCRIBE:
MQTT_INFO("MQTT: Unsubscribe received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
// 2B fixed header + 2B variable header + 2 len + 1 char
if (clientcon->mqtt_state.message_length < 7) {
MQTT_ERROR("MQTT: Too short Unsubscribe message\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
msg_id = mqtt_get_id(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.in_buffer_length);
MQTT_INFO("MQTT: Message id %d\r\n", msg_id);
topic_index = 4;
while (topic_index < clientcon->mqtt_state.message_length) {
uint16_t topic_len = clientcon->mqtt_state.message_length - topic_index;
char *topic_str = mqtt_get_str(&clientcon->mqtt_state.in_buffer[topic_index], &topic_len);
if (topic_str == NULL) {
MQTT_WARNING("MQTT: Subscribe topic invalid\r\n");
MQTT_server_disconnectClientCon(clientcon);
return;
}
topic_index += 2 + topic_len;
os_memcpy(topic_buffer, topic_str, topic_len);
topic_buffer[topic_len] = 0;
MQTT_INFO("MQTT: Unsubscribed topic %s\r\n", topic_buffer);
delete_topic(clientcon, topic_buffer);
//iterate_topics(print_topic, 0);
}
MQTT_INFO("MQTT: Unubscribe successful\r\n");
clientcon->mqtt_state.outbound_message = mqtt_msg_unsuback(&clientcon->mqtt_state.mqtt_connection, msg_id);
if (QUEUE_Puts
(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data,
clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_PUBLISH:
MQTT_INFO("MQTT: Publish received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
/* if (msg_qos == 1)
clientcon->mqtt_state.outbound_message = mqtt_msg_puback(&clientcon->mqtt_state.mqtt_connection, msg_id);
else if (msg_qos == 2)
clientcon->mqtt_state.outbound_message = mqtt_msg_pubrec(&clientcon->mqtt_state.mqtt_connection, msg_id);
if (msg_qos == 1 || msg_qos == 2) {
MQTT_INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
}
*/
topic_len = clientcon->mqtt_state.in_buffer_length;
topic_str = (uint8_t *) mqtt_get_publish_topic(clientcon->mqtt_state.in_buffer, &topic_len);
os_memcpy(topic_buffer, topic_str, topic_len);
topic_buffer[topic_len] = 0;
data_len = clientcon->mqtt_state.in_buffer_length;
data = (uint8_t *) mqtt_get_publish_data(clientcon->mqtt_state.in_buffer, &data_len);
MQTT_INFO("MQTT: Published topic \"%s\"\r\n", topic_buffer);
MQTT_INFO("MQTT: Matches to:\r\n");
// Now find, if anything matches and enque publish message
find_topic(topic_buffer, publish_topic, data, data_len);
if (mqtt_get_retain(clientcon->mqtt_state.in_buffer)) {
update_retainedtopic(topic_buffer, data, data_len, mqtt_get_qos(clientcon->mqtt_state.in_buffer));
}
break;
case MQTT_MSG_TYPE_PINGREQ:
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PINGREQ\r\n");
clientcon->mqtt_state.outbound_message = mqtt_msg_pingresp(&clientcon->mqtt_state.mqtt_connection);
if (QUEUE_Puts
(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data,
clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_DISCONNECT:
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_DISCONNECT\r\n");
// Clean session close: no LWT
if (clientcon->connect_info.will_topic != NULL) {
os_free(clientcon->connect_info.will_topic);
clientcon->connect_info.will_topic = NULL;
}
MQTT_server_disconnectClientCon(clientcon);
return;
/*
case MQTT_MSG_TYPE_PUBACK:
if (clientcon->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && clientcon->mqtt_state.pending_msg_id == msg_id) {
MQTT_INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
}
break;
case MQTT_MSG_TYPE_PUBREC:
clientcon->mqtt_state.outbound_message = mqtt_msg_pubrel(&clientcon->mqtt_state.mqtt_connection, msg_id);
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_PUBREL:
clientcon->mqtt_state.outbound_message = mqtt_msg_pubcomp(&clientcon->mqtt_state.mqtt_connection, msg_id);
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
MQTT_ERROR("MQTT: Queue full\r\n");
}
break;
case MQTT_MSG_TYPE_PUBCOMP:
if (clientcon->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && clientcon->mqtt_state.pending_msg_id == msg_id) {
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
}
break;
case MQTT_MSG_TYPE_PINGRESP:
// Ignore
break;
*/
default:
// Ignore
break;
}
break;
}
// More than one MQTT command in the packet?
len = clientcon->mqtt_state.message_length_read;
if (clientcon->mqtt_state.message_length < len) {
len -= clientcon->mqtt_state.message_length;
pdata += clientcon->mqtt_state.message_length;
clientcon->mqtt_state.message_length_read = 0;
MQTT_INFO("MQTT: Get another received message\r\n");
goto READPACKET;
}
clientcon->mqtt_state.message_length_read = 0;
if (msg_type != MQTT_MSG_TYPE_PUBLISH) {
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) clientcon);
} else {
activate_next_client();
}
}
/* Called when a client has disconnected from the MQTT server */
static void ICACHE_FLASH_ATTR MQTT_ClientCon_discon_cb(void *arg) {
struct espconn *pCon = (struct espconn *)arg;
MQTT_ClientCon *clientcon = (MQTT_ClientCon *) pCon->reverse;
MQTT_INFO("MQTT_ClientCon_discon_cb(): client disconnected\n");
MQTT_server_deleteClientCon(clientcon);
}
static void ICACHE_FLASH_ATTR MQTT_ClientCon_sent_cb(void *arg) {
struct espconn *pCon = (struct espconn *)arg;
MQTT_ClientCon *clientcon = (MQTT_ClientCon *) pCon->reverse;
MQTT_INFO("MQTT_ClientCon_sent_cb(): Data sent\n");
clientcon->sendTimeout = 0;
if (clientcon->connState == TCP_DISCONNECTING) {
clientcon->connState = TCP_DISCONNECT;
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) clientcon);
}
activate_next_client();
}
/* Called when a client connects to the MQTT server */
static void ICACHE_FLASH_ATTR MQTT_ClientCon_connected_cb(void *arg) {
struct espconn *pespconn = (struct espconn *)arg;
MQTT_ClientCon *mqttClientCon;
pespconn->reverse = NULL;
MQTT_INFO("MQTT_ClientCon_connected_cb(): Client connected\r\n");
espconn_regist_sentcb(pespconn, MQTT_ClientCon_sent_cb);
espconn_regist_disconcb(pespconn, MQTT_ClientCon_discon_cb);
espconn_regist_recvcb(pespconn, MQTT_ClientCon_recv_cb);
espconn_regist_time(pespconn, 30, 1);
mqttClientCon = (MQTT_ClientCon *) os_zalloc(sizeof(MQTT_ClientCon));
pespconn->reverse = mqttClientCon;
if (mqttClientCon == NULL) {
MQTT_ERROR("ERROR: Cannot allocate client status\r\n");
return;
}
mqttClientCon->pCon = pespconn;
bool no_mem = (system_get_free_heap_size() < (MQTT_BUF_SIZE + QUEUE_BUFFER_SIZE + 0x400));
if (no_mem) {
MQTT_ERROR("ERROR: No mem for new client connection\r\n");
}
if (no_mem || (local_connect_cb != NULL && local_connect_cb(pespconn, MQTT_server_countClientCon()+1) == false)) {
mqttClientCon->connState = TCP_DISCONNECT;
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) mqttClientCon);
return;
}
MQTT_server_initClientCon(mqttClientCon);
os_timer_setfn(&mqttClientCon->mqttTimer, (os_timer_func_t *) mqtt_server_timer, mqttClientCon);
os_timer_arm(&mqttClientCon->mqttTimer, 1000, 1);
}
void ICACHE_FLASH_ATTR MQTT_ServerTask(os_event_t * e) {
MQTT_ClientCon *clientcon = (MQTT_ClientCon *) e->par;
uint8_t dataBuffer[MQTT_BUF_SIZE];
uint16_t dataLen;
if (e->par == 0)
return;
MQTT_INFO("MQTT: Server task activated - state %d\r\n", clientcon->connState);
switch (clientcon->connState) {
case TCP_DISCONNECT:
MQTT_INFO("MQTT: Disconnect\r\n");
espconn_disconnect(clientcon->pCon);
break;
case TCP_DISCONNECTING:
case MQTT_DATA:
if (!QUEUE_IsEmpty(&clientcon->msgQueue) && clientcon->sendTimeout == 0 &&
QUEUE_Gets(&clientcon->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) {
clientcon->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
clientcon->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
clientcon->sendTimeout = MQTT_SEND_TIMOUT;
MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", clientcon->mqtt_state.pending_msg_type,
clientcon->mqtt_state.pending_msg_id);
espconn_send(clientcon->pCon, dataBuffer, dataLen);
clientcon->mqtt_state.outbound_message = NULL;
break;
}
if (clientcon->connState == TCP_DISCONNECTING) {
MQTT_server_disconnectClientCon(clientcon);
}
break;
}
}
bool ICACHE_FLASH_ATTR MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics) {
MQTT_INFO("Starting MQTT server on port %d\r\n", portno);
if (!create_topiclist(max_subscriptions))
return false;
if (!create_retainedlist(max_retained_topics))
return false;
clientcon_list = NULL;
struct espconn *pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
if (pCon == NULL)
return false;
/* Equivalent to bind */
pCon->type = ESPCONN_TCP;
pCon->state = ESPCONN_NONE;
pCon->proto.tcp = (esp_tcp *) os_zalloc(sizeof(esp_tcp));
if (pCon->proto.tcp == NULL) {
os_free(pCon);
return false;
}
pCon->proto.tcp->local_port = portno;
/* Register callback when clients connect to the server */
espconn_regist_connectcb(pCon, MQTT_ClientCon_connected_cb);
/* Put the connection in accept mode */
espconn_accept(pCon);
system_os_task(MQTT_ServerTask, MQTT_SERVER_TASK_PRIO, mqtt_procServerTaskQueue, MQTT_TASK_QUEUE_SIZE);
return true;
}
bool ICACHE_FLASH_ATTR MQTT_local_publish(uint8_t * topic, uint8_t * data, uint16_t data_length, uint8_t qos,
uint8_t retain) {
find_topic(topic, publish_topic, data, data_length);
if (retain)
update_retainedtopic(topic, data, data_length, qos);
activate_next_client();
return true;
}
bool ICACHE_FLASH_ATTR MQTT_local_subscribe(uint8_t * topic, uint8_t qos) {
return add_topic(LOCAL_MQTT_CLIENT, topic, 0);
}
bool ICACHE_FLASH_ATTR MQTT_local_unsubscribe(uint8_t * topic) {
return delete_topic(LOCAL_MQTT_CLIENT, topic);
}
void ICACHE_FLASH_ATTR MQTT_server_onData(MqttDataCallback dataCb) {
local_data_cb = dataCb;
}
void ICACHE_FLASH_ATTR MQTT_server_onConnect(MqttConnectCallback connectCb) {
local_connect_cb = connectCb;
}
void ICACHE_FLASH_ATTR MQTT_server_onAuth(MqttAuthCallback authCb) {
local_auth_cb = authCb;
}

Wyświetl plik

@ -1,92 +0,0 @@
#include "c_types.h"
#include "mem.h"
#include "ets_sys.h"
#include "osapi.h"
#include "os_type.h"
#include <string.h>
#include "user_config.h"
#include "mqtt_topiclist.h"
#include "mqtt_topics.h"
static topic_entry *topic_list = NULL;
static uint16_t max_entry;
bool ICACHE_FLASH_ATTR create_topiclist(uint16_t num_entires) {
max_entry = num_entires;
topic_list = (topic_entry *) os_zalloc(num_entires * sizeof(topic_entry));
return topic_list != NULL;
}
bool ICACHE_FLASH_ATTR add_topic(MQTT_ClientCon * clientcon, uint8_t * topic, uint8_t qos) {
uint16_t i;
if (topic_list == NULL)
return false;
if (!Topics_isValidName(topic))
return false;
for (i = 0; i < max_entry; i++) {
if (topic_list[i].clientcon == NULL) {
topic_list[i].topic = (uint8_t *) os_malloc(os_strlen(topic) + 1);
if (topic_list[i].topic == NULL)
return false;
os_strcpy(topic_list[i].topic, topic);
topic_list[i].clientcon = clientcon;
topic_list[i].qos = qos;
return true;
}
}
return false;
}
bool ICACHE_FLASH_ATTR delete_topic(MQTT_ClientCon * clientcon, uint8_t * topic) {
uint16_t i;
if (topic_list == NULL)
return false;
for (i = 0; i < max_entry; i++) {
if (topic_list[i].clientcon != NULL && (clientcon == NULL || topic_list[i].clientcon == clientcon)) {
if (topic == NULL || (topic_list[i].topic != NULL && strcmp(topic, topic_list[i].topic) == 0)) {
topic_list[i].clientcon = NULL;
os_free(topic_list[i].topic);
topic_list[i].qos = 0;
}
}
}
return true;
}
bool ICACHE_FLASH_ATTR find_topic(uint8_t * topic, find_topic_cb cb, uint8_t * data, uint16_t data_len) {
uint16_t i;
bool retval = false;
if (topic_list == NULL)
return false;
for (i = 0; i < max_entry; i++) {
if (topic_list[i].clientcon != NULL) {
if (Topics_matches(topic_list[i].topic, 1, topic)) {
(*cb) (&topic_list[i], topic, data, data_len);
retval = true;
}
}
}
return retval;
}
void ICACHE_FLASH_ATTR iterate_topics(iterate_topic_cb cb, void *user_data) {
uint16_t i;
if (topic_list == NULL)
return;
for (i = 0; i < max_entry; i++) {
if (topic_list[i].clientcon != NULL) {
if ((*cb) (&topic_list[i], user_data) == true)
return;
}
}
}

Wyświetl plik

@ -1,280 +0,0 @@
/*******************************************************************************
* Copyright (c) 2007, 2013 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Ian Craggs - initial API and implementation and/or initial documentation
*******************************************************************************/
/**
* @file
* Topic handling functions.
*
* Topic syntax matches that of other MQTT servers such as Micro broker.
*/
#include "mqtt_topics.h"
#include "os_type.h"
#include "osapi.h"
#include "mem.h"
#include "string.h"
/*
char *_strtok_r(char *s, const char *delim, char **last);
char *_strchr(const char *s, int c) {
while (*s != (char)c)
if (!*s++)
return 0;
return (char *)s;
}
*/
char ICACHE_FLASH_ATTR *_strdup(char *src) {
char *str;
char *p;
int len = 0;
while (src[len])
len++;
str = (char *)os_malloc(len + 1);
p = str;
while (*src)
*p++ = *src++;
*p = '\0';
return str;
}
/**
* Checks that the syntax of a topic string is correct.
* @param aName the topic name string
* @return boolean value indicating whether the topic name is valid
*/
int ICACHE_FLASH_ATTR Topics_isValidName(char *aName) {
int rc = true;
char *c = NULL;
int length = os_strlen(aName);
char *hashpos = strchr(aName, '#'); /* '#' wildcard can be only at the beginning or the end of a topic */
if (hashpos != NULL) {
char *second = strchr(hashpos + 1, '#');
if ((hashpos != aName && hashpos != aName + (length - 1)) || second != NULL)
rc = false;
}
/* '#' or '+' only next to a slash separator or end of name */
for (c = "#+"; *c != '\0'; ++c) {
char *pos = strchr(aName, *c);
while (pos != NULL) {
if (pos > aName) { /* check previous char is '/' */
if (*(pos - 1) != '/')
rc = false;
}
if (*(pos + 1) != '\0') { /* check that subsequent char is '/' */
if (*(pos + 1) != '/')
rc = false;
}
pos = strchr(pos + 1, *c);
}
}
return rc;
}
/**
* Reverse a string.
* Linux utility function for Linux to enable Windows/Linux portability
* @param astr the character string to reverse
* @return pointer to the reversed string which was reversed in place
*/
char ICACHE_FLASH_ATTR *_strrev(char *astr) {
char *forwards = astr;
int len = os_strlen(astr);
if (len > 1) {
char *backwards = astr + len - 1;
while (forwards < backwards) {
char temp = *forwards;
*forwards++ = *backwards;
*backwards-- = temp;
}
}
return astr;
}
/**
* Does a topic string contain wildcards?
* @param topic the topic name string
* @return boolean value indicating whether the topic contains a wildcard or not
*/
int ICACHE_FLASH_ATTR Topics_hasWildcards(char *topic) {
return (strchr(topic, '+') != NULL) || (strchr(topic, '#') != NULL);
}
/**
* Tests whether one topic string matches another where one can contain wildcards.
* @param wildTopic a topic name string that can contain wildcards
* @param topic a topic name string that must not contain wildcards
* @return boolean value indicating whether topic matches wildTopic
*/
int ICACHE_FLASH_ATTR Topics_matches(char *wildTopic, int wildcards, char *topic) {
int rc = false;
char *last1 = NULL, *last2 = NULL;
char *pwild = NULL, *pmatch = NULL;
if (!wildcards) {
rc = (os_strcmp(wildTopic, topic) == 0);
goto exit;
}
if (Topics_hasWildcards(topic)) {
//os_printf("Topics_matches: should not be wildcard in topic %s", topic);
goto exit;
}
if (!Topics_isValidName(wildTopic)) {
//os_printf("Topics_matches: invalid topic name %s", wildTopic);
goto exit;
}
if (!Topics_isValidName(topic)) {
//os_printf("Topics_matches: invalid topic name %s", topic);
goto exit;
}
if (strcmp(wildTopic, MULTI_LEVEL_WILDCARD) == 0 || /* Hash matches anything... */
strcmp(wildTopic, topic) == 0) {
rc = true;
goto exit;
}
if (strcmp(wildTopic, "/#") == 0) { /* Special case for /# matches anything starting with / */
rc = (topic[0] == '/') ? true : false;
goto exit;
}
/* because strtok will return bill when matching /bill/ or bill in a topic name for the first time,
* we have to check whether the first character is / explicitly.
*/
if ((wildTopic[0] == TOPIC_LEVEL_SEPARATOR[0]) && (topic[0] != TOPIC_LEVEL_SEPARATOR[0]))
goto exit;
if ((wildTopic[0] == SINGLE_LEVEL_WILDCARD[0]) && (topic[0] == TOPIC_LEVEL_SEPARATOR[0]))
goto exit;
/* We only match hash-first topics in reverse, for speed */
if (wildTopic[0] == MULTI_LEVEL_WILDCARD[0]) {
wildTopic = (char *)_strrev(_strdup(wildTopic));
topic = (char *)_strrev(_strdup(topic));
} else {
wildTopic = (char *)_strdup(wildTopic);
topic = (char *)_strdup(topic);
}
pwild = strtok_r(wildTopic, TOPIC_LEVEL_SEPARATOR, &last1);
pmatch = strtok_r(topic, TOPIC_LEVEL_SEPARATOR, &last2);
/* Step through the subscription, level by level */
while (pwild != NULL) {
/* Have we got # - if so, it matches anything. */
if (strcmp(pwild, MULTI_LEVEL_WILDCARD) == 0) {
rc = true;
break;
}
/* Nope - check for matches... */
if (pmatch != NULL) {
if (strcmp(pwild, SINGLE_LEVEL_WILDCARD) != 0 && strcmp(pwild, pmatch) != 0)
/* The two levels simply don't match... */
break;
} else
break; /* No more tokens to match against further tokens in the wildcard stream... */
pwild = strtok_r(NULL, TOPIC_LEVEL_SEPARATOR, &last1);
pmatch = strtok_r(NULL, TOPIC_LEVEL_SEPARATOR, &last2);
}
/* All tokens up to here matched, and we didn't end in #. If there
are any topic tokens remaining, the match is bad, otherwise it was
a good match. */
if (pmatch == NULL && pwild == NULL)
rc = true;
/* Now free the memory allocated in _strdup() */
os_free(wildTopic);
os_free(topic);
exit:
return rc;
} /* end matches */
#ifdef UNIT_TEST
#if !defined(ARRAY_SIZE)
/**
* Macro to calculate the number of entries in an array
*/
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
#endif
int ICACHE_FLASH_ATTR test() {
int i;
struct {
char *str;
} tests0[] = {
"#", "jj", "+/a", "adkj/a", "+/a", "adsjk/adakjd/a", "a/+", "a/#", "#/a"};
for (i = 0; i < sizeof(tests0) / sizeof(char *); ++i) {
os_printf("topic %s, isValidName %d\n", tests0[i].str, Topics_isValidName(tests0[i].str));
//assert(Topics_isValidName(tests0[i].str) == 1);
}
struct {
char *wild;
char *topic;
int result;
} tests1[] = {
{
"#", "jj", 1}, {
"+/a", "adkj/a", 1}, {
"+/a", "adsjk/adakjd/a", 0}, {
"+/+/a", "adsjk/adakjd/a", 1}, {
"#/a", "adsjk/adakjd/a", 1}, {
"test/#", "test/1", 1}, {
"test/+", "test/1", 1}, {
"+", "test1", 1}, {
"+", "test1/k", 0}, {
"+", "/test1/k", 0}, {
"/+", "test1/k", 0}, {
"+", "/jkj", 0}, {
"/+", "/test1", 1}, {
"+/+", "/test1", 0}, {
"+/+", "test1/k", 1}, {
"/#", "/test1/k", 1}, {
"/#", "test1/k", 0},};
for (i = 0; i < ARRAY_SIZE(tests1); ++i) {
os_printf("wild: %s, topic %s, result %d %d (should)\n", tests1[i].wild, tests1[i].topic,
Topics_matches(_strdup(tests1[i].wild), 1, _strdup(tests1[i].topic)), tests1[i].result);
//assert(Topics_matches(_strdup(tests1[i].wild), _strdup(tests1[i].topic)) == tests1[i].result);
}
struct {
char *str;
char *result;
} tests2[] = {
{
"#", "#"}, {
"ab", "ba"}, {
"abc", "cba"}, {
"abcd", "dcba"}, {
"abcde", "edcba"}
};
for (i = 0; i < 5; ++i) {
os_printf("str: %s, _strrev %s\n", tests2[i].str, _strrev(_strdup(tests2[i].str)));
//assert(strcmp(tests2[i].result, _strrev(_strdup(tests2[i].str))) == 0);
}
}
#endif

Wyświetl plik

@ -1,132 +0,0 @@
#include "proto.h"
#include "ringbuf_mqtt.h"
I8 ICACHE_FLASH_ATTR PROTO_Init(PROTO_PARSER * parser, PROTO_PARSE_CALLBACK * completeCallback, U8 * buf, U16 bufSize) {
parser->buf = buf;
parser->bufSize = bufSize;
parser->dataLen = 0;
parser->callback = completeCallback;
parser->isEsc = 0;
return 0;
}
I8 ICACHE_FLASH_ATTR PROTO_ParseByte(PROTO_PARSER * parser, U8 value) {
switch (value) {
case 0x7D:
parser->isEsc = 1;
break;
case 0x7E:
parser->dataLen = 0;
parser->isEsc = 0;
parser->isBegin = 1;
break;
case 0x7F:
if (parser->callback != NULL)
parser->callback();
parser->isBegin = 0;
return 0;
break;
default:
if (parser->isBegin == 0)
break;
if (parser->isEsc) {
value ^= 0x20;
parser->isEsc = 0;
}
if (parser->dataLen < parser->bufSize)
parser->buf[parser->dataLen++] = value;
break;
}
return -1;
}
I8 ICACHE_FLASH_ATTR PROTO_Parse(PROTO_PARSER * parser, U8 * buf, U16 len) {
while (len--)
PROTO_ParseByte(parser, *buf++);
return 0;
}
I16 ICACHE_FLASH_ATTR PROTO_ParseRb(RINGBUF * rb, U8 * bufOut, U16 * len, U16 maxBufLen) {
U8 c;
PROTO_PARSER proto;
PROTO_Init(&proto, NULL, bufOut, maxBufLen);
while (RINGBUF_Get(rb, &c) == 0) {
if (PROTO_ParseByte(&proto, c) == 0) {
*len = proto.dataLen;
return 0;
}
}
return -1;
}
I16 ICACHE_FLASH_ATTR PROTO_Add(U8 * buf, const U8 * packet, I16 bufSize) {
U16 i = 2;
U16 len = *(U16 *) packet;
if (bufSize < 1)
return -1;
*buf++ = 0x7E;
bufSize--;
while (len--) {
switch (*packet) {
case 0x7D:
case 0x7E:
case 0x7F:
if (bufSize < 2)
return -1;
*buf++ = 0x7D;
*buf++ = *packet++ ^ 0x20;
i += 2;
bufSize -= 2;
break;
default:
if (bufSize < 1)
return -1;
*buf++ = *packet++;
i++;
bufSize--;
break;
}
}
if (bufSize < 1)
return -1;
*buf++ = 0x7F;
return i;
}
I16 ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF * rb, const U8 * packet, I16 len) {
U16 i = 2;
if (RINGBUF_Put(rb, 0x7E) == -1)
return -1;
while (len--) {
switch (*packet) {
case 0x7D:
case 0x7E:
case 0x7F:
if (RINGBUF_Put(rb, 0x7D) == -1)
return -1;
if (RINGBUF_Put(rb, *packet++ ^ 0x20) == -1)
return -1;
i += 2;
break;
default:
if (RINGBUF_Put(rb, *packet++) == -1)
return -1;
i++;
break;
}
}
if (RINGBUF_Put(rb, 0x7F) == -1)
return -1;
return i;
}

Wyświetl plik

@ -1,53 +0,0 @@
/* str_queue.c
*
* Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "queue.h"
#include "user_interface.h"
#include "osapi.h"
#include "os_type.h"
#include "mem.h"
#include "proto.h"
void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE * queue, int bufferSize) {
queue->buf = (uint8_t *) os_zalloc(bufferSize);
RINGBUF_Init(&queue->rb, queue->buf, bufferSize);
}
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE * queue, uint8_t * buffer, uint16_t len) {
return PROTO_AddRb(&queue->rb, buffer, len);
}
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE * queue, uint8_t * buffer, uint16_t * len, uint16_t maxLen) {
return PROTO_ParseRb(&queue->rb, buffer, len, maxLen);
}
BOOL ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE * queue) {
if (queue->rb.fill_cnt <= 0)
return TRUE;
return FALSE;
}

Wyświetl plik

@ -1,64 +0,0 @@
/**
* \file
* Ring Buffer library
*/
#include "ringbuf_mqtt.h"
/**
* \brief init a RINGBUF object
* \param r pointer to a RINGBUF object
* \param buf pointer to a byte array
* \param size size of buf
* \return 0 if successfull, otherwise failed
*/
I16 ICACHE_FLASH_ATTR RINGBUF_Init(RINGBUF * r, U8 * buf, I32 size) {
if (r == NULL || buf == NULL || size < 2)
return -1;
r->p_o = r->p_r = r->p_w = buf;
r->fill_cnt = 0;
r->size = size;
return 0;
}
/**
* \brief put a character into ring buffer
* \param r pointer to a ringbuf object
* \param c character to be put
* \return 0 if successfull, otherwise failed
*/
I16 ICACHE_FLASH_ATTR RINGBUF_Put(RINGBUF * r, U8 c) {
if (r->fill_cnt >= r->size)
return -1; // ring buffer is full, this should be atomic operation
r->fill_cnt++; // increase filled slots count, this should be atomic operation
*r->p_w++ = c; // put character into buffer
if (r->p_w >= r->p_o + r->size) // rollback if write pointer go pass
r->p_w = r->p_o; // the physical boundary
return 0;
}
/**
* \brief get a character from ring buffer
* \param r pointer to a ringbuf object
* \param c read character
* \return 0 if successfull, otherwise failed
*/
I16 ICACHE_FLASH_ATTR RINGBUF_Get(RINGBUF * r, U8 * c) {
if (r->fill_cnt <= 0)
return -1; // ring buffer is empty, this should be atomic operation
r->fill_cnt--; // decrease filled slots count
*c = *r->p_r++; // get the character out
if (r->p_r >= r->p_o + r->size) // rollback if write pointer go pass
r->p_r = r->p_o; // the physical boundary
return 0;
}

Wyświetl plik

@ -1,144 +0,0 @@
/*
* Copyright (c) 2014, Tuan PM
* Email: tuanpm@live.com
*
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
*/
#include <string.h>
#include <stdio.h>
#include <ctype.h>
#include <math.h>
#include <stddef.h>
#include "utils.h"
uint8_t ICACHE_FLASH_ATTR UTILS_IsIPV4(int8_t * str) {
uint8_t segs = 0; /* Segment count. */
uint8_t chcnt = 0; /* Character count within segment. */
uint8_t accum = 0; /* Accumulator for segment. */
/* Catch NULL pointer. */
if (str == 0)
return 0;
/* Process every character in string. */
while (*str != '\0') {
/* Segment changeover. */
if (*str == '.') {
/* Must have some digits in segment. */
if (chcnt == 0)
return 0;
/* Limit number of segments. */
if (++segs == 4)
return 0;
/* Reset segment values and restart loop. */
chcnt = accum = 0;
str++;
continue;
}
/* Check numeric. */
if ((*str < '0') || (*str > '9'))
return 0;
/* Accumulate and check segment. */
if ((accum = accum * 10 + *str - '0') > 255)
return 0;
/* Advance other segment specific stuff and continue loop. */
chcnt++;
str++;
}
/* Check enough segments and enough characters in last segment. */
if (segs != 3)
return 0;
if (chcnt == 0)
return 0;
/* Address okay. */
return 1;
}
uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t * str, void *ip) {
/* The count of the number of bytes processed. */
int i;
/* A pointer to the next digit to process. */
const char *start;
start = str;
for (i = 0; i < 4; i++) {
/* The digit being processed. */
char c;
/* The value of this byte. */
int n = 0;
while (1) {
c = *start;
start++;
if (c >= '0' && c <= '9') {
n *= 10;
n += c - '0';
}
/* We insist on stopping at "." if we are still parsing
the first, second, or third numbers. If we have reached
the end of the numbers, we will allow any character. */
else if ((i < 3 && c == '.') || i == 3) {
break;
} else {
return 0;
}
}
if (n >= 256) {
return 0;
}
((uint8_t *) ip)[i] = n;
}
return 1;
}
uint32_t ICACHE_FLASH_ATTR UTILS_Atoh(const int8_t * s) {
uint32_t value = 0, digit;
int8_t c;
while ((c = *s++)) {
if ('0' <= c && c <= '9')
digit = c - '0';
else if ('A' <= c && c <= 'F')
digit = c - 'A' + 10;
else if ('a' <= c && c <= 'f')
digit = c - 'a' + 10;
else
break;
value = (value << 4) | digit;
}
return value;
}

1
uMQTTBroker 160000

@ -0,0 +1 @@
Subproject commit 9b654d4257bc007405e8ca0d56183b4b712ec7c9

Wyświetl plik

@ -4,8 +4,8 @@
#include "lang.h"
#include "user_config.h"
#include "config_flash.h"
#include "mqtt_topics.h"
#include "mqtt_retainedlist.h"
#include "mqtt/mqtt_topics.h"
#include "mqtt/mqtt_retainedlist.h"
#ifdef NTP
#include "ntp.h"
#endif

Wyświetl plik

@ -1,7 +1,7 @@
#ifndef _LANG_
#define _LANG_
#include "mqtt_server.h"
#include "mqtt/mqtt_server.h"
typedef enum {SYNTAX_CHECK, CONFIG, INIT, MQTT_CLIENT_CONNECT, WIFI_CONNECT, TOPIC_LOCAL, TOPIC_REMOTE, TIMER, GPIO_INT, CLOCK, HTTP_RESPONSE} Interpreter_Status;

Wyświetl plik

@ -13,9 +13,9 @@
#include "config_flash.h"
#include "sys_time.h"
#include "mqtt_server.h"
#include "mqtt_topiclist.h"
#include "mqtt_retainedlist.h"
#include "mqtt/mqtt_server.h"
#include "mqtt/mqtt_topiclist.h"
#include "mqtt/mqtt_retainedlist.h"
#ifdef GPIO
//#include "easygpio.h"

Wyświetl plik

@ -1,5 +1,5 @@
#include "user_interface.h"
#include "mqtt_server.h"
#include "mqtt/mqtt_server.h"
#include "user_config.h"
void ICACHE_FLASH_ATTR user_init() {