persistence for retained topics

pull/16/head
Martin Ger 2017-10-20 12:45:56 +02:00
rodzic ecd5d72ffa
commit 969329fedb
10 zmienionych plików z 175 dodań i 19 usunięć

Wyświetl plik

@ -4,19 +4,33 @@
#include "user_interface.h"
extern "C" {
// Interface for starting the broker
bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);
// Callbacks for message reception, username/password authentication, and client connection
typedef void (*MqttDataCallback)(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh);
typedef bool (*MqttAuthCallback)(const char* username, const char *password, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn);
bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);
void MQTT_server_onData(MqttDataCallback dataCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
// Interface for local pub/sub interaction with the broker
bool MQTT_local_publish(uint8_t* topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t* topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t* topic);
// Interface for persistence of retained topics
// Topics can be serialized to a buffer and reinitialized later after reboot
// Application is responsible for saving and restoring that buffer (i.e. to/from flash)
void clear_retainedtopics();
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);
}
#endif /* _MQTT_SERVER_H_ */

Wyświetl plik

@ -68,6 +68,8 @@ MQTT broker related command:
- set broker_access _mode_: controls the networks that allow MQTT broker access (0: no access, 1: only internal, 2: only external, 3: both (default))
- set broker_subscriptions _max_: sets the max number of subscription the broker can store (default: 30)
- set broker_retained_messages _max_: sets the max number of retained messages the broker can store (default: 30)
- save_retained: saves the current state of all retained topics (max. 4096 Bytes in sum) to flash, so they will persist a reboot
- delete_retained: deletes the state of all retained topics in RAM and flash
# MQTT client/bridging functionality
The broker comes with a "local" and a "remote" client, which means, the broker itself can publish and subscribe topics. The "local" client is a client to the own broker (without the need of an additional TCP connection).

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -1,2 +1,2 @@
9721851a0a8243a8b30ae118c0881b16387e4a03 0x00000.bin
ab6551180b6e386ecda34e2fc292152743cfa934 0x10000.bin
71c696ae9c870fb376447990d95424a0ff3f14c0 0x00000.bin
c1a223297832120386ac24137a95070cd7aa077e 0x10000.bin

Wyświetl plik

@ -14,8 +14,12 @@ typedef bool (*iterate_retainedtopic_cb)(retained_entry *topic, void *user_data)
typedef bool (*find_retainedtopic_cb)(retained_entry *topic, MQTT_ClientCon *clientcon);
bool create_retainedlist(uint16_t num_entires);
void clear_retainedtopics();
bool update_retainedtopic(uint8_t *topic, uint8_t *data, uint16_t data_len, uint8_t qos);
bool find_retainedtopic(uint8_t *topic, find_retainedtopic_cb cb, MQTT_ClientCon *clientcon);
void iterate_retainedtopics(iterate_retainedtopic_cb cb, void *user_data);
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);
#endif /* _MQTT_RETAINEDLIST_H_ */

Wyświetl plik

@ -19,7 +19,7 @@ bool ICACHE_FLASH_ATTR create_retainedlist(uint16_t num_entires) {
return retained_list != NULL;
}
bool update_retainedtopic(uint8_t * topic, uint8_t * data, uint16_t data_len, uint8_t qos) {
bool ICACHE_FLASH_ATTR update_retainedtopic(uint8_t * topic, uint8_t * data, uint16_t data_len, uint8_t qos) {
uint16_t i;
if (retained_list == NULL)
@ -120,3 +120,57 @@ void ICACHE_FLASH_ATTR iterate_retainedtopics(iterate_retainedtopic_cb cb, void
}
}
}
bool ICACHE_FLASH_ATTR clear_cb(retained_entry *entry, void *user_data) {
update_retainedtopic(entry->topic, "", 0, entry->qos);
return false;
}
void ICACHE_FLASH_ATTR clear_retainedtopics() {
iterate_retainedtopics(clear_cb, NULL);
}
int ICACHE_FLASH_ATTR serialize_retainedtopics(char *buf, int len) {
uint16_t i;
uint16_t pos = 0;
if (retained_list == NULL)
return 0;
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic != NULL) {
uint16_t data_len = retained_list[i].data_len;
if (pos + os_strlen(retained_list[i].topic) + 4 + data_len + 1 >= len-1)
return 0;
os_strcpy(&buf[pos], retained_list[i].topic);
pos += os_strlen(retained_list[i].topic) + 1;
buf[pos++] = data_len & 0xff;
buf[pos++] = (data_len >> 8) & 0xff;
os_memcpy(&buf[pos], retained_list[i].data, data_len);
pos += data_len;
buf[pos++] = retained_list[i].qos;
buf[pos] = '\0';
}
}
return pos;
}
bool ICACHE_FLASH_ATTR deserialize_retainedtopics(char *buf, int len) {
uint16_t pos = 0;
while (pos < len && buf[pos] != '\0') {
uint8_t *topic = &buf[pos];
pos += os_strlen(topic) + 1;
if (pos >= len) return false;
uint16_t data_len = buf[pos++] + (buf[pos++] << 8);
uint8_t *data = &buf[pos];
pos += data_len;
if (pos >= len) return false;
uint8_t qos = buf[pos++];
if (update_retainedtopic(topic, data, data_len, qos) == false)
return false;
}
return true;
}

Wyświetl plik

@ -104,6 +104,15 @@
#define MAX_CON_SEND_SIZE 1024
#define MAX_CON_CMD_SIZE 160
//
// Flash save slots (currently max. 0-2)
//
#define SCRIPT_SLOT 0
#define VARS_SLOT 1
#define RETAINED_SLOT 2
#define MAX_RETAINED_LEN 0x1000
typedef enum {SIG_DO_NOTHING=0, SIG_START_SERVER=1, SIG_UART0, SIG_TOPIC_RECEIVED, SIG_SCRIPT_LOADED, SIG_SCRIPT_HTTP_LOADED, SIG_CONSOLE_TX_RAW, SIG_CONSOLE_TX, SIG_CONSOLE_RX} USER_SIGNALS;
#define LOCAL_ACCESS 0x01

Wyświetl plik

@ -183,9 +183,9 @@ void ICACHE_FLASH_ATTR http_script_cb(char *response_body, int http_status, char
os_memcpy(&load_script[4], response_body, body_size);
load_script[4 + body_size] = '\0';
*(uint32_t *) load_script = body_size + 5;
blob_save(0, (uint32_t *) load_script, body_size + 5);;
blob_save(SCRIPT_SLOT, (uint32_t *) load_script, body_size + 5);;
os_free(load_script);
blob_zero(1, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
blob_zero(VARS_SLOT, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
os_sprintf(response, "\rHTTP script download completed (%d Bytes)\r\n", body_size);
to_console(response);
@ -198,9 +198,9 @@ static void ICACHE_FLASH_ATTR script_discon_cb(void *arg) {
load_script[4 + load_size] = '\0';
*(uint32_t *) load_script = load_size + 5;
blob_save(0, (uint32_t *) load_script, load_size + 5);
blob_save(SCRIPT_SLOT, (uint32_t *) load_script, load_size + 5);
os_free(load_script);
blob_zero(1, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
blob_zero(VARS_SLOT, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
os_sprintf(response, "\rScript upload completed (%d Bytes)\r\n", load_size);
to_console(response);
@ -225,7 +225,7 @@ static void ICACHE_FLASH_ATTR script_connected_cb(void *arg) {
uint32_t ICACHE_FLASH_ATTR get_script_size(void) {
uint32_t size;
blob_load(0, &size, 4);
blob_load(SCRIPT_SLOT, &size, 4);
return size;
}
@ -242,7 +242,7 @@ uint32_t ICACHE_FLASH_ATTR read_script(void) {
return 0;
}
blob_load(0, (uint32_t *) my_script, size);
blob_load(SCRIPT_SLOT, (uint32_t *) my_script, size);
uint32_t num_token = text_into_tokens(my_script + 4);
@ -383,6 +383,31 @@ bool ICACHE_FLASH_ATTR printf_retainedtopic(retained_entry * entry, void *user_d
return false;
}
bool ICACHE_FLASH_ATTR delete_retainedtopics() {
clear_retainedtopics();
blob_zero(RETAINED_SLOT, MAX_RETAINED_LEN);
}
bool ICACHE_FLASH_ATTR save_retainedtopics() {
uint8_t buffer[MAX_RETAINED_LEN];
int len = sizeof(buffer);
len = serialize_retainedtopics(buffer, len);
if (len) {
blob_save(RETAINED_SLOT, (uint32_t *)buffer, len);
return true;
}
return false;
}
bool ICACHE_FLASH_ATTR load_retainedtopics() {
uint8_t buffer[MAX_RETAINED_LEN];
int len = sizeof(buffer);
blob_load(RETAINED_SLOT, (uint32_t *)buffer, len);
return deserialize_retainedtopics(buffer, len);
}
void MQTT_local_DataCallback(uint32_t * args, const char *topic, uint32_t topic_len, const char *data, uint32_t length) {
//os_printf("Received: \"%s\" len: %d\r\n", topic, length);
#ifdef SCRIPTED
@ -435,6 +460,8 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
to_console(response);
os_sprintf(response, "set [broker_subscriptions|broker_retained_messages] <val>\r\n");
to_console(response);
os_sprintf(response, "delete_retained|save_retained\r\n");
to_console(response);
os_sprintf(response, "publish [local|remote] <topic> <data>\r\n");
to_console(response);
#ifdef SCRIPTED
@ -634,7 +661,7 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
goto command_handled;
}
blob_load(0, (uint32_t *) script, size);
blob_load(SCRIPT_SLOT, (uint32_t *) script, size);
p = script + 4;
for (line_count = 1; line_count < start_line && *p != 0; p++) {
@ -682,7 +709,7 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
}
uint8_t slots[MAX_FLASH_SLOTS*FLASH_SLOT_LEN];
blob_load(1, (uint32_t *)slots, sizeof(slots));
blob_load(VARS_SLOT, (uint32_t *)slots, sizeof(slots));
for (i = 0; i < MAX_FLASH_SLOTS; i++) {
os_sprintf(response, "@%d: %s\r\n", i+1, &slots[i*FLASH_SLOT_LEN]);
@ -727,9 +754,10 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
config_load_default(&config);
config_save(&config);
#ifdef SCRIPTED
// Clear script and vars
blob_zero(0, MAX_SCRIPT_SIZE);
blob_zero(1, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
// Clear script, vars, and retained topics
blob_zero(SCRIPT_SLOT, MAX_SCRIPT_SIZE);
blob_zero(VARS_SLOT, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
blob_zero(RETAINED_SLOT, MAX_RETAINED_LEN);
#endif
}
os_printf("Restarting ... \r\n");
@ -869,6 +897,45 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
goto command_handled;
}
if (strcmp(tokens[0], "delete_retained") == 0)
{
if (nTokens != 1) {
os_sprintf(response, INVALID_NUMARGS);
goto command_handled;
}
delete_retainedtopics();
os_sprintf(response, "Deleted retained topics\r\n");
goto command_handled;
}
if (strcmp(tokens[0], "save_retained") == 0)
{
if (nTokens != 1) {
os_sprintf(response, INVALID_NUMARGS);
goto command_handled;
}
bool success = save_retainedtopics();
os_sprintf(response, "Saved retained topics %ssuccessfully\r\n", success?"":"un");
goto command_handled;
}
/*
if (strcmp(tokens[0], "load_retained") == 0)
{
if (nTokens != 1) {
os_sprintf(response, INVALID_NUMARGS);
goto command_handled;
}
bool success = load_retainedtopics();
os_sprintf(response, "Loaded retained topics %ssuccessfully\r\n", success?"":"un");
goto command_handled;
}
*/
if (strcmp(tokens[0], "set") == 0) {
if (config.locked) {
os_sprintf(response, INVALID_LOCKED);
@ -1081,9 +1148,9 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
} else {
slot_no--;
uint8_t slots[MAX_FLASH_SLOTS*FLASH_SLOT_LEN];
blob_load(1, (uint32_t *)slots, sizeof(slots));
blob_load(VARS_SLOT, (uint32_t *)slots, sizeof(slots));
os_strcpy(&slots[slot_no*FLASH_SLOT_LEN], tokens[2]);
blob_save(1, (uint32_t *)slots, sizeof(slots));
blob_save(VARS_SLOT, (uint32_t *)slots, sizeof(slots));
os_sprintf(response, "%s written to flash\r\n", tokens[1]);
}
goto command_handled;
@ -1576,6 +1643,11 @@ void user_init() {
// Load config
int config_res = config_load(&config);
if (config_res != 0) {
// Clear retained topics slot
blob_zero(RETAINED_SLOT, MAX_RETAINED_LEN);
}
#ifdef SCRIPTED
script_enabled = false;
if ((config_res == 0) && read_script()) {
@ -1592,8 +1664,8 @@ void user_init() {
}
} else {
// Clear script and vars
blob_zero(0, MAX_SCRIPT_SIZE);
blob_zero(1, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
blob_zero(SCRIPT_SLOT, MAX_SCRIPT_SIZE);
blob_zero(VARS_SLOT, MAX_FLASH_SLOTS * FLASH_SLOT_LEN);
}
#endif
@ -1681,6 +1753,7 @@ void user_init() {
MQTT_server_start(1883 /*port */ , config.max_subscriptions,
config.max_retained_messages);
load_retainedtopics();
}
//Start task