automatic mem limit; cleanup after STA disconnect

pull/16/head
Martin Ger 2017-11-08 21:34:36 +01:00
rodzic 23c54e7cd7
commit 9250e3b533
10 zmienionych plików z 51 dodań i 32 usunięć

Wyświetl plik

@ -68,7 +68,7 @@ MQTT broker related command:
- set broker_access _mode_: controls the networks that allow MQTT broker access (0: no access, 1: only internal, 2: only external, 3: both (default))
- set broker_subscriptions _max_: sets the max number of subscription the broker can store (default: 30)
- set broker_retained_messages _max_: sets the max number of retained messages the broker can store (default: 30)
- set broker_clients _clients_max_: sets the max number of concurrent client connections (default: 8)
- set broker_clients _clients_max_: sets the max number of concurrent client connections (default: 0 = mem is the only limit)
- save_retained: saves the current state of all retained topics (max. 4096 Bytes in sum) to flash, so they will persist a reboot
- delete_retained: deletes the state of all retained topics in RAM and flash
- set broker_autoretain [0|1]: selects, whether the broker should do a "save_retained" automatically each time it receives a new retained message (default off). With this option on the broker can be resetted at any time without loosing state. However, this is slow and too many writes may damage flash mem.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Plik binarny nie jest wyświetlany.

Wyświetl plik

@ -1,2 +1,2 @@
007d430ba40457630d310b60c45fad3f1b027a77 0x00000.bin
3cf10a8b1e8c9b77b4db3b2f189f940de2dfe434 0x10000.bin
a726db57bd8cb2567ee8411ebd7d729370222a8b 0x00000.bin
0415e1b1018874ba371502501df2b25cf34eb5fb 0x10000.bin

Wyświetl plik

@ -39,7 +39,10 @@ typedef struct _MQTT_ClientCon {
} MQTT_ClientCon;
extern MQTT_ClientCon *clientcon_list;
uint16_t MQTT_CountClientCon();
uint16_t MQTT_server_countClientCon();
void MQTT_server_disconnectClientCon(MQTT_ClientCon *mqttClientCon);
bool MQTT_server_deleteClientCon(MQTT_ClientCon *mqttClientCon);
bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);
void MQTT_server_onConnect(MqttConnectCallback connectCb);

Wyświetl plik

@ -125,7 +125,7 @@ bool ICACHE_FLASH_ATTR activate_next_client() {
static uint8_t shared_out_buffer[MQTT_BUF_SIZE];
bool ICACHE_FLASH_ATTR MQTT_InitClientCon(MQTT_ClientCon * mqttClientCon) {
bool ICACHE_FLASH_ATTR MQTT_server_initClientCon(MQTT_ClientCon * mqttClientCon) {
uint32_t temp;
MQTT_INFO("MQTT:InitClientCon\r\n");
@ -154,14 +154,14 @@ bool ICACHE_FLASH_ATTR MQTT_InitClientCon(MQTT_ClientCon * mqttClientCon) {
return true;
}
uint16_t ICACHE_FLASH_ATTR MQTT_CountClientCon() {
uint16_t ICACHE_FLASH_ATTR MQTT_server_countClientCon() {
MQTT_ClientCon *p;
uint16_t count = 0;
for (p = clientcon_list; p != NULL; p = p->next, count++);
return count;
}
bool ICACHE_FLASH_ATTR MQTT_DeleteClientCon(MQTT_ClientCon * mqttClientCon) {
bool ICACHE_FLASH_ATTR MQTT_server_deleteClientCon(MQTT_ClientCon * mqttClientCon) {
MQTT_INFO("MQTT:DeleteClientCon\r\n");
if (mqttClientCon == NULL)
@ -258,7 +258,7 @@ bool ICACHE_FLASH_ATTR MQTT_DeleteClientCon(MQTT_ClientCon * mqttClientCon) {
return true;
}
void ICACHE_FLASH_ATTR MQTT_ServerDisconnect(MQTT_ClientCon * mqttClientCon) {
void ICACHE_FLASH_ATTR MQTT_server_disconnectClientCon(MQTT_ClientCon * mqttClientCon) {
MQTT_INFO("MQTT:ServerDisconnect\r\n");
mqttClientCon->mqtt_state.message_length_read = 0;
@ -328,7 +328,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
if (clientcon->mqtt_state.message_length < sizeof(struct mqtt_connect_variable_header) + 3) {
MQTT_ERROR("MQTT: Too short Connect message\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
@ -421,7 +421,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
// Must be all 0 if no lwt is given
if (clientcon->connect_info.will_retain || clientcon->connect_info.will_qos) {
MQTT_WARNING("MQTT: Last Will flags invalid\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
} else {
@ -431,7 +431,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
if (lw_topic == NULL) {
MQTT_WARNING("MQTT: Last Will topic invalid\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
@ -441,7 +441,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
clientcon->connect_info.will_topic[lw_topic_len] = 0;
if (Topics_hasWildcards(clientcon->connect_info.will_topic)) {
MQTT_WARNING("MQTT: Last Will topic has wildcards\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
MQTT_INFO("MQTT: LWT topic %s\r\n", clientcon->connect_info.will_topic);
@ -461,7 +461,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
if (lw_data == NULL) {
MQTT_WARNING("MQTT: Last Will data invalid\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
@ -487,7 +487,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
if (username == NULL) {
MQTT_WARNING("MQTT: Username invalid\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
@ -510,7 +510,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
if ((variable_header->flags & MQTT_CONNECT_FLAG_USERNAME) == 0) {
MQTT_WARNING("MQTT: Password without username\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
@ -555,7 +555,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
default:
MQTT_WARNING("MQTT: Invalid message\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
clientcon->mqtt_state.outbound_message = mqtt_msg_connack(&clientcon->mqtt_state.mqtt_connection, msg_conn_ret);
@ -577,7 +577,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
// 2B fixed header + 2B variable header + 2 len + 1 char + 1 QoS
if (clientcon->mqtt_state.message_length < 8) {
MQTT_ERROR("MQTT: Too short Subscribe message\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
msg_id = mqtt_get_id(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.in_buffer_length);
@ -589,14 +589,14 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
topic_str = mqtt_get_str(&clientcon->mqtt_state.in_buffer[topic_index], &topic_len);
if (topic_str == NULL) {
MQTT_WARNING("MQTT: Subscribe topic invalid\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
topic_index += 2 + topic_len;
if (topic_index >= clientcon->mqtt_state.message_length) {
MQTT_WARNING("MQTT: Subscribe QoS missing\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
uint8_t topic_QoS = clientcon->mqtt_state.in_buffer[topic_index++];
@ -629,7 +629,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
// 2B fixed header + 2B variable header + 2 len + 1 char
if (clientcon->mqtt_state.message_length < 7) {
MQTT_ERROR("MQTT: Too short Unsubscribe message\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
msg_id = mqtt_get_id(clientcon->mqtt_state.in_buffer, clientcon->mqtt_state.in_buffer_length);
@ -640,7 +640,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
char *topic_str = mqtt_get_str(&clientcon->mqtt_state.in_buffer[topic_index], &topic_len);
if (topic_str == NULL) {
MQTT_WARNING("MQTT: Subscribe topic invalid\r\n");
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
}
topic_index += 2 + topic_len;
@ -713,7 +713,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_recv_cb(void *arg, char *pdata, uns
os_free(clientcon->connect_info.will_topic);
clientcon->connect_info.will_topic = NULL;
}
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
return;
/*
@ -777,7 +777,7 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_discon_cb(void *arg) {
MQTT_ClientCon *clientcon = (MQTT_ClientCon *) pCon->reverse;
MQTT_INFO("MQTT_ClientCon_discon_cb(): client disconnected\n");
MQTT_DeleteClientCon(clientcon);
MQTT_server_deleteClientCon(clientcon);
}
static void ICACHE_FLASH_ATTR MQTT_ClientCon_sent_cb(void *arg) {
@ -816,17 +816,23 @@ static void ICACHE_FLASH_ATTR MQTT_ClientCon_connected_cb(void *arg) {
return;
}
MQTT_InitClientCon(mqttClientCon);
mqttClientCon->pCon = pespconn;
os_timer_setfn(&mqttClientCon->mqttTimer, (os_timer_func_t *) mqtt_server_timer, mqttClientCon);
os_timer_arm(&mqttClientCon->mqttTimer, 1000, 1);
bool no_mem = (system_get_free_heap_size() < (MQTT_BUF_SIZE + QUEUE_BUFFER_SIZE + 0x400));
if (no_mem) {
MQTT_ERROR("ERROR: No mem for new client connection\r\n");
}
if (local_connect_cb != NULL && local_connect_cb(pespconn, MQTT_CountClientCon()) == false) {
if (no_mem || (local_connect_cb != NULL && local_connect_cb(pespconn, MQTT_server_countClientCon()+1) == false)) {
mqttClientCon->connState = TCP_DISCONNECT;
system_os_post(MQTT_SERVER_TASK_PRIO, 0, (os_param_t) mqttClientCon);
return;
}
MQTT_server_initClientCon(mqttClientCon);
os_timer_setfn(&mqttClientCon->mqttTimer, (os_timer_func_t *) mqtt_server_timer, mqttClientCon);
os_timer_arm(&mqttClientCon->mqttTimer, 1000, 1);
}
void ICACHE_FLASH_ATTR MQTT_ServerTask(os_event_t * e) {
@ -861,7 +867,7 @@ void ICACHE_FLASH_ATTR MQTT_ServerTask(os_event_t * e) {
break;
}
if (clientcon->connState == TCP_DISCONNECTING) {
MQTT_ServerDisconnect(clientcon);
MQTT_server_disconnectClientCon(clientcon);
}
break;
}

Wyświetl plik

@ -39,7 +39,7 @@ void config_load_default(sysconfig_p config) {
config->max_subscriptions = 30;
config->max_retained_messages = 30;
config->max_clients = 8;
config->max_clients = 0;
config->auto_retained = 0;
os_sprintf(config->mqtt_broker_user, "%s", "none");
config->mqtt_broker_password[0] = 0;

Wyświetl plik

@ -51,7 +51,7 @@ typedef struct
uint16_t max_subscriptions; // Upper limit on subscribed topics
uint16_t max_retained_messages; // Upper limit on stored retained messages
uint16_t max_clients; // Upper limit on concurrently connected clients
uint16_t max_clients; // Upper limit on concurrently connected clients (0: mem is the limit)
uint8_t auto_retained; // Automatically save retained messages to flash (default: off)
uint8_t mqtt_broker_user[32]; // Username for client login, "none" if empty
uint8_t mqtt_broker_password[32]; // Password for client login

Wyświetl plik

@ -620,7 +620,7 @@ void ICACHE_FLASH_ATTR console_handle_command(struct espconn *pespconn) {
MQTT_ClientCon *clientcon;
int ccnt = 0;
os_sprintf(response, "Current clients: %d\r\n", MQTT_CountClientCon());
os_sprintf(response, "Current clients: %d\r\n", MQTT_server_countClientCon());
to_console(response);
for (clientcon = clientcon_list; clientcon != NULL; clientcon = clientcon->next, ccnt++) {
os_sprintf(response, "%s%s", clientcon->connect_info.client_id, clientcon->next != NULL ? ", " : "");
@ -1505,8 +1505,18 @@ void wifi_handle_event_cb(System_Event_t * evt) {
evt->event_info.disconnected.ssid, evt->event_info.disconnected.reason);
connected = false;
MQTT_ClientCon *clientcon, *clientcon_tmp;
for (clientcon = clientcon_list; clientcon != NULL; ) {
clientcon_tmp = clientcon;
clientcon = clientcon->next;
if (clientcon_tmp->pCon->state == ESPCONN_CLOSE) {
MQTT_server_deleteClientCon(clientcon_tmp);
}
}
#ifdef MQTT_CLIENT
if (mqtt_enabled)
// Missing test for local
MQTT_Disconnect(&mqttClient);
#endif /* MQTT_CLIENT */
@ -1675,7 +1685,7 @@ bool ICACHE_FLASH_ATTR mqtt_broker_connect(struct espconn *pesp_conn, uint16_t c
return false;
}
if (client_count > config.max_clients) {
if (config.max_clients != 0 && client_count > config.max_clients) {
os_printf("Client disconnected - too many concurrent clients\r\n");
return false;
}