Autosave of retained topics and script acces

pull/16/head
Martin Ger 2017-10-21 14:51:01 +02:00
rodzic b081def24b
commit f480661f9b
11 zmienionych plików z 108 dodań i 17 usunięć

Wyświetl plik

@ -70,6 +70,7 @@ MQTT broker related command:
- set broker_retained_messages _max_: sets the max number of retained messages the broker can store (default: 30)
- save_retained: saves the current state of all retained topics (max. 4096 Bytes in sum) to flash, so they will persist a reboot
- delete_retained: deletes the state of all retained topics in RAM and flash
- set broker_autoretain [0|1]: selects, whether the broker should do a "save_retained" automatically each time it receives a new retained message (thus, the broker can be resetted at any time without loosing state. However, slow and too many writes may damage flash)
# MQTT client/bridging functionality
The broker comes with a "local" and a "remote" client, which means, the broker itself can publish and subscribe topics. The "local" client is a client to the own broker (without the need of an additional TCP connection).
@ -81,7 +82,7 @@ By default the "remote" MQTT client is disabled. It can be enabled by setting th
- set mqtt_user _password_: Password for authentication
- set mqtt_ssl [0|1]: Use SSL for connection to the remote broker (default: 0 = off)
- set mqtt_id _clientId_: Id of the client at the broker (default: "ESPRouter_xxxxxx" derived from the MAC address)
- publish [local|remote] [topic] [data]: this publishes a topic (mainly for testing)
- publish [local|remote] _topic_ _data_ [retained]: this publishes a topic (mainly for testing)
The remote MQTT server can be accessed via SSL, e.g. a secure test connection to test.mosquitto.org can be configured as following:
```

Wyświetl plik

@ -52,7 +52,8 @@ In general, scripts conform to the following BNF:
system <expr> |
<action> <action>
<expr> ::= <val> | <val> <op> <expr> | (<expr>) | not (<expr>) | json_parse (<expr>,<expr>)
<expr> ::= <val> | <val> <op> <expr> | (<expr>) | not (<expr>) | |
retained_topic(<expr>) | json_parse (<expr>,<expr>)
<op> := '=' | '>' | gte | str_ge | str_gte | '+' | '-' | '*' | '|' | div
@ -189,6 +190,11 @@ not(<expr>)
```
Interpretes the argument expression as boolean and inverts the result.
```
retained_topic(<expr>)
```
Interpretes the argument as topic name (incl. wildcards) and searches the first local retained topic that matches this name. The stored value of this topic is returned (empty, if nothing found). Can be used to check the status of the system synchronously without the need to subscribe for that retained topic, wait for status changes and store them in a variable.
```
json_parse (<expr>,<expr>)
```

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -1,2 +1,2 @@
9dad862d14876b1c50a0f8777e7a9b72e449082a 0x00000.bin
edd4dd0092fe30212e168c4bf091ba7849dbdf34 0x10000.bin
22ebe22a250ca88f3fa0171622de438124613d4f 0x00000.bin
b94613a0f1410928080aa2209f67aa927b630868 0x10000.bin

Wyświetl plik

@ -11,15 +11,18 @@ typedef struct _retained_entry {
} retained_entry;
typedef bool (*iterate_retainedtopic_cb)(retained_entry *topic, void *user_data);
typedef bool (*find_retainedtopic_cb)(retained_entry *topic, MQTT_ClientCon *clientcon);
typedef bool (*find_retainedtopic_cb)(retained_entry *topic, void *user_data);
typedef void (*on_retainedtopic_cb)(retained_entry *topic);
bool create_retainedlist(uint16_t num_entires);
void clear_retainedtopics();
bool update_retainedtopic(uint8_t *topic, uint8_t *data, uint16_t data_len, uint8_t qos);
bool find_retainedtopic(uint8_t *topic, find_retainedtopic_cb cb, MQTT_ClientCon *clientcon);
bool find_retainedtopic(uint8_t *topic, find_retainedtopic_cb cb, void *user_data);
void iterate_retainedtopics(iterate_retainedtopic_cb cb, void *user_data);
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);
void set_on_retainedtopic_cb(on_retainedtopic_cb cb);
#endif /* _MQTT_RETAINEDLIST_H_ */

Wyświetl plik

@ -12,10 +12,12 @@
static retained_entry *retained_list = NULL;
static uint16_t max_entry;
static on_retainedtopic_cb retained_cb = NULL;
bool ICACHE_FLASH_ATTR create_retainedlist(uint16_t num_entires) {
max_entry = num_entires;
retained_list = (retained_entry *) os_zalloc(num_entires * sizeof(retained_entry));
retained_cb = NULL;
return retained_list != NULL;
}
@ -61,6 +63,8 @@ bool ICACHE_FLASH_ATTR update_retainedtopic(uint8_t * topic, uint8_t * data, uin
os_free(retained_list[i].data);
retained_list[i].data = NULL;
retained_list[i].data_len = 0;
if (retained_cb != NULL)
retained_cb(NULL);
return true;
}
@ -85,11 +89,13 @@ bool ICACHE_FLASH_ATTR update_retainedtopic(uint8_t * topic, uint8_t * data, uin
os_memcpy(retained_list[i].data, data, data_len);
retained_list[i].data_len = data_len;
retained_list[i].qos = qos;
if (retained_cb != NULL)
retained_cb(&retained_list[i]);
return true;
}
bool ICACHE_FLASH_ATTR find_retainedtopic(uint8_t * topic, find_retainedtopic_cb cb, MQTT_ClientCon * clientcon) {
bool ICACHE_FLASH_ATTR find_retainedtopic(uint8_t * topic, find_retainedtopic_cb cb, void *user_data) {
uint16_t i;
bool retval = false;
@ -99,7 +105,7 @@ bool ICACHE_FLASH_ATTR find_retainedtopic(uint8_t * topic, find_retainedtopic_cb
for (i = 0; i < max_entry; i++) {
if (retained_list[i].topic != NULL) {
if (Topics_matches(topic, 1, retained_list[i].topic)) {
(*cb) (&retained_list[i], clientcon);
(*cb) (&retained_list[i], user_data);
retval = true;
}
}
@ -153,6 +159,11 @@ int ICACHE_FLASH_ATTR serialize_retainedtopics(char *buf, int len) {
buf[pos] = '\0';
}
}
if (pos == 0) {
buf[pos++] = '\0';
}
return pos;
}
@ -174,3 +185,7 @@ bool ICACHE_FLASH_ATTR deserialize_retainedtopics(char *buf, int len) {
}
return true;
}
void ICACHE_FLASH_ATTR set_on_retainedtopic_cb(on_retainedtopic_cb cb) {
retained_cb = cb;
}

Wyświetl plik

@ -76,8 +76,9 @@ bool ICACHE_FLASH_ATTR publish_topic(topic_entry * topic_e, uint8_t * topic, uin
return true;
}
bool ICACHE_FLASH_ATTR publish_retainedtopic(retained_entry * entry, MQTT_ClientCon * clientcon) {
bool ICACHE_FLASH_ATTR publish_retainedtopic(retained_entry * entry, void* user_data) {
uint16_t message_id = 0;
MQTT_ClientCon *clientcon = (MQTT_ClientCon *)user_data;
MQTT_INFO("MQTT: Client: %s Topic: \"%s\" QoS: %d\r\n", clientcon->connect_info.client_id, entry->topic,
entry->qos);

Wyświetl plik

@ -5,6 +5,7 @@
#include "user_config.h"
#include "config_flash.h"
#include "mqtt_topics.h"
#include "mqtt_retainedlist.h"
#ifdef NTP
#include "ntp.h"
#endif
@ -1033,6 +1034,11 @@ int ICACHE_FLASH_ATTR parse_action(int next_token, bool doit) {
return next_token;
}
bool ICACHE_FLASH_ATTR retained_cb(retained_entry *topic, void *user_data) {
*(retained_entry **)user_data = topic;
return true;
}
int ICACHE_FLASH_ATTR parse_expression(int next_token, char **data, int *data_len, Value_Type * data_type, bool doit) {
if (is_token(next_token, "not")) {
@ -1051,6 +1057,38 @@ int ICACHE_FLASH_ATTR parse_expression(int next_token, char **data, int *data_le
*data_len = 1;
*data_type = STRING_T;
}
else if (is_token(next_token, "retained_topic")) {
lang_debug("val retained_topic\r\n");
len_check(3);
if (syn_chk && !is_token(next_token+1, "("))
return syntax_error(next_token, "expected '('");
char *topic_data;
int topic_data_len;
Value_Type topic_data_type;
// parse path string
if ((next_token = parse_expression(next_token + 2, &topic_data, &topic_data_len, &topic_data_type, doit)) == -1)
return -1;
if (syn_chk && !is_token(next_token, ")"))
return syntax_error(next_token, "expected ')'");
next_token += 1;
*data = "";
*data_len = 0;
*data_type = DATA_T;
if (doit) {
retained_entry *retained_entry_p;
if (find_retainedtopic(topic_data, retained_cb, &retained_entry_p)) {
*data_len = retained_entry_p->data_len > sizeof(tmp_buffer)-1? sizeof(tmp_buffer)-1 : retained_entry_p->data_len;
os_memcpy(tmp_buffer, retained_entry_p->data, *data_len);
tmp_buffer[*data_len] = '\0';
*data = tmp_buffer;
}
}
}
#ifdef GPIO
else if (is_token(next_token, "gpio_in")) {
lang_debug("val gpio_in\r\n");

Wyświetl plik

@ -423,7 +423,7 @@ static char INVALID_NUMARGS[] = "Invalid number of arguments\r\n";
static char INVALID_ARG[] = "Invalid argument\r\n";
void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
#define MAX_CMD_TOKENS 4
#define MAX_CMD_TOKENS 6
char cmd_line[MAX_CON_CMD_SIZE + 1];
char response[256];
@ -458,11 +458,11 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
to_console(response);
os_sprintf(response, "set [broker_user|broker_password|broker_access] <val>\r\n");
to_console(response);
os_sprintf(response, "set [broker_subscriptions|broker_retained_messages] <val>\r\n");
os_sprintf(response, "set [broker_subscriptions|broker_retained_messages|broker_autoretain] <val>\r\n");
to_console(response);
os_sprintf(response, "delete_retained|save_retained\r\n");
to_console(response);
os_sprintf(response, "publish [local|remote] <topic> <data>\r\n");
os_sprintf(response, "publish [local|remote] <topic> <data> [retained]\r\n");
to_console(response);
#ifdef SCRIPTED
os_sprintf(response, "script <port>|<url>|delete\r\nshow [script|vars]\r\n");
@ -527,8 +527,8 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
to_console(response);
#endif
os_sprintf(response, "MQTT broker max. subscription: %d\r\nMQTT broker max. retained messages: %d\r\n",
config.max_subscriptions, config.max_retained_messages);
os_sprintf(response, "MQTT broker max. subscription: %d\r\nMQTT broker max. retained messages: %d%s\r\n",
config.max_subscriptions, config.max_retained_messages, config.auto_retained?" (auto saved)":"");
to_console(response);
if (os_strcmp(config.mqtt_broker_user, "none") != 0) {
os_sprintf(response,
@ -882,18 +882,32 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
if (strcmp(tokens[0], "publish") == 0)
{
if (nTokens != 4) {
uint8_t retained = 0;
if (nTokens < 4 || nTokens > 5) {
os_sprintf(response, INVALID_NUMARGS);
goto command_handled;
}
if (nTokens == 5) {
if (strcmp(tokens[4], "retained")==0) {
retained = 1;
} else {
os_sprintf(response, "Invalid arg %s\r\n", tokens[4]);
goto command_handled;
}
}
if (strcmp(tokens[1], "local") == 0) {
MQTT_local_publish(tokens[2], tokens[3], os_strlen(tokens[3]), 0, 0);
MQTT_local_publish(tokens[2], tokens[3], os_strlen(tokens[3]), 0, retained);
}
#ifdef MQTT_CLIENT
else if (strcmp(tokens[1], "remote") == 0 && mqtt_connected) {
MQTT_Publish(&mqttClient, tokens[2], tokens[3], os_strlen(tokens[3]), 0, 0);
MQTT_Publish(&mqttClient, tokens[2], tokens[3], os_strlen(tokens[3]), 0, retained);
}
#endif
else {
os_sprintf(response, "Invalid arg %s\r\n", tokens[1]);
goto command_handled;
}
os_sprintf(response, "Published topic\r\n");
goto command_handled;
}
@ -1135,6 +1149,12 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
os_sprintf(response, "Broker access set\r\n", config.config_port);
goto command_handled;
}
if (strcmp(tokens[1], "broker_autoretain") == 0) {
config.auto_retained = atoi(tokens[2]) != 0;
os_sprintf(response, "Broker autoretain set\r\n", config.config_port);
goto command_handled;
}
#ifdef SCRIPTED
if (strcmp(tokens[1], "script_logging") == 0) {
lang_logging = atoi(tokens[2]);
@ -1631,6 +1651,12 @@ bool ICACHE_FLASH_ATTR mqtt_broker_connect(struct espconn *pesp_conn) {
}
void ICACHE_FLASH_ATTR mqtt_got_retained(retained_entry *topic) {
if (config.auto_retained)
save_retainedtopics();
}
void user_init() {
struct ip_info info;
@ -1764,6 +1790,7 @@ void user_init() {
MQTT_server_start(1883 /*port */ , config.max_subscriptions,
config.max_retained_messages);
load_retainedtopics();
set_on_retainedtopic_cb(mqtt_got_retained);
}
//Start task