kopia lustrzana https://github.com/martin-ger/esp_mqtt
Initial commit of uMQTT Broker
rodzic
cf87bc521e
commit
7b3905ae16
8
Makefile
8
Makefile
|
@ -9,11 +9,11 @@ THISDIR:=$(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
|||
XTENSA_TOOLS_ROOT ?=
|
||||
|
||||
# base directory of the ESP8266 SDK package, absolute
|
||||
SDK_BASE ?= /tools/esp8266/sdk/ESP8266_NONOS_SDK
|
||||
SDK_BASE ?= /home/martin/github/esp-open-sdk/sdk/
|
||||
|
||||
#Esptool.py path and port
|
||||
ESPTOOL ?= /tools/esp8266/esptool/esptool.py
|
||||
ESPPORT ?= /dev/tty.SLAB_USBtoUART
|
||||
ESPTOOL ?= /home/martin/github/esp-open-sdk/esptool/esptool.py
|
||||
ESPPORT ?= /dev/ttyUSB0
|
||||
#ESPPORT ?= /dev/tty.wchusbserial1410
|
||||
#ESPDELAY indicates seconds to wait between flashing the two binary images
|
||||
ESPDELAY ?= 3
|
||||
|
@ -56,7 +56,7 @@ FIRMWARE_BASE = firmware
|
|||
|
||||
# Opensdk patches stdint.h when compiled with an internal SDK. If you run into compile problems pertaining to
|
||||
# redefinition of int types, try setting this to 'yes'.
|
||||
USE_OPENSDK ?= no
|
||||
USE_OPENSDK ?= yes
|
||||
|
||||
DATETIME := $(shell date "+%Y-%b-%d_%H:%M:%S_%Z")
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@
|
|||
#ifndef USER_DEBUG_H_
|
||||
#define USER_DEBUG_H_
|
||||
|
||||
|
||||
#if defined(MQTT_DEBUG_ON)
|
||||
#define MQTT_INFO( format, ... ) os_printf( format, ## __VA_ARGS__ )
|
||||
#else
|
||||
|
|
|
@ -99,7 +99,8 @@ typedef struct mqtt_connect_info
|
|||
char* username;
|
||||
char* password;
|
||||
char* will_topic;
|
||||
char* will_message;
|
||||
char* will_data;
|
||||
uint16_t will_data_len;
|
||||
uint32_t keepalive;
|
||||
int will_qos;
|
||||
int will_retain;
|
||||
|
@ -107,6 +108,55 @@ typedef struct mqtt_connect_info
|
|||
|
||||
} mqtt_connect_info_t;
|
||||
|
||||
#define MQTT_MAX_FIXED_HEADER_SIZE 3
|
||||
|
||||
enum mqtt_connect_flag
|
||||
{
|
||||
MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
|
||||
MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
|
||||
MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
|
||||
MQTT_CONNECT_FLAG_WILL = 1 << 2,
|
||||
MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
|
||||
};
|
||||
|
||||
struct __attribute((__packed__)) mqtt_connect_variable_header
|
||||
{
|
||||
uint8_t lengthMsb;
|
||||
uint8_t lengthLsb;
|
||||
#if defined(PROTOCOL_NAMEv31)
|
||||
uint8_t magic[6];
|
||||
#elif defined(PROTOCOL_NAMEv311)
|
||||
uint8_t magic[4];
|
||||
#else
|
||||
#error "Please define protocol name"
|
||||
#endif
|
||||
uint8_t version;
|
||||
uint8_t flags;
|
||||
uint8_t keepaliveMsb;
|
||||
uint8_t keepaliveLsb;
|
||||
};
|
||||
|
||||
struct __attribute((__packed__)) mqtt_connect_variable_header3
|
||||
{
|
||||
uint8_t lengthMsb;
|
||||
uint8_t lengthLsb;
|
||||
uint8_t magic[6];
|
||||
uint8_t version;
|
||||
uint8_t flags;
|
||||
uint8_t keepaliveMsb;
|
||||
uint8_t keepaliveLsb;
|
||||
};
|
||||
|
||||
struct __attribute((__packed__)) mqtt_connect_variable_header4
|
||||
{
|
||||
uint8_t lengthMsb;
|
||||
uint8_t lengthLsb;
|
||||
uint8_t magic[4];
|
||||
uint8_t version;
|
||||
uint8_t flags;
|
||||
uint8_t keepaliveMsb;
|
||||
uint8_t keepaliveLsb;
|
||||
};
|
||||
|
||||
static inline int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; }
|
||||
static inline int ICACHE_FLASH_ATTR mqtt_get_connect_return_code(uint8_t* buffer) { return buffer[3]; }
|
||||
|
@ -116,18 +166,22 @@ static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { return (b
|
|||
|
||||
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
|
||||
int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length);
|
||||
char* ICACHE_FLASH_ATTR mqtt_get_str(uint8_t* buffer, uint16_t* length);
|
||||
const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length);
|
||||
const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length);
|
||||
uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length);
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connack(mqtt_connection_t* connection, enum mqtt_connect_return_code retcode);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_suback(mqtt_connection_t* connection, uint8_t *ret_codes, uint8_t ret_codes_len, uint16_t message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsuback(mqtt_connection_t* connection, uint16_t message_id);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection);
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection);
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
#ifndef _MQTT_RETAINEDLIST_H_
|
||||
#define _MQTT_RETAINEDLIST_H_
|
||||
|
||||
#include "mqtt_server.h"
|
||||
|
||||
typedef struct _retained_entry {
|
||||
uint8_t *topic;
|
||||
uint8_t *data;
|
||||
uint16_t data_len;
|
||||
uint8_t qos;
|
||||
} retained_entry;
|
||||
|
||||
typedef bool (*iterate_retainedtopic_cb)(retained_entry *topic, void *user_data);
|
||||
typedef bool (*find_retainedtopic_cb)(retained_entry *topic, MQTT_ClientCon *clientcon);
|
||||
|
||||
bool create_retainedlist(uint16_t num_entires);
|
||||
bool update_retainedtopic(uint8_t *topic, uint8_t *data, uint16_t data_len, uint8_t qos);
|
||||
bool find_retainedtopic(uint8_t *topic, find_retainedtopic_cb cb, MQTT_ClientCon *clientcon);
|
||||
void iterate_retainedtopics(iterate_retainedtopic_cb cb, void *user_data);
|
||||
|
||||
#endif /* _MQTT_RETAINEDLIST_H_ */
|
|
@ -0,0 +1,47 @@
|
|||
#ifndef _MQTT_SERVER_H_
|
||||
#define _MQTT_SERVER_H_
|
||||
|
||||
#include "c_types.h"
|
||||
#include "osapi.h"
|
||||
#include "os_type.h"
|
||||
#include "ip_addr.h"
|
||||
#include "espconn.h"
|
||||
//#include "lwip/ip.h"
|
||||
//#include "lwip/app/espconn.h"
|
||||
//#include "lwip/app/espconn_tcp.h"
|
||||
|
||||
#include "mqtt.h"
|
||||
|
||||
#define LOCAL_MQTT_CLIENT ((void*)-1)
|
||||
|
||||
typedef struct _MQTT_ClientCon {
|
||||
struct espconn *pCon;
|
||||
// uint8_t security;
|
||||
// uint32_t port;
|
||||
// ip_addr_t ip;
|
||||
mqtt_state_t mqtt_state;
|
||||
mqtt_connect_info_t connect_info;
|
||||
// MqttCallback connectedCb;
|
||||
// MqttCallback disconnectedCb;
|
||||
// MqttCallback publishedCb;
|
||||
// MqttCallback timeoutCb;
|
||||
// MqttDataCallback dataCb;
|
||||
ETSTimer mqttTimer;
|
||||
uint32_t sendTimeout;
|
||||
tConnState connState;
|
||||
QUEUE msgQueue;
|
||||
uint8_t protocolVersion;
|
||||
void* user_data;
|
||||
struct _MQTT_ClientCon *next;
|
||||
} MQTT_ClientCon;
|
||||
|
||||
extern MQTT_ClientCon *clientcon_list;
|
||||
|
||||
bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);
|
||||
|
||||
bool MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain);
|
||||
bool MQTT_local_subscribe(uint8_t* topic, uint8_t qos);
|
||||
bool MQTT_local_unsubscribe(uint8_t* topic);
|
||||
void MQTT_local_onData(MqttDataCallback dataCb);
|
||||
|
||||
#endif /* _MQTT_SERVER_H_ */
|
|
@ -0,0 +1,21 @@
|
|||
#ifndef _MQTT_TOPICLIST_H_
|
||||
#define _MQTT_TOPICLIST_H_
|
||||
|
||||
#include "mqtt_server.h"
|
||||
|
||||
typedef struct _topic_entry {
|
||||
MQTT_ClientCon *clientcon;
|
||||
uint8_t *topic;
|
||||
uint8_t qos;
|
||||
} topic_entry;
|
||||
|
||||
typedef bool (*iterate_topic_cb)(topic_entry *topic, void *user_data);
|
||||
typedef bool (*find_topic_cb)(topic_entry *topic_e, uint8_t *topic, uint8_t *data, uint16_t data_len);
|
||||
|
||||
bool create_topiclist(uint16_t num_entires);
|
||||
bool add_topic(MQTT_ClientCon *clientcon, uint8_t *topic, uint8_t qos);
|
||||
bool delete_topic(MQTT_ClientCon *clientcon, uint8_t *topic);
|
||||
bool find_topic(uint8_t *topic, find_topic_cb cb, uint8_t *data, uint16_t data_len);
|
||||
void iterate_topics(iterate_topic_cb cb, void *user_data);
|
||||
|
||||
#endif /* _MQTT_TOPICLIST_H_ */
|
|
@ -0,0 +1,32 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2007, 2013 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Ian Craggs - initial API and implementation and/or initial documentation
|
||||
*******************************************************************************/
|
||||
|
||||
|
||||
#if !defined(MQTT_TOPICS_H)
|
||||
#define MQTT_TOPICS_H
|
||||
|
||||
#define TOPIC_LEVEL_SEPARATOR "/"
|
||||
#define SINGLE_LEVEL_WILDCARD "+"
|
||||
#define MULTI_LEVEL_WILDCARD "#"
|
||||
|
||||
int Topics_isValidName(char* aName);
|
||||
|
||||
int Topics_hasWildcards(char* topic);
|
||||
|
||||
int Topics_matches(char* wildTopic, int wildcards, char* topic);
|
||||
|
||||
#endif /* MQTT_TOPICS_H */
|
||||
|
|
@ -9,7 +9,7 @@
|
|||
#define _PROTO_H_
|
||||
#include <stdlib.h>
|
||||
#include "typedef.h"
|
||||
#include "ringbuf.h"
|
||||
#include "ringbuf_mqtt.h"
|
||||
|
||||
typedef void(PROTO_PARSE_CALLBACK)();
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
#ifndef USER_QUEUE_H_
|
||||
#define USER_QUEUE_H_
|
||||
#include "os_type.h"
|
||||
#include "ringbuf.h"
|
||||
#include "ringbuf_mqtt.h"
|
||||
typedef struct {
|
||||
uint8_t *buf;
|
||||
RINGBUF rb;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
#ifndef _RING_BUF_H_
|
||||
#define _RING_BUF_H_
|
||||
#ifndef _RING_BUF_MQTT_H_
|
||||
#define _RING_BUF_MQTT_H_
|
||||
|
||||
#include <os_type.h>
|
||||
#include <stdlib.h>
|
12
mqtt/mqtt.c
12
mqtt/mqtt.c
|
@ -256,9 +256,9 @@ mqtt_client_delete(MQTT_Client *mqttClient)
|
|||
mqttClient->connect_info.will_topic = NULL;
|
||||
}
|
||||
|
||||
if (mqttClient->connect_info.will_message != NULL) {
|
||||
os_free(mqttClient->connect_info.will_message);
|
||||
mqttClient->connect_info.will_message = NULL;
|
||||
if (mqttClient->connect_info.will_data != NULL) {
|
||||
os_free(mqttClient->connect_info.will_data);
|
||||
mqttClient->connect_info.will_data = NULL;
|
||||
}
|
||||
|
||||
if (mqttClient->msgQueue.buf != NULL) {
|
||||
|
@ -876,9 +876,9 @@ MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, ui
|
|||
mqttClient->connect_info.will_topic[temp] = 0;
|
||||
|
||||
temp = os_strlen(will_msg);
|
||||
mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.will_message, will_msg);
|
||||
mqttClient->connect_info.will_message[temp] = 0;
|
||||
mqttClient->connect_info.will_data = (uint8_t*)os_zalloc(temp + 1);
|
||||
os_strcpy(mqttClient->connect_info.will_data, will_msg);
|
||||
mqttClient->connect_info.will_data[temp] = 0;
|
||||
|
||||
|
||||
mqttClient->connect_info.will_qos = will_qos;
|
||||
|
|
111
mqtt/mqtt_msg.c
111
mqtt/mqtt_msg.c
|
@ -29,36 +29,13 @@
|
|||
*
|
||||
*/
|
||||
|
||||
#include "ets_sys.h"
|
||||
#include "osapi.h"
|
||||
#include "os_type.h"
|
||||
|
||||
#include <string.h>
|
||||
#include "mqtt_msg.h"
|
||||
#include "user_config.h"
|
||||
#define MQTT_MAX_FIXED_HEADER_SIZE 3
|
||||
|
||||
enum mqtt_connect_flag
|
||||
{
|
||||
MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
|
||||
MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
|
||||
MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
|
||||
MQTT_CONNECT_FLAG_WILL = 1 << 2,
|
||||
MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
|
||||
};
|
||||
|
||||
struct __attribute((__packed__)) mqtt_connect_variable_header
|
||||
{
|
||||
uint8_t lengthMsb;
|
||||
uint8_t lengthLsb;
|
||||
#if defined(PROTOCOL_NAMEv31)
|
||||
uint8_t magic[6];
|
||||
#elif defined(PROTOCOL_NAMEv311)
|
||||
uint8_t magic[4];
|
||||
#else
|
||||
#error "Please define protocol name"
|
||||
#endif
|
||||
uint8_t version;
|
||||
uint8_t flags;
|
||||
uint8_t keepaliveMsb;
|
||||
uint8_t keepaliveLsb;
|
||||
};
|
||||
|
||||
static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
|
||||
{
|
||||
|
@ -67,7 +44,7 @@ static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const
|
|||
|
||||
connection->buffer[connection->message.length++] = len >> 8;
|
||||
connection->buffer[connection->message.length++] = len & 0xff;
|
||||
memcpy(connection->buffer + connection->message.length, string, len);
|
||||
os_memcpy(connection->buffer + connection->message.length, string, len);
|
||||
connection->message.length += len;
|
||||
|
||||
return len + 2;
|
||||
|
@ -127,7 +104,7 @@ static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connect
|
|||
|
||||
void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length)
|
||||
{
|
||||
memset(connection, 0, sizeof(mqtt_connection_t));
|
||||
os_memset(connection, 0, sizeof(mqtt_connection_t));
|
||||
connection->buffer = buffer;
|
||||
connection->buffer_length = buffer_length;
|
||||
}
|
||||
|
@ -151,6 +128,24 @@ int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length)
|
|||
return totlen;
|
||||
}
|
||||
|
||||
|
||||
char* ICACHE_FLASH_ATTR mqtt_get_str(uint8_t* buffer, uint16_t* length)
|
||||
{
|
||||
int i = 0;
|
||||
int topiclen;
|
||||
|
||||
if (i + 2 >= *length)
|
||||
return NULL;
|
||||
topiclen = buffer[i++] << 8;
|
||||
topiclen |= buffer[i++];
|
||||
|
||||
if (i + topiclen > *length)
|
||||
return NULL;
|
||||
|
||||
*length = topiclen;
|
||||
return buffer + i;
|
||||
}
|
||||
|
||||
const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length)
|
||||
{
|
||||
int i;
|
||||
|
@ -274,6 +269,7 @@ uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length)
|
|||
case MQTT_MSG_TYPE_SUBACK:
|
||||
case MQTT_MSG_TYPE_UNSUBACK:
|
||||
case MQTT_MSG_TYPE_SUBSCRIBE:
|
||||
case MQTT_MSG_TYPE_UNSUBSCRIBE:
|
||||
{
|
||||
// This requires the remaining length to be encoded in 1 byte,
|
||||
// which it should be.
|
||||
|
@ -302,11 +298,11 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection
|
|||
variable_header->lengthMsb = 0;
|
||||
#if defined(PROTOCOL_NAMEv31)
|
||||
variable_header->lengthLsb = 6;
|
||||
memcpy(variable_header->magic, "MQIsdp", 6);
|
||||
os_memcpy(variable_header->magic, "MQIsdp", 6);
|
||||
variable_header->version = 3;
|
||||
#elif defined(PROTOCOL_NAMEv311)
|
||||
variable_header->lengthLsb = 4;
|
||||
memcpy(variable_header->magic, "MQTT", 4);
|
||||
os_memcpy(variable_header->magic, "MQTT", 4);
|
||||
variable_header->version = 4;
|
||||
#else
|
||||
#error "Please define protocol name"
|
||||
|
@ -337,16 +333,16 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection
|
|||
else
|
||||
{
|
||||
/* No 0 data and at least 1 long. Good to go. */
|
||||
if(append_string(connection, info->client_id, strlen(info->client_id)) < 0)
|
||||
if(append_string(connection, info->client_id, os_strlen(info->client_id)) < 0)
|
||||
return fail_message(connection);
|
||||
}
|
||||
|
||||
if (info->will_topic != NULL && info->will_topic[0] != '\0')
|
||||
{
|
||||
if (append_string(connection, info->will_topic, strlen(info->will_topic)) < 0)
|
||||
if (append_string(connection, info->will_topic, os_strlen(info->will_topic)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
if (append_string(connection, info->will_message, strlen(info->will_message)) < 0)
|
||||
if (append_string(connection, info->will_data, os_strlen(info->will_data)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
variable_header->flags |= MQTT_CONNECT_FLAG_WILL;
|
||||
|
@ -357,7 +353,7 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection
|
|||
|
||||
if (info->username != NULL && info->username[0] != '\0')
|
||||
{
|
||||
if (append_string(connection, info->username, strlen(info->username)) < 0)
|
||||
if (append_string(connection, info->username, os_strlen(info->username)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME;
|
||||
|
@ -365,7 +361,7 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection
|
|||
|
||||
if (info->password != NULL && info->password[0] != '\0')
|
||||
{
|
||||
if (append_string(connection, info->password, strlen(info->password)) < 0)
|
||||
if (append_string(connection, info->password, os_strlen(info->password)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD;
|
||||
|
@ -374,6 +370,14 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection
|
|||
return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connack(mqtt_connection_t* connection, enum mqtt_connect_return_code retcode)
|
||||
{
|
||||
init_message(connection);
|
||||
connection->buffer[connection->message.length++] = 0; // Connect Acknowledge Flags
|
||||
connection->buffer[connection->message.length++] = retcode; // Connect Return code
|
||||
return fini_message(connection, MQTT_MSG_TYPE_CONNACK, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
|
@ -381,7 +385,7 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection
|
|||
if (topic == NULL || topic[0] == '\0')
|
||||
return fail_message(connection);
|
||||
|
||||
if (append_string(connection, topic, strlen(topic)) < 0)
|
||||
if (append_string(connection, topic, os_strlen(topic)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
if (qos > 0)
|
||||
|
@ -394,7 +398,7 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection
|
|||
|
||||
if (connection->message.length + data_length > connection->buffer_length)
|
||||
return fail_message(connection);
|
||||
memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
os_memcpy(connection->buffer + connection->message.length, data, data_length);
|
||||
connection->message.length += data_length;
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain);
|
||||
|
@ -442,7 +446,7 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connecti
|
|||
if ((*message_id = append_message_id(connection, 0)) == 0)
|
||||
return fail_message(connection);
|
||||
|
||||
if (append_string(connection, topic, strlen(topic)) < 0)
|
||||
if (append_string(connection, topic, os_strlen(topic)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
if (connection->message.length + 1 > connection->buffer_length)
|
||||
|
@ -452,6 +456,21 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connecti
|
|||
return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_suback(mqtt_connection_t* connection, uint8_t *ret_codes, uint8_t ret_codes_len, uint16_t message_id)
|
||||
{
|
||||
uint8_t i;
|
||||
|
||||
init_message(connection);
|
||||
|
||||
if ((append_message_id(connection, message_id)) == 0)
|
||||
return fail_message(connection);
|
||||
|
||||
for (i = 0; i < ret_codes_len; i++)
|
||||
connection->buffer[connection->message.length++] = ret_codes[i];
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_SUBACK, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id)
|
||||
{
|
||||
init_message(connection);
|
||||
|
@ -462,12 +481,24 @@ mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connec
|
|||
if ((*message_id = append_message_id(connection, 0)) == 0)
|
||||
return fail_message(connection);
|
||||
|
||||
if (append_string(connection, topic, strlen(topic)) < 0)
|
||||
if (append_string(connection, topic, os_strlen(topic)) < 0)
|
||||
return fail_message(connection);
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsuback(mqtt_connection_t* connection, uint16_t message_id)
|
||||
{
|
||||
uint8_t i;
|
||||
|
||||
init_message(connection);
|
||||
|
||||
if ((append_message_id(connection, message_id)) == 0)
|
||||
return fail_message(connection);
|
||||
|
||||
return fini_message(connection, MQTT_MSG_TYPE_UNSUBACK, 0, 0, 0);
|
||||
}
|
||||
|
||||
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection)
|
||||
{
|
||||
init_message(connection);
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
#include "mem.h"
|
||||
#include "ets_sys.h"
|
||||
#include "osapi.h"
|
||||
#include "os_type.h"
|
||||
|
||||
#include <string.h>
|
||||
//#include "user_config.h"
|
||||
|
||||
#include "mqtt_retainedlist.h"
|
||||
#include "mqtt_topics.h"
|
||||
|
||||
static retained_entry *retained_list = NULL;
|
||||
static uint16_t max_entry;
|
||||
|
||||
bool ICACHE_FLASH_ATTR create_retainedlist(uint16_t num_entires)
|
||||
{
|
||||
max_entry = num_entires;
|
||||
retained_list = (retained_entry *)os_zalloc(num_entires * sizeof(retained_entry));
|
||||
return retained_list != NULL;
|
||||
}
|
||||
|
||||
bool update_retainedtopic(uint8_t *topic, uint8_t *data, uint16_t data_len, uint8_t qos)
|
||||
{
|
||||
uint16_t i;
|
||||
|
||||
if (retained_list == NULL) return false;
|
||||
|
||||
// look for topic in list
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (retained_list[i].topic != NULL && os_strcmp(retained_list[i].topic, topic)==0)
|
||||
break;
|
||||
}
|
||||
|
||||
// not yet in list
|
||||
if (i>=max_entry) {
|
||||
|
||||
// if empty new data - no entry required
|
||||
if (data_len == 0) return true;
|
||||
|
||||
// find free
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (retained_list[i].topic == NULL)
|
||||
break;
|
||||
}
|
||||
if (i>=max_entry) {
|
||||
// list full
|
||||
return false;
|
||||
}
|
||||
retained_list[i].topic = (uint8_t *)os_malloc(os_strlen(topic));
|
||||
if (retained_list[i].topic == NULL) {
|
||||
// out of mem
|
||||
return false;
|
||||
}
|
||||
os_strcpy(retained_list[i].topic, topic);
|
||||
}
|
||||
|
||||
// if empty new data - delete
|
||||
if (data_len == 0) {
|
||||
os_free(retained_list[i].topic);
|
||||
retained_list[i].topic = NULL;
|
||||
os_free(retained_list[i].data);
|
||||
retained_list[i].data = NULL;
|
||||
return true;
|
||||
}
|
||||
|
||||
// not same size as before, new memory allocation
|
||||
if (data_len != retained_list[i].data_len) {
|
||||
os_free(retained_list[i].data);
|
||||
retained_list[i].data = (uint8_t *)os_malloc(data_len);
|
||||
if (retained_list[i].data == NULL) {
|
||||
// out of mem
|
||||
os_free(retained_list[i].topic);
|
||||
retained_list[i].topic = NULL;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
os_memcpy(retained_list[i].data, data, data_len);
|
||||
retained_list[i].data_len = data_len;
|
||||
retained_list[i].qos = qos;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ICACHE_FLASH_ATTR find_retainedtopic(uint8_t *topic, find_retainedtopic_cb cb, MQTT_ClientCon *clientcon)
|
||||
{
|
||||
uint16_t i;
|
||||
bool retval = false;
|
||||
|
||||
if (retained_list == NULL) return false;
|
||||
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (retained_list[i].topic != NULL) {
|
||||
if (Topics_matches(topic, 1, retained_list[i].topic)) {
|
||||
(*cb) (&retained_list[i], clientcon);
|
||||
retval = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return retval;
|
||||
}
|
||||
|
||||
void ICACHE_FLASH_ATTR iterate_retainedtopics(iterate_retainedtopic_cb cb, void *user_data)
|
||||
{
|
||||
uint16_t i;
|
||||
|
||||
if (retained_list == NULL) return;
|
||||
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (retained_list[i].topic != NULL) {
|
||||
if ((*cb) (&retained_list[i], user_data) == true)
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,832 @@
|
|||
#include "user_interface.h"
|
||||
#include "mem.h"
|
||||
|
||||
/* Mem Debug
|
||||
#undef os_free
|
||||
#define os_free(x) {os_printf("F:%d-> %x\r\n", __LINE__,(x));vPortFree(x, "", 0);}
|
||||
|
||||
int my_os_zalloc(int len, int line) {
|
||||
int _v = pvPortZalloc(len, "", 0);
|
||||
os_printf("A:%d-> %x (%d)\r\n", line, _v, len);
|
||||
return _v;
|
||||
}
|
||||
#undef os_zalloc
|
||||
#define os_zalloc(x) my_os_zalloc(x, __LINE__)
|
||||
*/
|
||||
|
||||
#include "mqtt_server.h"
|
||||
#include "mqtt_topics.h"
|
||||
#include "mqtt_topiclist.h"
|
||||
#include "mqtt_retainedlist.h"
|
||||
|
||||
#ifndef QUEUE_BUFFER_SIZE
|
||||
#define QUEUE_BUFFER_SIZE 2048
|
||||
#endif
|
||||
|
||||
#define MAX_SUBS_PER_REQ 16
|
||||
|
||||
#define MQTT_TASK_PRIO 2
|
||||
#define MQTT_TASK_QUEUE_SIZE 1
|
||||
#define MQTT_SEND_TIMOUT 5
|
||||
|
||||
os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE];
|
||||
|
||||
LOCAL uint8_t zero_len_id[2] = { 0, 0 };
|
||||
|
||||
MQTT_ClientCon *clientcon_list;
|
||||
LOCAL MqttDataCallback local_data_cb = NULL;
|
||||
|
||||
#define MQTT_INFO //
|
||||
#define MQTT_WARNING os_printf
|
||||
#define MQTT_ERROR os_printf
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR print_topic(topic_entry *topic, void* user_data)
|
||||
{
|
||||
if (topic->clientcon == LOCAL_MQTT_CLIENT) {
|
||||
MQTT_INFO("MQTT: Client: LOCAL Topic: \"%s\" QoS: %d\r\n", topic->topic, topic->qos);
|
||||
} else {
|
||||
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", topic->clientcon->connect_info.client_id, topic->topic, topic->qos);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR publish_topic(topic_entry *topic_e, uint8_t *topic, uint8_t *data, uint16_t data_len)
|
||||
{
|
||||
MQTT_ClientCon *clientcon = topic_e->clientcon;
|
||||
uint16_t message_id = 0;
|
||||
|
||||
if (topic_e->clientcon == LOCAL_MQTT_CLIENT) {
|
||||
MQTT_INFO("MQTT: Client: LOCAL Topic: \"%s\" QoS: %d\r\n", topic_e->topic, topic_e->qos);
|
||||
if (local_data_cb != NULL)
|
||||
local_data_cb(NULL, topic, os_strlen(topic), data, data_len);
|
||||
return true;
|
||||
}
|
||||
|
||||
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", clientcon->connect_info.client_id, topic_e->topic, topic_e->qos);
|
||||
|
||||
clientcon->mqtt_state.outbound_message =
|
||||
mqtt_msg_publish(&clientcon->mqtt_state.mqtt_connection, topic, data, data_len, topic_e->qos, 0, &message_id);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR publish_retainedtopic(retained_entry *entry, MQTT_ClientCon *clientcon)
|
||||
{
|
||||
uint16_t message_id = 0;
|
||||
|
||||
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", clientcon->connect_info.client_id, entry->topic, entry->qos);
|
||||
|
||||
clientcon->mqtt_state.outbound_message =
|
||||
mqtt_msg_publish(&clientcon->mqtt_state.mqtt_connection, entry->topic, entry->data, entry->data_len, entry->qos, 0, &message_id);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR activate_next_client()
|
||||
{
|
||||
MQTT_ClientCon *clientcon = clientcon_list;
|
||||
|
||||
for (clientcon = clientcon_list; clientcon != NULL; clientcon = clientcon->next) {
|
||||
if (!QUEUE_IsEmpty(&clientcon->msgQueue)) {
|
||||
MQTT_INFO("MQTT: Next message to client: %s\r\n", clientcon->connect_info.client_id);
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)clientcon);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR
|
||||
MQTT_InitClientCon(MQTT_ClientCon *mqttClientCon)
|
||||
{
|
||||
uint32_t temp;
|
||||
MQTT_INFO("MQTT:InitClientCon\r\n");
|
||||
|
||||
mqttClientCon->connState = TCP_CONNECTED;
|
||||
|
||||
os_memset(&mqttClientCon->connect_info, 0, sizeof(mqtt_connect_info_t));
|
||||
|
||||
mqttClientCon->connect_info.client_id = zero_len_id;
|
||||
mqttClientCon->protocolVersion = 0;
|
||||
|
||||
mqttClientCon->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
|
||||
mqttClientCon->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
|
||||
mqttClientCon->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);
|
||||
mqttClientCon->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
|
||||
mqttClientCon->mqtt_state.connect_info = &mqttClientCon->connect_info;
|
||||
|
||||
mqtt_msg_init(&mqttClientCon->mqtt_state.mqtt_connection, mqttClientCon->mqtt_state.out_buffer, mqttClientCon->mqtt_state.out_buffer_length);
|
||||
|
||||
QUEUE_Init(&mqttClientCon->msgQueue, QUEUE_BUFFER_SIZE);
|
||||
|
||||
mqttClientCon->next = clientcon_list;
|
||||
clientcon_list = mqttClientCon;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR
|
||||
MQTT_DeleteClientCon(MQTT_ClientCon *mqttClientCon)
|
||||
{
|
||||
MQTT_INFO("MQTT:DeleteClientCon\r\n");
|
||||
|
||||
if (mqttClientCon == NULL)
|
||||
return;
|
||||
|
||||
os_timer_disarm(&mqttClientCon->mqttTimer);
|
||||
|
||||
if (mqttClientCon->pCon != NULL) {
|
||||
espconn_delete(mqttClientCon->pCon);
|
||||
}
|
||||
|
||||
MQTT_ClientCon **p = &clientcon_list;
|
||||
while (*p != mqttClientCon && *p != NULL) {
|
||||
p=&((*p)->next);
|
||||
}
|
||||
if (*p == mqttClientCon)
|
||||
*p = (*p)->next;
|
||||
|
||||
if (mqttClientCon->user_data != NULL) {
|
||||
os_free(mqttClientCon->user_data);
|
||||
mqttClientCon->user_data = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->mqtt_state.in_buffer != NULL) {
|
||||
os_free(mqttClientCon->mqtt_state.in_buffer);
|
||||
mqttClientCon->mqtt_state.in_buffer = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->mqtt_state.out_buffer != NULL) {
|
||||
os_free(mqttClientCon->mqtt_state.out_buffer);
|
||||
mqttClientCon->mqtt_state.out_buffer = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->mqtt_state.outbound_message != NULL) {
|
||||
if (mqttClientCon->mqtt_state.outbound_message->data != NULL)
|
||||
{
|
||||
os_free(mqttClientCon->mqtt_state.outbound_message->data);
|
||||
mqttClientCon->mqtt_state.outbound_message->data = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (mqttClientCon->mqtt_state.mqtt_connection.buffer != NULL) {
|
||||
// Already freed but not NULL
|
||||
mqttClientCon->mqtt_state.mqtt_connection.buffer = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->connect_info.client_id != NULL) {
|
||||
/* Don't attempt to free if it's the zero_len array */
|
||||
if ( ((uint8_t*)mqttClientCon->connect_info.client_id) != zero_len_id )
|
||||
os_free(mqttClientCon->connect_info.client_id);
|
||||
mqttClientCon->connect_info.client_id = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->connect_info.username != NULL) {
|
||||
os_free(mqttClientCon->connect_info.username);
|
||||
mqttClientCon->connect_info.username = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->connect_info.password != NULL) {
|
||||
os_free(mqttClientCon->connect_info.password);
|
||||
mqttClientCon->connect_info.password = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->connect_info.will_topic != NULL) {
|
||||
// Publish the LWT
|
||||
find_topic(mqttClientCon->connect_info.will_topic, publish_topic,
|
||||
mqttClientCon->connect_info.will_data, mqttClientCon->connect_info.will_data_len);
|
||||
activate_next_client();
|
||||
|
||||
if (mqttClientCon->connect_info.will_retain) {
|
||||
update_retainedtopic(mqttClientCon->connect_info.will_topic, mqttClientCon->connect_info.will_data,
|
||||
mqttClientCon->connect_info.will_data_len, mqttClientCon->connect_info.will_qos);
|
||||
}
|
||||
|
||||
os_free(mqttClientCon->connect_info.will_topic);
|
||||
mqttClientCon->connect_info.will_topic = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->connect_info.will_data != NULL) {
|
||||
os_free(mqttClientCon->connect_info.will_data);
|
||||
mqttClientCon->connect_info.will_data = NULL;
|
||||
}
|
||||
|
||||
if (mqttClientCon->msgQueue.buf != NULL) {
|
||||
os_free(mqttClientCon->msgQueue.buf);
|
||||
mqttClientCon->msgQueue.buf = NULL;
|
||||
}
|
||||
|
||||
delete_topic(mqttClientCon, 0);
|
||||
|
||||
os_free(mqttClientCon);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void ICACHE_FLASH_ATTR
|
||||
MQTT_ServerDisconnect(MQTT_ClientCon *mqttClientCon)
|
||||
{
|
||||
MQTT_INFO("MQTT:ServerDisconnect\r\n");
|
||||
|
||||
mqttClientCon->mqtt_state.message_length_read = 0;
|
||||
mqttClientCon->connState = TCP_DISCONNECTED;
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClientCon);
|
||||
os_timer_disarm(&mqttClientCon->mqttTimer);
|
||||
}
|
||||
|
||||
|
||||
void ICACHE_FLASH_ATTR mqtt_server_timer(void *arg)
|
||||
{
|
||||
MQTT_ClientCon *clientcon = (MQTT_ClientCon*)arg;
|
||||
|
||||
if (clientcon->sendTimeout > 0)
|
||||
clientcon->sendTimeout--;
|
||||
}
|
||||
|
||||
|
||||
static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, unsigned short len)
|
||||
{
|
||||
uint8_t msg_type;
|
||||
uint8_t msg_qos;
|
||||
uint16_t msg_id;
|
||||
enum mqtt_connect_flag msg_conn_ret;
|
||||
uint16_t topic_index;
|
||||
uint16_t topic_len;
|
||||
uint8_t *topic_str;
|
||||
uint8_t topic_buffer[MQTT_BUF_SIZE];
|
||||
uint16_t data_len;
|
||||
uint8_t *data;
|
||||
|
||||
struct espconn *pCon = (struct espconn *)arg;
|
||||
|
||||
MQTT_INFO("MQTT_ClientCon_recv_cb(): %d bytes of data received\n", len);
|
||||
|
||||
MQTT_ClientCon *clientcon = (MQTT_ClientCon *)pCon->reverse;
|
||||
if (clientcon == NULL) {
|
||||
MQTT_ERROR("ERROR: No client status\r\n");
|
||||
return;
|
||||
}
|
||||
|
||||
MQTT_INFO("MQTT: TCP: data received %d bytes (State: %d)\r\n", len, clientcon->connState);
|
||||
|
||||
// Expect minimum the full fixed size header
|
||||
if (len+clientcon->mqtt_state.message_length_read > MQTT_BUF_SIZE || len < 2) {
|
||||
MQTT_ERROR("MQTT: Message too short/long\r\n");
|
||||
clientcon->mqtt_state.message_length_read = 0;
|
||||
return;
|
||||
}
|
||||
READPACKET:
|
||||
os_memcpy(&clientcon->mqtt_state.in_buffer[clientcon->mqtt_state.message_length_read], pdata, len);
|
||||
clientcon->mqtt_state.message_length_read += len;
|
||||
|
||||
clientcon->mqtt_state.message_length =
|
||||
mqtt_get_total_length(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.message_length_read);
|
||||
MQTT_INFO("MQTT: total_len: %d\r\n", clientcon->mqtt_state.message_length);
|
||||
if (clientcon->mqtt_state.message_length > clientcon->mqtt_state.message_length_read) {
|
||||
MQTT_WARNING("MQTT: Partial message received\r\n");
|
||||
return;
|
||||
}
|
||||
|
||||
msg_type = mqtt_get_type(clientcon->mqtt_state.in_buffer);
|
||||
MQTT_INFO("MQTT: message_type: %d\r\n", msg_type);
|
||||
//msg_qos = mqtt_get_qos(clientcon->mqtt_state.in_buffer);
|
||||
switch (clientcon->connState) {
|
||||
case TCP_CONNECTED:
|
||||
switch (msg_type)
|
||||
{
|
||||
case MQTT_MSG_TYPE_CONNECT:
|
||||
|
||||
MQTT_INFO("MQTT: Connect received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
|
||||
|
||||
if (clientcon->mqtt_state.message_length < sizeof(struct mqtt_connect_variable_header)+3) {
|
||||
MQTT_ERROR("MQTT: Too short Connect message\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
|
||||
struct mqtt_connect_variable_header4* variable_header =
|
||||
(struct mqtt_connect_variable_header4*)&clientcon->mqtt_state.in_buffer[2];
|
||||
uint16_t var_header_len = sizeof(struct mqtt_connect_variable_header4);
|
||||
|
||||
// We check MQTT v3.11 (version 4)
|
||||
if ((variable_header->lengthMsb<<8) + variable_header->lengthLsb == 4 &&
|
||||
variable_header->version == 4 &&
|
||||
os_strncmp(variable_header->magic, "MQTT", 4) == 0) {
|
||||
clientcon->protocolVersion = 4;
|
||||
} else {
|
||||
struct mqtt_connect_variable_header3* variable_header3 =
|
||||
(struct mqtt_connect_variable_header3*)&clientcon->mqtt_state.in_buffer[2];
|
||||
var_header_len = sizeof(struct mqtt_connect_variable_header3);
|
||||
|
||||
// We check MQTT v3.1 (version 3)
|
||||
if ((variable_header3->lengthMsb<<8) + variable_header3->lengthLsb == 6 &&
|
||||
variable_header3->version == 3 &&
|
||||
os_strncmp(variable_header3->magic, "MQIsdp", 6) == 0) {
|
||||
clientcon->protocolVersion = 3;
|
||||
// adapt the remaining header fields (dirty as we overlay the two structs of different length)
|
||||
variable_header->version = variable_header3->version;
|
||||
variable_header->flags = variable_header3->flags;
|
||||
variable_header->keepaliveMsb = variable_header3->keepaliveMsb;
|
||||
variable_header->keepaliveLsb = variable_header3->keepaliveLsb;
|
||||
} else {
|
||||
// Neither found
|
||||
MQTT_WARNING("MQTT: Wrong protocoll version\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_PROTOCOL;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
MQTT_INFO("MQTT: Connect flags %x\r\n", variable_header->flags);
|
||||
clientcon->connect_info.clean_session = (variable_header->flags & MQTT_CONNECT_FLAG_CLEAN_SESSION) != 0;
|
||||
if ((variable_header->flags & MQTT_CONNECT_FLAG_USERNAME) != 0 ||
|
||||
(variable_header->flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
|
||||
MQTT_WARNING("MQTT: Connect option currently not supported\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_NOT_AUTHORIZED;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
clientcon->connect_info.keepalive = (variable_header->keepaliveMsb<<8) + variable_header->keepaliveLsb;
|
||||
espconn_regist_time(clientcon->pCon, 2*clientcon->connect_info.keepalive, 1);
|
||||
MQTT_INFO("MQTT: Keepalive %d\r\n", clientcon->connect_info.keepalive);
|
||||
|
||||
// Get the client id
|
||||
uint16_t id_len = clientcon->mqtt_state.message_length - (2+var_header_len);
|
||||
const char *client_id = mqtt_get_str(&clientcon->mqtt_state.in_buffer[2+var_header_len], &id_len);
|
||||
if (client_id == NULL || id_len > 80) {
|
||||
MQTT_WARNING("MQTT: Client Id invalid\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_ID_REJECTED;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
if (id_len == 0) {
|
||||
if (clientcon->protocolVersion == 3) {
|
||||
MQTT_WARNING("MQTT: Empty client Id in MQTT 3.1\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_ID_REJECTED;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
if (!clientcon->connect_info.clean_session) {
|
||||
MQTT_WARNING("MQTT: Null client Id and NOT cleansession\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_ID_REJECTED;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
clientcon->connect_info.client_id = zero_len_id;
|
||||
} else {
|
||||
clientcon->connect_info.client_id = (char *) os_zalloc(id_len+1);
|
||||
if (clientcon->connect_info.client_id != NULL) {
|
||||
os_memcpy(clientcon->connect_info.client_id, client_id, id_len);
|
||||
clientcon->connect_info.client_id[id_len]=0;
|
||||
MQTT_INFO("MQTT: Client id %s\r\n", clientcon->connect_info.client_id);
|
||||
} else {
|
||||
MQTT_ERROR("MQTT: Out of mem\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Get the LWT
|
||||
clientcon->connect_info.will_retain = (variable_header->flags & MQTT_CONNECT_FLAG_WILL_RETAIN) != 0;
|
||||
clientcon->connect_info.will_qos = (variable_header->flags & 0x18)>>3;
|
||||
if (!(variable_header->flags & MQTT_CONNECT_FLAG_WILL)) {
|
||||
// Must be all 0 if no lwt is given
|
||||
if (clientcon->connect_info.will_retain || clientcon->connect_info.will_qos) {
|
||||
MQTT_WARNING("MQTT: Last Will flags invalid\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
uint16_t lw_topic_len = clientcon->mqtt_state.message_length - (4+var_header_len+id_len);
|
||||
const char *lw_topic = mqtt_get_str(&clientcon->mqtt_state.in_buffer[4+var_header_len+id_len], &lw_topic_len);
|
||||
|
||||
if (lw_topic == NULL) {
|
||||
MQTT_WARNING("MQTT: Last Will topic invalid\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
|
||||
clientcon->connect_info.will_topic = (char *) os_zalloc(lw_topic_len+1);
|
||||
if (clientcon->connect_info.will_topic != NULL) {
|
||||
os_memcpy(clientcon->connect_info.will_topic, lw_topic, lw_topic_len);
|
||||
clientcon->connect_info.will_topic[lw_topic_len]=0;
|
||||
if (Topics_hasWildcards(clientcon->connect_info.will_topic)) {
|
||||
MQTT_WARNING("MQTT: Last Will topic has wildcards\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
MQTT_INFO("MQTT: LWT topic %s\r\n", clientcon->connect_info.will_topic);
|
||||
} else {
|
||||
MQTT_ERROR("MQTT: Out of mem\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
|
||||
uint16_t lw_data_len = clientcon->mqtt_state.message_length - (6+var_header_len+id_len+lw_topic_len);
|
||||
const char *lw_data = mqtt_get_str(&clientcon->mqtt_state.in_buffer[6+var_header_len+id_len+lw_topic_len], &lw_data_len);
|
||||
|
||||
if (lw_data == NULL) {
|
||||
MQTT_WARNING("MQTT: Last Will data invalid\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
|
||||
clientcon->connect_info.will_data = (char *) os_zalloc(lw_data_len);
|
||||
clientcon->connect_info.will_data_len = lw_data_len;
|
||||
if (clientcon->connect_info.will_data != NULL) {
|
||||
os_memcpy(clientcon->connect_info.will_data, lw_data, lw_data_len);
|
||||
MQTT_INFO("MQTT: %d bytes of LWT data\r\n", clientcon->connect_info.will_data_len);
|
||||
} else {
|
||||
MQTT_ERROR("MQTT: Out of mem\r\n");
|
||||
msg_conn_ret = CONNECTION_REFUSE_SERVER_UNAVAILABLE;
|
||||
clientcon->connState = TCP_DISCONNECTING;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
msg_conn_ret = CONNECTION_ACCEPTED;
|
||||
clientcon->connState = MQTT_DATA;
|
||||
break;
|
||||
|
||||
default:
|
||||
MQTT_WARNING("MQTT: Invalid message\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_connack(&clientcon->mqtt_state.mqtt_connection, msg_conn_ret);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_DATA:
|
||||
switch (msg_type)
|
||||
{
|
||||
uint8_t ret_codes[MAX_SUBS_PER_REQ];
|
||||
uint8_t num_subs;
|
||||
|
||||
case MQTT_MSG_TYPE_SUBSCRIBE:
|
||||
MQTT_INFO("MQTT: Subscribe received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
|
||||
// 2B fixed header + 2B variable header + 2 len + 1 char + 1 QoS
|
||||
if (clientcon->mqtt_state.message_length < 8) {
|
||||
MQTT_ERROR("MQTT: Too short Subscribe message\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
msg_id = mqtt_get_id(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.in_buffer_length);
|
||||
MQTT_INFO("MQTT: Message id %d\r\n", msg_id);
|
||||
topic_index = 4;
|
||||
num_subs = 0;
|
||||
while (topic_index < clientcon->mqtt_state.message_length && num_subs < MAX_SUBS_PER_REQ) {
|
||||
topic_len = clientcon->mqtt_state.message_length - topic_index;
|
||||
topic_str = mqtt_get_str(&clientcon->mqtt_state.in_buffer[topic_index], &topic_len);
|
||||
if (topic_str == NULL) {
|
||||
MQTT_WARNING("MQTT: Subscribe topic invalid\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
topic_index += 2 + topic_len;
|
||||
|
||||
if (topic_index >= clientcon->mqtt_state.message_length) {
|
||||
MQTT_WARNING("MQTT: Subscribe QoS missing\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
uint8_t topic_QoS = clientcon->mqtt_state.in_buffer[topic_index++];
|
||||
|
||||
os_memcpy(topic_buffer, topic_str, topic_len);
|
||||
topic_buffer[topic_len] = 0;
|
||||
MQTT_INFO("MQTT: Subscribed topic %s QoS %d\r\n", topic_buffer, topic_QoS);
|
||||
|
||||
// the return codes, one per topic
|
||||
// For now we always give back error or QoS = 0 !!
|
||||
ret_codes[num_subs++] = add_topic(clientcon, topic_buffer, 0)?0:0x80;
|
||||
//iterate_topics(print_topic, 0);
|
||||
}
|
||||
MQTT_INFO("MQTT: Subscribe successful\r\n");
|
||||
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_suback(&clientcon->mqtt_state.mqtt_connection, ret_codes, num_subs, msg_id);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
|
||||
find_retainedtopic(topic_buffer, publish_retainedtopic, clientcon);
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_MSG_TYPE_UNSUBSCRIBE:
|
||||
MQTT_INFO("MQTT: Unsubscribe received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
|
||||
// 2B fixed header + 2B variable header + 2 len + 1 char
|
||||
if (clientcon->mqtt_state.message_length < 7) {
|
||||
MQTT_ERROR("MQTT: Too short Unsubscribe message\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
msg_id = mqtt_get_id(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.in_buffer_length);
|
||||
MQTT_INFO("MQTT: Message id %d\r\n", msg_id);
|
||||
topic_index = 4;
|
||||
while (topic_index < clientcon->mqtt_state.message_length) {
|
||||
uint16_t topic_len = clientcon->mqtt_state.message_length - topic_index;
|
||||
char *topic_str = mqtt_get_str(&clientcon->mqtt_state.in_buffer[topic_index], &topic_len);
|
||||
if (topic_str == NULL) {
|
||||
MQTT_WARNING("MQTT: Subscribe topic invalid\r\n");
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
}
|
||||
topic_index += 2 + topic_len;
|
||||
|
||||
os_memcpy(topic_buffer, topic_str, topic_len);
|
||||
topic_buffer[topic_len] = 0;
|
||||
MQTT_INFO("MQTT: Unsubscribed topic %s\r\n", topic_buffer);
|
||||
|
||||
delete_topic(clientcon, topic_buffer);
|
||||
//iterate_topics(print_topic, 0);
|
||||
}
|
||||
MQTT_INFO("MQTT: Unubscribe successful\r\n");
|
||||
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_unsuback(&clientcon->mqtt_state.mqtt_connection, msg_id);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
break;
|
||||
|
||||
case MQTT_MSG_TYPE_PUBLISH:
|
||||
MQTT_INFO("MQTT: Publish received, message_len: %d\r\n", clientcon->mqtt_state.message_length);
|
||||
|
||||
/* if (msg_qos == 1)
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_puback(&clientcon->mqtt_state.mqtt_connection, msg_id);
|
||||
else if (msg_qos == 2)
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_pubrec(&clientcon->mqtt_state.mqtt_connection, msg_id);
|
||||
if (msg_qos == 1 || msg_qos == 2) {
|
||||
MQTT_INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
}
|
||||
*/
|
||||
topic_len = clientcon->mqtt_state.in_buffer_length;
|
||||
topic_str = (uint8_t*)mqtt_get_publish_topic(clientcon->mqtt_state.in_buffer, &topic_len);
|
||||
os_memcpy(topic_buffer, topic_str, topic_len);
|
||||
topic_buffer[topic_len] = 0;
|
||||
data_len = clientcon->mqtt_state.in_buffer_length;
|
||||
data = (uint8_t*)mqtt_get_publish_data(clientcon->mqtt_state.in_buffer, &data_len);
|
||||
|
||||
MQTT_INFO("MQTT: Published topic \"%s\"\r\n", topic_buffer);
|
||||
MQTT_INFO("MQTT: Matches to:\r\n");
|
||||
|
||||
// Now find, if anything matches and enque publish message
|
||||
find_topic(topic_buffer, publish_topic, data, data_len);
|
||||
|
||||
if (mqtt_get_retain(clientcon->mqtt_state.in_buffer)) {
|
||||
update_retainedtopic(topic_buffer, data, data_len, mqtt_get_qos(clientcon->mqtt_state.in_buffer));
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case MQTT_MSG_TYPE_PINGREQ:
|
||||
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PINGREQ\r\n");
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_pingresp(&clientcon->mqtt_state.mqtt_connection);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
break;
|
||||
|
||||
case MQTT_MSG_TYPE_DISCONNECT:
|
||||
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_DISCONNECT\r\n");
|
||||
|
||||
// Clean session close: no LWT
|
||||
if (clientcon->connect_info.will_topic != NULL) {
|
||||
os_free(clientcon->connect_info.will_topic);
|
||||
clientcon->connect_info.will_topic = NULL;
|
||||
}
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
return;
|
||||
|
||||
/*
|
||||
case MQTT_MSG_TYPE_PUBACK:
|
||||
if (clientcon->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && clientcon->mqtt_state.pending_msg_id == msg_id) {
|
||||
MQTT_INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
|
||||
}
|
||||
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREC:
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_pubrel(&clientcon->mqtt_state.mqtt_connection, msg_id);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBREL:
|
||||
clientcon->mqtt_state.outbound_message = mqtt_msg_pubcomp(&clientcon->mqtt_state.mqtt_connection, msg_id);
|
||||
if (QUEUE_Puts(&clientcon->msgQueue, clientcon->mqtt_state.outbound_message->data, clientcon->mqtt_state.outbound_message->length) == -1) {
|
||||
MQTT_ERROR("MQTT: Queue full\r\n");
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PUBCOMP:
|
||||
if (clientcon->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && clientcon->mqtt_state.pending_msg_id == msg_id) {
|
||||
MQTT_INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
|
||||
}
|
||||
break;
|
||||
case MQTT_MSG_TYPE_PINGRESP:
|
||||
// Ignore
|
||||
break;
|
||||
*/
|
||||
|
||||
default:
|
||||
// Ignore
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// More than one MQTT command in the packet?
|
||||
len = clientcon->mqtt_state.message_length_read;
|
||||
if (clientcon->mqtt_state.message_length < len) {
|
||||
len -= clientcon->mqtt_state.message_length;
|
||||
pdata += clientcon->mqtt_state.message_length;
|
||||
clientcon->mqtt_state.message_length_read = 0;
|
||||
|
||||
MQTT_INFO("MQTT: Get another received message\r\n");
|
||||
goto READPACKET;
|
||||
}
|
||||
clientcon->mqtt_state.message_length_read = 0;
|
||||
|
||||
if (msg_type != MQTT_MSG_TYPE_PUBLISH) {
|
||||
system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)clientcon);
|
||||
} else {
|
||||
activate_next_client();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Called when a client has disconnected from the MQTT server */
|
||||
static void ICACHE_FLASH_ATTR MQTT_ClientCon_discon_cb(void *arg)
|
||||
{
|
||||
struct espconn *pCon = (struct espconn *)arg;
|
||||
MQTT_ClientCon *clientcon = (MQTT_ClientCon *)pCon->reverse;
|
||||
|
||||
MQTT_INFO("MQTT_ClientCon_discon_cb(): client disconnected\n");
|
||||
MQTT_DeleteClientCon(clientcon);
|
||||
}
|
||||
|
||||
|
||||
static void ICACHE_FLASH_ATTR MQTT_ClientCon_sent_cb(void *arg)
|
||||
{
|
||||
struct espconn *pCon = (struct espconn *)arg;
|
||||
MQTT_ClientCon *clientcon = (MQTT_ClientCon *)pCon->reverse;
|
||||
|
||||
MQTT_INFO("MQTT_ClientCon_sent_cb(): Data sent\n");
|
||||
|
||||
clientcon->sendTimeout = 0;
|
||||
|
||||
if (clientcon->connState == TCP_DISCONNECTING) {
|
||||
clientcon->connState = TCP_DISCONNECTED;
|
||||
espconn_disconnect(clientcon->pCon);
|
||||
}
|
||||
|
||||
activate_next_client();
|
||||
}
|
||||
|
||||
|
||||
/* Called when a client connects to the MQTT server */
|
||||
static void ICACHE_FLASH_ATTR MQTT_ClientCon_connected_cb(void *arg)
|
||||
{
|
||||
struct espconn *pespconn = (struct espconn *)arg;
|
||||
MQTT_ClientCon *mqttClientCon;
|
||||
|
||||
MQTT_INFO("MQTT_ClientCon_connected_cb(): Client connected\r\n");
|
||||
|
||||
espconn_regist_sentcb(pespconn, MQTT_ClientCon_sent_cb);
|
||||
espconn_regist_disconcb(pespconn, MQTT_ClientCon_discon_cb);
|
||||
espconn_regist_recvcb(pespconn, MQTT_ClientCon_recv_cb);
|
||||
espconn_regist_time(pespconn, 30, 1);
|
||||
|
||||
mqttClientCon = (MQTT_ClientCon *)os_zalloc(sizeof(MQTT_ClientCon));
|
||||
pespconn->reverse = mqttClientCon;
|
||||
if (mqttClientCon == NULL) {
|
||||
MQTT_ERROR("ERROR: Cannot allocate client status\r\n");
|
||||
return;
|
||||
}
|
||||
|
||||
MQTT_InitClientCon(mqttClientCon);
|
||||
mqttClientCon->pCon = pespconn;
|
||||
|
||||
os_timer_setfn(&mqttClientCon->mqttTimer, (os_timer_func_t *)mqtt_server_timer, mqttClientCon);
|
||||
os_timer_arm(&mqttClientCon->mqttTimer, 1000, 1);
|
||||
}
|
||||
|
||||
|
||||
void ICACHE_FLASH_ATTR
|
||||
MQTT_ServerTask(os_event_t *e)
|
||||
{
|
||||
MQTT_ClientCon *clientcon = (MQTT_ClientCon *)e->par;
|
||||
uint8_t dataBuffer[MQTT_BUF_SIZE];
|
||||
uint16_t dataLen;
|
||||
if (e->par == 0)
|
||||
return;
|
||||
|
||||
MQTT_INFO("MQTT Task: State %d \r\n", clientcon->connState);
|
||||
|
||||
switch (clientcon->connState) {
|
||||
|
||||
case TCP_DISCONNECTED:
|
||||
MQTT_INFO("MQTT: Disconnect\r\n");
|
||||
espconn_disconnect(clientcon->pCon);
|
||||
break;
|
||||
case TCP_DISCONNECTING:
|
||||
case MQTT_DATA:
|
||||
if (!QUEUE_IsEmpty(&clientcon->msgQueue) && clientcon->sendTimeout == 0 &&
|
||||
QUEUE_Gets(&clientcon->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0) {
|
||||
|
||||
clientcon->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
|
||||
clientcon->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
|
||||
|
||||
clientcon->sendTimeout = MQTT_SEND_TIMOUT;
|
||||
MQTT_INFO("MQTT: Sending, type: %d, id: %04X\r\n", clientcon->mqtt_state.pending_msg_type, clientcon->mqtt_state.pending_msg_id);
|
||||
espconn_send(clientcon->pCon, dataBuffer, dataLen);
|
||||
|
||||
clientcon->mqtt_state.outbound_message = NULL;
|
||||
break;
|
||||
}
|
||||
if (clientcon->connState == TCP_DISCONNECTING) {
|
||||
MQTT_ServerDisconnect(clientcon);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics)
|
||||
{
|
||||
MQTT_INFO("Starting MQTT server on port %d\r\n", portno);
|
||||
|
||||
if (!create_topiclist(max_subscriptions)) return false;
|
||||
if (!create_retainedlist(max_retained_topics)) return false;
|
||||
clientcon_list = NULL;
|
||||
|
||||
struct espconn *pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));
|
||||
if (pCon == NULL)
|
||||
return false;
|
||||
|
||||
/* Equivalent to bind */
|
||||
pCon->type = ESPCONN_TCP;
|
||||
pCon->state = ESPCONN_NONE;
|
||||
pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));
|
||||
if (pCon->proto.tcp == NULL) {
|
||||
os_free(pCon);
|
||||
return false;
|
||||
}
|
||||
pCon->proto.tcp->local_port = portno;
|
||||
|
||||
/* Register callback when clients connect to the server */
|
||||
espconn_regist_connectcb(pCon, MQTT_ClientCon_connected_cb);
|
||||
|
||||
/* Put the connection in accept mode */
|
||||
espconn_accept(pCon);
|
||||
|
||||
system_os_task(MQTT_ServerTask, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain)
|
||||
{
|
||||
find_topic(topic, publish_topic, data, data_length);
|
||||
if (retain)
|
||||
update_retainedtopic(topic, data, data_length, qos);
|
||||
activate_next_client();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ICACHE_FLASH_ATTR MQTT_local_subscribe(uint8_t* topic, uint8_t qos)
|
||||
{
|
||||
return add_topic(LOCAL_MQTT_CLIENT, topic, 0);
|
||||
}
|
||||
|
||||
bool ICACHE_FLASH_ATTR MQTT_local_unsubscribe(uint8_t* topic)
|
||||
{
|
||||
return delete_topic(LOCAL_MQTT_CLIENT, topic);
|
||||
}
|
||||
|
||||
void ICACHE_FLASH_ATTR MQTT_local_onData(MqttDataCallback dataCb)
|
||||
{
|
||||
local_data_cb = dataCb;
|
||||
}
|
||||
|
|
@ -0,0 +1,90 @@
|
|||
#include "mem.h"
|
||||
#include "ets_sys.h"
|
||||
#include "osapi.h"
|
||||
#include "os_type.h"
|
||||
|
||||
#include <string.h>
|
||||
#include "user_config.h"
|
||||
|
||||
#include "mqtt_topiclist.h"
|
||||
#include "mqtt_topics.h"
|
||||
|
||||
static topic_entry *topic_list = NULL;
|
||||
static uint16_t max_entry;
|
||||
|
||||
bool ICACHE_FLASH_ATTR create_topiclist(uint16_t num_entires)
|
||||
{
|
||||
max_entry = num_entires;
|
||||
topic_list = (topic_entry *)os_zalloc(num_entires * sizeof(topic_entry));
|
||||
return topic_list != NULL;
|
||||
}
|
||||
|
||||
bool ICACHE_FLASH_ATTR add_topic(MQTT_ClientCon *clientcon, uint8_t *topic, uint8_t qos)
|
||||
{
|
||||
uint16_t i;
|
||||
|
||||
if (topic_list == NULL) return false;
|
||||
if (!Topics_isValidName(topic)) return false;
|
||||
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (topic_list[i].clientcon == NULL) {
|
||||
topic_list[i].topic = (uint8_t*)os_malloc(os_strlen(topic)+1);
|
||||
if (topic_list[i].topic == NULL)
|
||||
return false;
|
||||
os_strcpy(topic_list[i].topic, topic);
|
||||
topic_list[i].clientcon = clientcon;
|
||||
topic_list[i].qos = qos;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ICACHE_FLASH_ATTR delete_topic(MQTT_ClientCon *clientcon, uint8_t *topic){
|
||||
uint16_t i;
|
||||
|
||||
if (topic_list == NULL) return false;
|
||||
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (topic_list[i].clientcon != NULL && (clientcon == NULL || topic_list[i].clientcon == clientcon)) {
|
||||
if (topic == NULL || (topic_list[i].topic != NULL && strcmp(topic, topic_list[i].topic)==0)) {
|
||||
topic_list[i].clientcon = NULL;
|
||||
os_free(topic_list[i].topic);
|
||||
topic_list[i].qos = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ICACHE_FLASH_ATTR find_topic(uint8_t *topic, find_topic_cb cb, uint8_t *data, uint16_t data_len)
|
||||
{
|
||||
uint16_t i;
|
||||
bool retval = false;
|
||||
|
||||
if (topic_list == NULL) return false;
|
||||
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (topic_list[i].clientcon != NULL) {
|
||||
if (Topics_matches(topic_list[i].topic, 1, topic)) {
|
||||
(*cb) (&topic_list[i], topic, data, data_len);
|
||||
retval = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return retval;
|
||||
}
|
||||
|
||||
void ICACHE_FLASH_ATTR iterate_topics(iterate_topic_cb cb, void *user_data)
|
||||
{
|
||||
uint16_t i;
|
||||
|
||||
if (topic_list == NULL) return;
|
||||
|
||||
for (i=0; i<max_entry; i++) {
|
||||
if (topic_list[i].clientcon != NULL) {
|
||||
if ((*cb) (&topic_list[i], user_data) == true)
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,318 @@
|
|||
/*******************************************************************************
|
||||
* Copyright (c) 2007, 2013 IBM Corp.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Ian Craggs - initial API and implementation and/or initial documentation
|
||||
*******************************************************************************/
|
||||
|
||||
/**
|
||||
* @file
|
||||
* Topic handling functions.
|
||||
*
|
||||
* Topic syntax matches that of other MQTT servers such as Micro broker.
|
||||
*/
|
||||
|
||||
#include "mqtt_topics.h"
|
||||
|
||||
#include "os_type.h"
|
||||
#include "osapi.h"
|
||||
#include "mem.h"
|
||||
|
||||
char *_strtok_r(char *s, const char *delim, char **last);
|
||||
|
||||
char *_strchr(const char *s, int c)
|
||||
{
|
||||
while (*s != (char)c)
|
||||
if (!*s++)
|
||||
return 0;
|
||||
return (char *)s;
|
||||
}
|
||||
|
||||
char *_strdup(char *src)
|
||||
{
|
||||
char *str;
|
||||
char *p;
|
||||
int len = 0;
|
||||
|
||||
while (src[len])
|
||||
len++;
|
||||
str = (char*)os_malloc(len + 1);
|
||||
p = str;
|
||||
while (*src)
|
||||
*p++ = *src++;
|
||||
*p = '\0';
|
||||
return str;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Checks that the syntax of a topic string is correct.
|
||||
* @param aName the topic name string
|
||||
* @return boolean value indicating whether the topic name is valid
|
||||
*/
|
||||
int Topics_isValidName(char* aName)
|
||||
{
|
||||
int rc = true;
|
||||
char *c = NULL;
|
||||
int length = os_strlen(aName);
|
||||
char* hashpos = _strchr(aName, '#'); /* '#' wildcard can be only at the beginning or the end of a topic */
|
||||
|
||||
if (hashpos != NULL)
|
||||
{
|
||||
char* second = _strchr(hashpos+1, '#');
|
||||
if ((hashpos != aName && hashpos != aName+(length-1)) || second != NULL)
|
||||
rc = false;
|
||||
}
|
||||
|
||||
/* '#' or '+' only next to a slash separator or end of name */
|
||||
for (c = "#+"; *c != '\0'; ++c)
|
||||
{
|
||||
char* pos = _strchr(aName, *c);
|
||||
while (pos != NULL)
|
||||
{
|
||||
if (pos > aName) /* check previous char is '/'*/
|
||||
{
|
||||
if (*(pos - 1) != '/')
|
||||
rc = false;
|
||||
}
|
||||
if (*(pos + 1) != '\0') /* check that subsequent char is '/'*/
|
||||
{
|
||||
if (*(pos + 1) != '/')
|
||||
rc = false;
|
||||
}
|
||||
pos = _strchr(pos + 1, *c);
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Reverse a string.
|
||||
* Linux utility function for Linux to enable Windows/Linux portability
|
||||
* @param astr the character string to reverse
|
||||
* @return pointer to the reversed string which was reversed in place
|
||||
*/
|
||||
char* _strrev(char* astr)
|
||||
{
|
||||
char* forwards = astr;
|
||||
int len = os_strlen(astr);
|
||||
if (len > 1)
|
||||
{
|
||||
char* backwards = astr + len - 1;
|
||||
while (forwards < backwards)
|
||||
{
|
||||
char temp = *forwards;
|
||||
*forwards++ = *backwards;
|
||||
*backwards-- = temp;
|
||||
}
|
||||
}
|
||||
return astr;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Does a topic string contain wildcards?
|
||||
* @param topic the topic name string
|
||||
* @return boolean value indicating whether the topic contains a wildcard or not
|
||||
*/
|
||||
int Topics_hasWildcards(char* topic)
|
||||
{
|
||||
return (_strchr(topic, '+') != NULL) || (_strchr(topic, '#') != NULL);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tests whether one topic string matches another where one can contain wildcards.
|
||||
* @param wildTopic a topic name string that can contain wildcards
|
||||
* @param topic a topic name string that must not contain wildcards
|
||||
* @return boolean value indicating whether topic matches wildTopic
|
||||
*/
|
||||
int Topics_matches(char* wildTopic, int wildcards, char* topic)
|
||||
{
|
||||
int rc = false;
|
||||
char *last1 = NULL, *last2 = NULL;
|
||||
char *pwild = NULL, *pmatch = NULL;
|
||||
|
||||
if (!wildcards)
|
||||
{
|
||||
rc = (os_strcmp(wildTopic, topic) == 0);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (Topics_hasWildcards(topic))
|
||||
{
|
||||
//os_printf("Topics_matches: should not be wildcard in topic %s", topic);
|
||||
goto exit;
|
||||
}
|
||||
if (!Topics_isValidName(wildTopic))
|
||||
{
|
||||
//os_printf("Topics_matches: invalid topic name %s", wildTopic);
|
||||
goto exit;
|
||||
}
|
||||
if (!Topics_isValidName(topic))
|
||||
{
|
||||
//os_printf("Topics_matches: invalid topic name %s", topic);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (strcmp(wildTopic, MULTI_LEVEL_WILDCARD) == 0 || /* Hash matches anything... */
|
||||
strcmp(wildTopic, topic) == 0)
|
||||
{
|
||||
rc = true;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (strcmp(wildTopic, "/#") == 0) /* Special case for /# matches anything starting with / */
|
||||
{
|
||||
rc = (topic[0] == '/') ? true : false;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* because strtok will return bill when matching /bill/ or bill in a topic name for the first time,
|
||||
* we have to check whether the first character is / explicitly.
|
||||
*/
|
||||
if ((wildTopic[0] == TOPIC_LEVEL_SEPARATOR[0]) && (topic[0] != TOPIC_LEVEL_SEPARATOR[0]))
|
||||
goto exit;
|
||||
|
||||
if ((wildTopic[0] == SINGLE_LEVEL_WILDCARD[0]) && (topic[0] == TOPIC_LEVEL_SEPARATOR[0]))
|
||||
goto exit;
|
||||
|
||||
/* We only match hash-first topics in reverse, for speed */
|
||||
if (wildTopic[0] == MULTI_LEVEL_WILDCARD[0])
|
||||
{
|
||||
wildTopic = (char*)_strrev(_strdup(wildTopic));
|
||||
topic = (char*)_strrev(_strdup(topic));
|
||||
}
|
||||
else
|
||||
{
|
||||
wildTopic = (char*)_strdup(wildTopic);
|
||||
topic = (char*)_strdup(topic);
|
||||
}
|
||||
|
||||
pwild = _strtok_r(wildTopic, TOPIC_LEVEL_SEPARATOR, &last1);
|
||||
pmatch = _strtok_r(topic, TOPIC_LEVEL_SEPARATOR, &last2);
|
||||
|
||||
/* Step through the subscription, level by level */
|
||||
while (pwild != NULL)
|
||||
{
|
||||
/* Have we got # - if so, it matches anything. */
|
||||
if (strcmp(pwild, MULTI_LEVEL_WILDCARD) == 0)
|
||||
{
|
||||
rc = true;
|
||||
break;
|
||||
}
|
||||
/* Nope - check for matches... */
|
||||
if (pmatch != NULL)
|
||||
{
|
||||
if (strcmp(pwild, SINGLE_LEVEL_WILDCARD) != 0 && strcmp(pwild, pmatch) != 0)
|
||||
/* The two levels simply don't match... */
|
||||
break;
|
||||
}
|
||||
else
|
||||
break; /* No more tokens to match against further tokens in the wildcard stream... */
|
||||
pwild = _strtok_r(NULL, TOPIC_LEVEL_SEPARATOR, &last1);
|
||||
pmatch = _strtok_r(NULL, TOPIC_LEVEL_SEPARATOR, &last2);
|
||||
}
|
||||
|
||||
/* All tokens up to here matched, and we didn't end in #. If there
|
||||
are any topic tokens remaining, the match is bad, otherwise it was
|
||||
a good match. */
|
||||
if (pmatch == NULL && pwild == NULL)
|
||||
rc = true;
|
||||
|
||||
/* Now free the memory allocated in strdup() */
|
||||
os_free(wildTopic);
|
||||
os_free(topic);
|
||||
exit:
|
||||
return rc;
|
||||
} /* end matches*/
|
||||
|
||||
#ifdef UNIT_TEST
|
||||
#if !defined(ARRAY_SIZE)
|
||||
/**
|
||||
* Macro to calculate the number of entries in an array
|
||||
*/
|
||||
#define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0]))
|
||||
#endif
|
||||
|
||||
int test()
|
||||
{
|
||||
int i;
|
||||
|
||||
struct
|
||||
{
|
||||
char* str;
|
||||
} tests0[] = {
|
||||
"#", "jj",
|
||||
"+/a", "adkj/a",
|
||||
"+/a", "adsjk/adakjd/a", "a/+", "a/#", "#/a"
|
||||
};
|
||||
|
||||
for (i = 0; i < sizeof(tests0)/sizeof(char*); ++i)
|
||||
{
|
||||
os_printf("topic %s, isValidName %d\n", tests0[i].str, Topics_isValidName(tests0[i].str));
|
||||
//assert(Topics_isValidName(tests0[i].str) == 1);
|
||||
}
|
||||
|
||||
struct
|
||||
{
|
||||
char* wild;
|
||||
char* topic;
|
||||
int result;
|
||||
} tests1[] = {
|
||||
{ "#", "jj" , 1},
|
||||
{ "+/a", "adkj/a", 1},
|
||||
{ "+/a", "adsjk/adakjd/a", 0},
|
||||
{ "+/+/a", "adsjk/adakjd/a", 1},
|
||||
{ "#/a", "adsjk/adakjd/a", 1},
|
||||
{ "test/#", "test/1", 1},
|
||||
{ "test/+", "test/1", 1},
|
||||
{ "+", "test1", 1},
|
||||
{ "+", "test1/k", 0},
|
||||
{ "+", "/test1/k", 0},
|
||||
{ "/+", "test1/k", 0},
|
||||
{ "+", "/jkj", 0},
|
||||
{ "/+", "/test1", 1},
|
||||
{ "+/+", "/test1", 0},
|
||||
{ "+/+", "test1/k", 1},
|
||||
{ "/#", "/test1/k", 1},
|
||||
{ "/#", "test1/k", 0},
|
||||
};
|
||||
|
||||
for (i = 0; i < ARRAY_SIZE(tests1); ++i)
|
||||
{
|
||||
os_printf("wild: %s, topic %s, result %d %d (should)\n", tests1[i].wild, tests1[i].topic,
|
||||
Topics_matches(_strdup(tests1[i].wild), 1, _strdup(tests1[i].topic)), tests1[i].result);
|
||||
//assert(Topics_matches(_strdup(tests1[i].wild), _strdup(tests1[i].topic)) == tests1[i].result);
|
||||
}
|
||||
|
||||
struct
|
||||
{
|
||||
char* str;
|
||||
char* result;
|
||||
} tests2[] = {
|
||||
{ "#", "#" },
|
||||
{ "ab", "ba" },
|
||||
{ "abc", "cba" },
|
||||
{ "abcd", "dcba" },
|
||||
{ "abcde", "edcba" }
|
||||
};
|
||||
for (i = 0; i < 5; ++i)
|
||||
{
|
||||
os_printf("str: %s, _strrev %s\n", tests2[i].str, _strrev(_strdup(tests2[i].str)));
|
||||
//assert(strcmp(tests2[i].result, _strrev(_strdup(tests2[i].str))) == 0);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -1,5 +1,5 @@
|
|||
#include "proto.h"
|
||||
#include "ringbuf.h"
|
||||
#include "ringbuf_mqtt.h"
|
||||
I8 ICACHE_FLASH_ATTR PROTO_Init(PROTO_PARSER *parser, PROTO_PARSE_CALLBACK *completeCallback, U8 *buf, U16 bufSize)
|
||||
{
|
||||
parser->buf = buf;
|
||||
|
|
20
mqtt/queue.c
20
mqtt/queue.c
|
@ -34,11 +34,6 @@
|
|||
#include "os_type.h"
|
||||
#include "mem.h"
|
||||
#include "proto.h"
|
||||
|
||||
uint8_t *last_rb_p_r;
|
||||
uint8_t *last_rb_p_w;
|
||||
uint32_t last_fill_cnt;
|
||||
|
||||
void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize)
|
||||
{
|
||||
queue->buf = (uint8_t*)os_zalloc(bufferSize);
|
||||
|
@ -46,20 +41,7 @@ void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize)
|
|||
}
|
||||
int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t* buffer, uint16_t len)
|
||||
{
|
||||
uint32_t ret;
|
||||
|
||||
last_rb_p_r = queue->rb.p_r;
|
||||
last_rb_p_w = queue->rb.p_w;
|
||||
last_fill_cnt = queue->rb.fill_cnt;
|
||||
|
||||
ret = PROTO_AddRb(&queue->rb, buffer, len);
|
||||
if (ret == -1) {
|
||||
// rolling ring buffer back
|
||||
queue->rb.p_r = last_rb_p_r;
|
||||
queue->rb.p_w = last_rb_p_w;
|
||||
queue->rb.fill_cnt = last_fill_cnt;
|
||||
}
|
||||
return ret;
|
||||
return PROTO_AddRb(&queue->rb, buffer, len);
|
||||
}
|
||||
int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen)
|
||||
{
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
* Ring Buffer library
|
||||
*/
|
||||
|
||||
#include "ringbuf.h"
|
||||
#include "ringbuf_mqtt.h"
|
||||
|
||||
|
||||
/**
|
|
@ -0,0 +1,87 @@
|
|||
/*-
|
||||
* Copyright (c) 1998 Softweyr LLC. All rights reserved.
|
||||
*
|
||||
* strtok_r, from Berkeley strtok
|
||||
* Oct 13, 1998 by Wes Peters <wes@softweyr.com>
|
||||
*
|
||||
* Copyright (c) 1988, 1993
|
||||
* The Regents of the University of California. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions
|
||||
* are met:
|
||||
* 1. Redistributions of source code must retain the above copyright
|
||||
* notices, this list of conditions and the following disclaimer.
|
||||
* 2. Redistributions in binary form must reproduce the above copyright
|
||||
* notices, this list of conditions and the following disclaimer in the
|
||||
* documentation and/or other materials provided with the distribution.
|
||||
* 3. All advertising materials mentioning features or use of this software
|
||||
* must display the following acknowledgement:
|
||||
* This product includes software developed by Softweyr LLC, the
|
||||
* University of California, Berkeley, and its contributors.
|
||||
* 4. Neither the name of the University nor the names of its contributors
|
||||
* may be used to endorse or promote products derived from this software
|
||||
* without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY SOFTWEYR LLC, THE REGENTS AND CONTRIBUTORS
|
||||
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
||||
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SOFTWEYR LLC, THE
|
||||
* REGENTS, OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
|
||||
* TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
|
||||
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
|
||||
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
||||
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include "os_type.h"
|
||||
#include "osapi.h"
|
||||
#include "mem.h"
|
||||
|
||||
char *
|
||||
_strtok_r(char *s, const char *delim, char **last)
|
||||
{
|
||||
char *spanp, *tok;
|
||||
int c, sc;
|
||||
|
||||
if (s == NULL && (s = *last) == NULL)
|
||||
return (NULL);
|
||||
|
||||
/*
|
||||
* Skip (span) leading delimiters (s += strspn(s, delim), sort of).
|
||||
*/
|
||||
cont:
|
||||
c = *s++;
|
||||
for (spanp = (char *)delim; (sc = *spanp++) != 0;) {
|
||||
if (c == sc)
|
||||
goto cont;
|
||||
}
|
||||
|
||||
if (c == 0) { /* no non-delimiter characters */
|
||||
*last = NULL;
|
||||
return (NULL);
|
||||
}
|
||||
tok = s - 1;
|
||||
|
||||
/*
|
||||
* Scan token (scan for delimiters: s += strcspn(s, delim), sort of).
|
||||
* Note that delim must have one NUL; we stop if we see that, too.
|
||||
*/
|
||||
for (;;) {
|
||||
c = *s++;
|
||||
spanp = (char *)delim;
|
||||
do {
|
||||
if ((sc = *spanp++) == c) {
|
||||
if (c == 0)
|
||||
s = NULL;
|
||||
else
|
||||
s[-1] = '\0';
|
||||
*last = s;
|
||||
return (tok);
|
||||
}
|
||||
} while (sc != 0);
|
||||
}
|
||||
/* NOTREACHED */
|
||||
}
|
Ładowanie…
Reference in New Issue